[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-18 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1298865134


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records 
transaction at " +
+transactionStartOffset + ", but the metadata.version " + 
metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " 
+ transactionStartOffset +
+" during controller activation. Aborting this transaction 
and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller 
failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {
+// No in-flight transaction
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things like 
SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} bootstrap 
record(s) " +
+"at metadata.version {} from {}.",
+bootstrapMetadata.records().size(),
+metadataVersion,
+bootstrapMetadata.source());
+records.addAll(bootstrapMetadata.records());
+
+if (metadataVersion.isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. No 
metadata updates will be allowed " +
+"until the ZK metadata has been migrated");
+records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since this 
is a de-novo KRaft cluster.");
+records.add(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version " + 
bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with ZK 
migrations enabled.");
+}
+}
+
+if (metadataVersion.isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new EndTransactionRecord(), 
(short) 0));
+return ControllerResult.of(records, null);
+} else {
+return ControllerResult.atomicOf(records, null);
+}
+}
+
+static ControllerResult recordsForNonEmptyLog(
+Logger log,
+

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-17 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1297536422


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records 
transaction at " +
+transactionStartOffset + ", but the metadata.version " + 
metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " 
+ transactionStartOffset +
+" during controller activation. Aborting this transaction 
and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller 
failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {
+// No in-flight transaction
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things like 
SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} bootstrap 
record(s) " +
+"at metadata.version {} from {}.",
+bootstrapMetadata.records().size(),
+metadataVersion,
+bootstrapMetadata.source());
+records.addAll(bootstrapMetadata.records());
+
+if (metadataVersion.isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. No 
metadata updates will be allowed " +
+"until the ZK metadata has been migrated");
+records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since this 
is a de-novo KRaft cluster.");
+records.add(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version " + 
bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with ZK 
migrations enabled.");
+}
+}
+
+if (metadataVersion.isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new EndTransactionRecord(), 
(short) 0));
+return ControllerResult.of(records, null);
+} else {
+return ControllerResult.atomicOf(records, null);
+}
+}
+
+static ControllerResult recordsForNonEmptyLog(
+Logger log,
+

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-17 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1297536422


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records 
transaction at " +
+transactionStartOffset + ", but the metadata.version " + 
metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " 
+ transactionStartOffset +
+" during controller activation. Aborting this transaction 
and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller 
failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {
+// No in-flight transaction
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things like 
SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} bootstrap 
record(s) " +
+"at metadata.version {} from {}.",
+bootstrapMetadata.records().size(),
+metadataVersion,
+bootstrapMetadata.source());
+records.addAll(bootstrapMetadata.records());
+
+if (metadataVersion.isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. No 
metadata updates will be allowed " +
+"until the ZK metadata has been migrated");
+records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since this 
is a de-novo KRaft cluster.");
+records.add(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version " + 
bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with ZK 
migrations enabled.");
+}
+}
+
+if (metadataVersion.isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new EndTransactionRecord(), 
(short) 0));
+return ControllerResult.of(records, null);
+} else {
+return ControllerResult.atomicOf(records, null);
+}
+}
+
+static ControllerResult recordsForNonEmptyLog(
+Logger log,
+

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-17 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1297533375


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records 
transaction at " +
+transactionStartOffset + ", but the metadata.version " + 
metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " 
+ transactionStartOffset +
+" during controller activation. Aborting this transaction 
and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller 
failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {
+// No in-flight transaction
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things like 
SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} bootstrap 
record(s) " +
+"at metadata.version {} from {}.",
+bootstrapMetadata.records().size(),
+metadataVersion,
+bootstrapMetadata.source());
+records.addAll(bootstrapMetadata.records());
+
+if (metadataVersion.isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. No 
metadata updates will be allowed " +
+"until the ZK metadata has been migrated");
+records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since this 
is a de-novo KRaft cluster.");
+records.add(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version " + 
bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with ZK 
migrations enabled.");
+}
+}
+
+if (metadataVersion.isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new EndTransactionRecord(), 
(short) 0));
+return ControllerResult.of(records, null);
+} else {
+return ControllerResult.atomicOf(records, null);
+}
+}
+
+static ControllerResult recordsForNonEmptyLog(
+Logger log,
+

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-17 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1297531045


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records 
transaction at " +
+transactionStartOffset + ", but the metadata.version " + 
metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " 
+ transactionStartOffset +
+" during controller activation. Aborting this transaction 
and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller 
failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {
+// No in-flight transaction
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things like 
SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} bootstrap 
record(s) " +
+"at metadata.version {} from {}.",
+bootstrapMetadata.records().size(),
+metadataVersion,
+bootstrapMetadata.source());
+records.addAll(bootstrapMetadata.records());
+
+if (metadataVersion.isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. No 
metadata updates will be allowed " +
+"until the ZK metadata has been migrated");
+records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since this 
is a de-novo KRaft cluster.");
+records.add(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version " + 
bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with ZK 
migrations enabled.");
+}
+}
+
+if (metadataVersion.isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new EndTransactionRecord(), 
(short) 0));
+return ControllerResult.of(records, null);
+} else {
+return ControllerResult.atomicOf(records, null);
+}
+}
+
+static ControllerResult recordsForNonEmptyLog(
+Logger log,
+

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-17 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1297529651


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records 
transaction at " +
+transactionStartOffset + ", but the metadata.version " + 
metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " 
+ transactionStartOffset +
+" during controller activation. Aborting this transaction 
and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller 
failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {
+// No in-flight transaction
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things like 
SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} bootstrap 
record(s) " +
+"at metadata.version {} from {}.",
+bootstrapMetadata.records().size(),
+metadataVersion,
+bootstrapMetadata.source());
+records.addAll(bootstrapMetadata.records());
+
+if (metadataVersion.isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. No 
metadata updates will be allowed " +
+"until the ZK metadata has been migrated");
+records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since this 
is a de-novo KRaft cluster.");
+records.add(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version " + 
bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with ZK 
migrations enabled.");
+}
+}
+
+if (metadataVersion.isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new EndTransactionRecord(), 
(short) 0));
+return ControllerResult.of(records, null);
+} else {
+return ControllerResult.atomicOf(records, null);
+}
+}
+
+static ControllerResult recordsForNonEmptyLog(
+Logger log,
+

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-17 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1297529002


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records 
transaction at " +
+transactionStartOffset + ", but the metadata.version " + 
metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " 
+ transactionStartOffset +
+" during controller activation. Aborting this transaction 
and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller 
failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {
+// No in-flight transaction
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things like 
SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} bootstrap 
record(s) " +
+"at metadata.version {} from {}.",
+bootstrapMetadata.records().size(),
+metadataVersion,
+bootstrapMetadata.source());
+records.addAll(bootstrapMetadata.records());
+
+if (metadataVersion.isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. No 
metadata updates will be allowed " +
+"until the ZK metadata has been migrated");
+records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since this 
is a de-novo KRaft cluster.");
+records.add(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version " + 
bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with ZK 
migrations enabled.");
+}
+}
+
+if (metadataVersion.isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new EndTransactionRecord(), 
(short) 0));
+return ControllerResult.of(records, null);
+} else {
+return ControllerResult.atomicOf(records, null);
+}
+}
+
+static ControllerResult recordsForNonEmptyLog(
+Logger log,
+

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-17 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1297526521


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult<Void> recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List<ApiMessageAndVersion> records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records transaction at " +
+transactionStartOffset + ", but the metadata.version " + metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset +
+" during controller activation. Aborting this transaction and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), (short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {
+// No in-flight transaction
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), (short) 0));
+}
+
+// If no records have been replayed, we need to write out the bootstrap records.
+// This will include the new metadata.version, as well as things like SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " +
+"at metadata.version {} from {}.",
+bootstrapMetadata.records().size(),
+metadataVersion,
+bootstrapMetadata.source());
+records.addAll(bootstrapMetadata.records());
+
+if (metadataVersion.isMigrationSupported()) {
+if (zkMigrationEnabled) {
+log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed " +
+"until the ZK metadata has been migrated");
+records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
+} else {
+log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.");
+records.add(ZkMigrationState.NONE.toRecord());
+}
+} else {
+if (zkMigrationEnabled) {
+throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() +
+" does not support ZK migrations. Cannot continue with ZK migrations enabled.");
+}
+}
+
+if (metadataVersion.isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
+return ControllerResult.of(records, null);
+} else {
+return ControllerResult.atomicOf(records, null);
+}
+}
+
+static ControllerResult recordsForNonEmptyLog(
+Logger log,
+

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-17 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1297525659


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records 
transaction at " +
+transactionStartOffset + ", but the metadata.version " + 
metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " 
+ transactionStartOffset +
+" during controller activation. Aborting this transaction 
and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller 
failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {
+// No in-flight transaction
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+
+// If no records have been replayed, we need to write out the 
bootstrap records.
+// This will include the new metadata.version, as well as things like 
SCRAM
+// initialization, etc.
+log.info("The metadata log appears to be empty. Appending {} bootstrap 
record(s) " +

Review Comment:
   I think it would be good to consolidate these log messages into one message 
that says:
   - whether we had to abort a bootstrap transaction
   - whether we're de-novo or migrating
   - MV
   - # of bootstrap records
   
   and I guess that log message can be WARN. It is quite important, after 
all



##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.End

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-17 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1297521425


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records 
transaction at " +
+transactionStartOffset + ", but the metadata.version " + 
metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " 
+ transactionStartOffset +
+" during controller activation. Aborting this transaction 
and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller 
failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {

Review Comment:
   seems like we could get rid of the "else" here and only have one place where 
we add the BeginTransactionRecord ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-17 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1297521425


##
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.AbortTransactionRecord;
+import org.apache.kafka.common.metadata.BeginTransactionRecord;
+import org.apache.kafka.common.metadata.EndTransactionRecord;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.migration.ZkMigrationState;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActivationRecordsGenerator {
+
+static ControllerResult recordsForEmptyLog(
+Logger log,
+long transactionStartOffset,
+boolean zkMigrationEnabled,
+BootstrapMetadata bootstrapMetadata,
+MetadataVersion metadataVersion
+) {
+List records = new ArrayList<>();
+
+if (transactionStartOffset != -1L) {
+// In-flight bootstrap transaction
+if (!metadataVersion.isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected partial bootstrap records 
transaction at " +
+transactionStartOffset + ", but the metadata.version " + 
metadataVersion +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected partial bootstrap records transaction at " 
+ transactionStartOffset +
+" during controller activation. Aborting this transaction 
and re-committing bootstrap records");
+records.add(new ApiMessageAndVersion(
+new AbortTransactionRecord().setReason("Controller 
failover"), (short) 0));
+records.add(new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("Bootstrap records"), 
(short) 0));
+}
+} else if (metadataVersion.isMetadataTransactionSupported()) {

Review Comment:
   seems like we could get rid of the "else" here and only have one place where 
we add the BeginTransactionRecord ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296512550


##
metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java:
##
@@ -212,10 +213,14 @@ long lastStableOffset() {
 }
 
 /**
- * @return the transaction start offset, or -1 if there is no transaction.
+ * @return the transaction start offset if present, or empty if there is 
no transaction.
  */
-long transactionStartOffset() {
-return transactionStartOffset;
+OptionalLong transactionStartOffset() {

Review Comment:
   thanks



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");
+ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK 
Migration"), (short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
+} else {
+log.warn("Starting ZK Migration without metadata transactions 
enabled. This is not safe since " +
+"a controller failover or processing error may lead to 
partially migrated metadata.");
+return CompletableFuture.completedFuture(null);
+}
 }
 
 @Override
 public CompletableFuture acceptBatch(List 
recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture completeMigration() {
 log.info("Completing ZK Migration");
-// TODO use KIP-868 transaction
-ControllerWriteEvent event = new 
ControllerWriteEvent<>("Complete ZK Migration",
-new MigrationWriteOperation(
-
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
-EnumSet.of(RUNS_IN_PREMIGRATION));
+List records = new ArrayList<>(2);
+records.add(ZkMigrationState.MIGRATION.toRecord());
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new 
EndTransactionRecord(), (short) 0));
+}
+ControllerWriteEvent event = new ControllerWriteEvent<>(
+"Complete ZK Migration",
+new MigrationWriteOperation(records),
+eventFlags);
 queue.append(event);
 return event.future.thenApply(__ -> highestMigrationRecordOffset);
 }
 
 @Override
-public void abortMigration() {
+public CompletableFuture abortMigration() {
 fatalFaultHandler.handleFault("Aborting the ZK migration");

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296172460


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");
+ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK 
Migration"), (short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
+} else {
+log.warn("Starting ZK Migration without metadata transactions 
enabled. This is not safe since " +
+"a controller failover or processing error may lead to 
partially migrated metadata.");
+return CompletableFuture.completedFuture(null);
+}
 }
 
 @Override
 public CompletableFuture acceptBatch(List 
recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture completeMigration() {
 log.info("Completing ZK Migration");

Review Comment:
   I think "Completing ZK Migration" is a misleading log message since the 
migration is not truly complete at this point.
   
   How about a message like "Completing initial ZooKeeper state loading process"



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");

Review Comment:
   How about a message like "Starting initial ZooKeeper state loading process" ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296200794


##
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java:
##
@@ -66,6 +119,10 @@ public LogDeltaManifest(
 this.numBytes = numBytes;
 }
 
+public static Builder newBuilder() {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r129629


##
metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java:
##
@@ -212,10 +213,14 @@ long lastStableOffset() {
 }
 
 /**
- * @return the transaction start offset, or -1 if there is no transaction.
+ * @return the transaction start offset if present, or empty if there is 
no transaction.
  */
-long transactionStartOffset() {
-return transactionStartOffset;
+OptionalLong transactionStartOffset() {

Review Comment:
   I’m not loving this idea of OffsetControlManager.transactionStartOffset 
returning an OptionalLong. Seems inconsistent with the other accessors.
   
   Maybe let’s just return a regular long? I had some vague efficiency related 
reasons for preferring the negative / non-negative convention (to be fair, 
haven’t benchmarked)



##
metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java:
##
@@ -212,10 +213,14 @@ long lastStableOffset() {
 }
 
 /**
- * @return the transaction start offset, or -1 if there is no transaction.
+ * @return the transaction start offset if present, or empty if there is 
no transaction.
  */
-long transactionStartOffset() {
-return transactionStartOffset;
+OptionalLong transactionStartOffset() {

Review Comment:
   I’m not loving this idea of OffsetControlManager.transactionStartOffset 
returning an OptionalLong. Seems inconsistent with the other accessors.
   
   Maybe let’s just return a regular long? I had some vague efficiency related 
reasons for preferring the negative / non-negative convention (to be fair, 
haven’t benchmarked)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296178891


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");
+ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK 
Migration"), (short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
+} else {
+log.warn("Starting ZK Migration without metadata transactions 
enabled. This is not safe since " +
+"a controller failover or processing error may lead to 
partially migrated metadata.");
+return CompletableFuture.completedFuture(null);
+}
 }
 
 @Override
 public CompletableFuture acceptBatch(List 
recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture completeMigration() {
 log.info("Completing ZK Migration");
-// TODO use KIP-868 transaction
-ControllerWriteEvent event = new 
ControllerWriteEvent<>("Complete ZK Migration",
-new MigrationWriteOperation(
-
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
-EnumSet.of(RUNS_IN_PREMIGRATION));
+List records = new ArrayList<>(2);
+records.add(ZkMigrationState.MIGRATION.toRecord());
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new 
EndTransactionRecord(), (short) 0));
+}
+ControllerWriteEvent event = new ControllerWriteEvent<>(
+"Complete ZK Migration",
+new MigrationWriteOperation(records),
+eventFlags);
 queue.append(event);
 return event.future.thenApply(__ -> highestMigrationRecordOffset);
 }
 
 @Override
-public void abortMigration() {
+public CompletableFuture abortMigration() {
 fatalFaultHandler.handleFault("Aborting the ZK migration");

Review Comment:
   If you abort the process here by calling `fatalFaultHandler.handleFault`, 
the code you have below to add an AbortTransaction will never be executed.
   
   One approach would be to just abort the process, and rely on the new 
controller that comes up to put an initial AbortTransaction into the log to 
clear the current pending transaction. We have to be able to handle that case 
anyway, since crashing during initial ZK state load is always possible.
   
   If you take this approach, then basically it means you can delete the code 
below to add an AbortTransaction. Maybe add a comment here saying that the new 
controller will abort the transaction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296178891


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");
+ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK 
Migration"), (short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
+} else {
+log.warn("Starting ZK Migration without metadata transactions 
enabled. This is not safe since " +
+"a controller failover or processing error may lead to 
partially migrated metadata.");
+return CompletableFuture.completedFuture(null);
+}
 }
 
 @Override
 public CompletableFuture acceptBatch(List 
recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture completeMigration() {
 log.info("Completing ZK Migration");
-// TODO use KIP-868 transaction
-ControllerWriteEvent event = new 
ControllerWriteEvent<>("Complete ZK Migration",
-new MigrationWriteOperation(
-
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
-EnumSet.of(RUNS_IN_PREMIGRATION));
+List records = new ArrayList<>(2);
+records.add(ZkMigrationState.MIGRATION.toRecord());
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new 
EndTransactionRecord(), (short) 0));
+}
+ControllerWriteEvent event = new ControllerWriteEvent<>(
+"Complete ZK Migration",
+new MigrationWriteOperation(records),
+eventFlags);
 queue.append(event);
 return event.future.thenApply(__ -> highestMigrationRecordOffset);
 }
 
 @Override
-public void abortMigration() {
+public CompletableFuture abortMigration() {
 fatalFaultHandler.handleFault("Aborting the ZK migration");

Review Comment:
   If you abort the process here by calling `fatalFaultHandler.handleFault`, 
the code you have below to add an AbortTransaction will never be executed.
   
   One approach would be to just abort the process, and rely on the new 
controller that comes up to put an initial AbortTransaction into the log to 
clear the current pending transaction. We have to be able to handle that case 
anyway, since crashing during initial ZK state load is always possible.
   
   If you take this approach, then basically it means you can delete the code 
below to add an AbortTransaction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296178891


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");
+ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK 
Migration"), (short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
+} else {
+log.warn("Starting ZK Migration without metadata transactions 
enabled. This is not safe since " +
+"a controller failover or processing error may lead to 
partially migrated metadata.");
+return CompletableFuture.completedFuture(null);
+}
 }
 
 @Override
 public CompletableFuture acceptBatch(List 
recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture completeMigration() {
 log.info("Completing ZK Migration");
-// TODO use KIP-868 transaction
-ControllerWriteEvent event = new 
ControllerWriteEvent<>("Complete ZK Migration",
-new MigrationWriteOperation(
-
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
-EnumSet.of(RUNS_IN_PREMIGRATION));
+List records = new ArrayList<>(2);
+records.add(ZkMigrationState.MIGRATION.toRecord());
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+records.add(new ApiMessageAndVersion(new 
EndTransactionRecord(), (short) 0));
+}
+ControllerWriteEvent event = new ControllerWriteEvent<>(
+"Complete ZK Migration",
+new MigrationWriteOperation(records),
+eventFlags);
 queue.append(event);
 return event.future.thenApply(__ -> highestMigrationRecordOffset);
 }
 
 @Override
-public void abortMigration() {
+public CompletableFuture abortMigration() {
 fatalFaultHandler.handleFault("Aborting the ZK migration");

Review Comment:
   If you abort the process here by calling `fatalFaultHandler.handleFault`, 
the code you have below to add an AbortTransaction will never be executed.
   
   One approach would be to just abort the process, and rely on the new 
controller that comes up to put an initial AbortTransaction into the log to 
clear the current pending transaction. We have to be able to handle that case 
anyway, since crashing during initial ZK state load is always possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296173796


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");

Review Comment:
   How about a message like "Starting initial ZooKeeper state rehosting 
process" ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296172460


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");
+ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK 
Migration"), (short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
+} else {
+log.warn("Starting ZK Migration without metadata transactions 
enabled. This is not safe since " +
+"a controller failover or processing error may lead to 
partially migrated metadata.");
+return CompletableFuture.completedFuture(null);
+}
 }
 
 @Override
 public CompletableFuture acceptBatch(List 
recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture completeMigration() {
 log.info("Completing ZK Migration");

Review Comment:
   I think "Completing ZK Migration" is a misleading log message since the 
migration is not truly complete at this point.
   
   How about a message like "Completing initial ZooKeeper state rehosting 
process"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296173796


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");

Review Comment:
   How about a message like "Starting initial ZooKeeper state lift and shift 
process" ?



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");
+ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK 
Migration"), (short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
+} else {
+log.warn("Starting ZK Migration without metadata transactions 
enabled. This is not safe since " +
+"a controller failover or processing error may lead to 
partially migrated metadata.");
+return CompletableFuture.completedFuture(null);
+}
 }
 
 @Override
 public CompletableFuture acceptBatch(List 
recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture completeMigration() {
 log.info("Completing ZK Migration");

Review Comment:
   I think "Completing ZK Migration" is a misleading log message since the 
migration is not truly complete at this point.
   
   How about a message like "Completing initial ZooKeeper state lift and shift 
process"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-16 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1296172460


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +915,73 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
-log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+public CompletableFuture beginMigration() {
+if 
(featureControl.metadataVersion().isMetadataTransactionSupported()) {
+log.info("Starting ZK Migration");
+ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK 
Migration"), (short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
+} else {
+log.warn("Starting ZK Migration without metadata transactions 
enabled. This is not safe since " +
+"a controller failover or processing error may lead to 
partially migrated metadata.");
+return CompletableFuture.completedFuture(null);
+}
 }
 
 @Override
 public CompletableFuture acceptBatch(List 
recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture completeMigration() {
 log.info("Completing ZK Migration");

Review Comment:
   I think "Completing ZK Migration" is a misleading log message since the 
migration is not truly complete at this point.
   
   How about a message like "Completing initial ZooKeeper state loading process"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077023


##
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java:
##
@@ -80,6 +137,7 @@ public LeaderAndEpoch leaderAndEpoch() {
 return leaderAndEpoch;
 }
 
+

Review Comment:
   whitespace not needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295076740


##
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java:
##
@@ -27,6 +27,59 @@
  * Contains information about a set of changes that were loaded from the 
metadata log.
  */
 public class LogDeltaManifest implements LoaderManifest {
+
+public static class Builder {
+private MetadataProvenance provenance;
+private LeaderAndEpoch leaderAndEpoch;
+private Integer numBatches;

Review Comment:
   seems a bit weird to use boxed primitives. It's quite inefficient in Java.
   
   If you want them to start with invalid values, just make them negative (that 
works for numBatches, elapsedNs, numBytes ... negative values for those don't 
make sense.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295091800


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Loads batches of metadata updates from Raft commits into MetadataDelta-s. 
Multiple batches from a commit
+ * are buffered into a MetadataDelta to achieve batching of records and reduce 
the number of times
+ * MetadataPublishers must be updated. This class also supports metadata 
transactions (KIP-866).
+ *
+ *
+ */
+public class MetadataBatchLoader {
+
+enum TransactionState {
+NO_TRANSACTION,
+STARTED_TRANSACTION,
+CONTINUED_TRANSACTION,
+ENDED_TRANSACTION,
+ABORTED_TRANSACTION;
+}
+
+@FunctionalInterface
+public interface MetadataUpdater {
+void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest 
manifest);
+}
+
+private final Logger log;
+private final Time time;
+private final FaultHandler faultHandler;
+private final Supplier leaderAndEpochSupplier;
+private final MetadataUpdater callback;
+
+private MetadataImage image;
+private MetadataDelta delta;
+private long lastOffset;
+private int lastEpoch;
+private long lastContainedLogTimeMs;
+private long numBytes;
+private int numBatches;
+private long totalBatchElapsedNs;
+private TransactionState transactionState;
+
+public MetadataBatchLoader(
+LogContext logContext,
+Time time,
+FaultHandler faultHandler,
+Supplier leaderAndEpochSupplier,
+MetadataUpdater callback
+) {
+this.log = logContext.logger(MetadataBatchLoader.class);
+this.time = time;
+this.faultHandler = faultHandler;
+this.leaderAndEpochSupplier = leaderAndEpochSupplier;
+this.callback = callback;
+}
+
+/**
+ * Reset the state of this batch loader to the given image. Any un-flushed 
state will be
+ * discarded.
+ *
+ * @param image Metadata image to reset this batch loader's state to.
+ */
+public void resetToImage(MetadataImage image) {
+this.image = image;
+this.delta = new MetadataDelta.Builder().setImage(image).build();
+this.transactionState = TransactionState.NO_TRANSACTION;
+this.lastOffset = image.provenance().lastContainedOffset();
+this.lastEpoch = image.provenance().lastContainedEpoch();
+this.lastContainedLogTimeMs = 
image.provenance().lastContainedLogTimeMs();
+this.numBytes = 0;
+this.numBatches = 0;
+this.totalBatchElapsedNs = 0;
+}
+
+/**
+ * Load a batch of records from the log. We have to do some bookkeeping 
here to
+ * translate between batch offsets and record offsets, and track the 
number of bytes we
+ * have read. Additionally, there is the chance that one of the records is 
a metadata
+ * version change which needs to be handled differently.
+ * 
+ * If this batch starts a transaction, any records preceding the 
transaction in this
+ * batch will be implicitly added to the transaction.
+ *
+ * @param batchThe reader which yields the batches.
+ * @return The time in nanoseconds that elapsed while loading this 
batch
+ */
+
+public long loadBatch(Batch batch) {
+long startNs = time.nanoseconds();
+int indexWithinBatch = 0;
+
+lastContainedLogTimeMs = batch

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295084331


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Loads batches of metadata updates from Raft commits into MetadataDelta-s. 
Multiple batches from a commit
+ * are buffered into a MetadataDelta to achieve batching of records and reduce 
the number of times
+ * MetadataPublishers must be updated. This class also supports metadata 
transactions (KIP-866).
+ *
+ *
+ */
+public class MetadataBatchLoader {
+
+enum TransactionState {
+NO_TRANSACTION,
+STARTED_TRANSACTION,
+CONTINUED_TRANSACTION,
+ENDED_TRANSACTION,
+ABORTED_TRANSACTION;
+}
+
+@FunctionalInterface
+public interface MetadataUpdater {
+void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest 
manifest);
+}
+
+private final Logger log;
+private final Time time;
+private final FaultHandler faultHandler;
+private final Supplier leaderAndEpochSupplier;
+private final MetadataUpdater callback;
+
+private MetadataImage image;
+private MetadataDelta delta;
+private long lastOffset;
+private int lastEpoch;
+private long lastContainedLogTimeMs;
+private long numBytes;
+private int numBatches;
+private long totalBatchElapsedNs;
+private TransactionState transactionState;
+
+public MetadataBatchLoader(
+LogContext logContext,
+Time time,
+FaultHandler faultHandler,
+Supplier leaderAndEpochSupplier,
+MetadataUpdater callback
+) {
+this.log = logContext.logger(MetadataBatchLoader.class);
+this.time = time;
+this.faultHandler = faultHandler;
+this.leaderAndEpochSupplier = leaderAndEpochSupplier;
+this.callback = callback;
+}
+
+/**
+ * Reset the state of this batch loader to the given image. Any un-flushed 
state will be
+ * discarded.
+ *
+ * @param image Metadata image to reset this batch loader's state to.
+ */
+public void resetToImage(MetadataImage image) {
+this.image = image;
+this.delta = new MetadataDelta.Builder().setImage(image).build();
+this.transactionState = TransactionState.NO_TRANSACTION;
+this.lastOffset = image.provenance().lastContainedOffset();
+this.lastEpoch = image.provenance().lastContainedEpoch();
+this.lastContainedLogTimeMs = 
image.provenance().lastContainedLogTimeMs();
+this.numBytes = 0;
+this.numBatches = 0;
+this.totalBatchElapsedNs = 0;
+}
+
+/**
+ * Load a batch of records from the log. We have to do some bookkeeping 
here to
+ * translate between batch offsets and record offsets, and track the 
number of bytes we
+ * have read. Additionally, there is the chance that one of the records is 
a metadata
+ * version change which needs to be handled differently.
+ * 
+ * If this batch starts a transaction, any records preceding the 
transaction in this
+ * batch will be implicitly added to the transaction.
+ *
+ * @param batchThe reader which yields the batches.
+ * @return The time in nanoseconds that elapsed while loading this 
batch
+ */
+
+public long loadBatch(Batch batch) {
+long startNs = time.nanoseconds();
+int indexWithinBatch = 0;
+
+lastContainedLogTimeMs = batch

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295090413


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##
@@ -183,6 +183,8 @@ public MetadataLoader build() {
  */
 private MetadataImage image;
 
+private MetadataBatchLoader batchLoader;

Review Comment:
   it seems like in the current implementation, this is `final` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295084521


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Loads batches of metadata updates from Raft commits into MetadataDelta-s. 
Multiple batches from a commit
+ * are buffered into a MetadataDelta to achieve batching of records and reduce 
the number of times
+ * MetadataPublishers must be updated. This class also supports metadata 
transactions (KIP-866).
+ *
+ *
+ */
+public class MetadataBatchLoader {
+
+enum TransactionState {
+NO_TRANSACTION,
+STARTED_TRANSACTION,
+CONTINUED_TRANSACTION,
+ENDED_TRANSACTION,
+ABORTED_TRANSACTION;
+}
+
+@FunctionalInterface
+public interface MetadataUpdater {
+void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest 
manifest);
+}
+
+private final Logger log;
+private final Time time;
+private final FaultHandler faultHandler;
+private final Supplier<LeaderAndEpoch> leaderAndEpochSupplier;
+private final MetadataUpdater callback;
+
+private MetadataImage image;
+private MetadataDelta delta;
+private long lastOffset;
+private int lastEpoch;
+private long lastContainedLogTimeMs;
+private long numBytes;
+private int numBatches;
+private long totalBatchElapsedNs;
+private TransactionState transactionState;
+
+public MetadataBatchLoader(
+LogContext logContext,
+Time time,
+FaultHandler faultHandler,
+Supplier<LeaderAndEpoch> leaderAndEpochSupplier,
+MetadataUpdater callback
+) {
+this.log = logContext.logger(MetadataBatchLoader.class);
+this.time = time;
+this.faultHandler = faultHandler;
+this.leaderAndEpochSupplier = leaderAndEpochSupplier;
+this.callback = callback;
+}
+
+/**
+ * Reset the state of this batch loader to the given image. Any un-flushed 
state will be
+ * discarded.
+ *
+ * @param image Metadata image to reset this batch loader's state to.
+ */
+public void resetToImage(MetadataImage image) {
+this.image = image;
+this.delta = new MetadataDelta.Builder().setImage(image).build();
+this.transactionState = TransactionState.NO_TRANSACTION;
+this.lastOffset = image.provenance().lastContainedOffset();
+this.lastEpoch = image.provenance().lastContainedEpoch();
+this.lastContainedLogTimeMs = 
image.provenance().lastContainedLogTimeMs();
+this.numBytes = 0;
+this.numBatches = 0;
+this.totalBatchElapsedNs = 0;
+}
+
+/**
+ * Load a batch of records from the log. We have to do some bookkeeping 
here to
+ * translate between batch offsets and record offsets, and track the 
number of bytes we
+ * have read. Additionally, there is the chance that one of the records is 
a metadata
+ * version change which needs to be handled differently.
+ * 
+ * If this batch starts a transaction, any records preceding the 
transaction in this
+ * batch will be implicitly added to the transaction.
+ *
+ * @param batch The reader which yields the batches.
+ * @return The time in nanoseconds that elapsed while loading this batch
+ */
+
+public long loadBatch(Batch batch) {
+long startNs = time.nanoseconds();
+int indexWithinBatch = 0;
+
+lastContainedLogTimeMs = batch

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077023


##
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java:
##
@@ -80,6 +137,7 @@ public LeaderAndEpoch leaderAndEpoch() {
 return leaderAndEpoch;
 }
 
+

Review Comment:
   whitespace change not needed?



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -786,6 +798,7 @@ public void complete(Throwable exception) {
 }
 }
 
+

Review Comment:
   looks like extra whitespace?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295081634


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Loads batches of metadata updates from Raft commits into MetadataDelta-s. 
Multiple batches from a commit
+ * are buffered into a MetadataDelta to achieve batching of records and reduce 
the number of times
+ * MetadataPublishers must be updated. This class also supports metadata 
transactions (KIP-866).
+ *
+ *
+ */
+public class MetadataBatchLoader {
+
+enum TransactionState {
+NO_TRANSACTION,
+STARTED_TRANSACTION,
+CONTINUED_TRANSACTION,
+ENDED_TRANSACTION,
+ABORTED_TRANSACTION;
+}
+
+@FunctionalInterface
+public interface MetadataUpdater {
+void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest 
manifest);
+}
+
+private final Logger log;
+private final Time time;
+private final FaultHandler faultHandler;
+private final Supplier<LeaderAndEpoch> leaderAndEpochSupplier;
+private final MetadataUpdater callback;
+
+private MetadataImage image;
+private MetadataDelta delta;
+private long lastOffset;
+private int lastEpoch;
+private long lastContainedLogTimeMs;
+private long numBytes;
+private int numBatches;
+private long totalBatchElapsedNs;
+private TransactionState transactionState;
+
+public MetadataBatchLoader(
+LogContext logContext,
+Time time,
+FaultHandler faultHandler,
+Supplier<LeaderAndEpoch> leaderAndEpochSupplier,
+MetadataUpdater callback
+) {
+this.log = logContext.logger(MetadataBatchLoader.class);
+this.time = time;
+this.faultHandler = faultHandler;
+this.leaderAndEpochSupplier = leaderAndEpochSupplier;
+this.callback = callback;
+}
+
+/**
+ * Reset the state of this batch loader to the given image. Any un-flushed 
state will be
+ * discarded.
+ *
+ * @param image Metadata image to reset this batch loader's state to.
+ */
+public void resetToImage(MetadataImage image) {
+this.image = image;
+this.delta = new MetadataDelta.Builder().setImage(image).build();
+this.transactionState = TransactionState.NO_TRANSACTION;
+this.lastOffset = image.provenance().lastContainedOffset();
+this.lastEpoch = image.provenance().lastContainedEpoch();
+this.lastContainedLogTimeMs = 
image.provenance().lastContainedLogTimeMs();
+this.numBytes = 0;
+this.numBatches = 0;
+this.totalBatchElapsedNs = 0;
+}
+
+/**
+ * Load a batch of records from the log. We have to do some bookkeeping 
here to
+ * translate between batch offsets and record offsets, and track the 
number of bytes we
+ * have read. Additionally, there is the chance that one of the records is 
a metadata
+ * version change which needs to be handled differently.
+ * 
+ * If this batch starts a transaction, any records preceding the 
transaction in this
+ * batch will be implicitly added to the transaction.
+ *
+ * @param batch The reader which yields the batches.
+ * @return The time in nanoseconds that elapsed while loading this batch
+ */
+
+public long loadBatch(Batch batch) {
+long startNs = time.nanoseconds();
+int indexWithinBatch = 0;
+
+lastContainedLogTimeMs = batch

[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077448


##
metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java:
##
@@ -66,6 +119,10 @@ public LogDeltaManifest(
 this.numBytes = numBytes;
 }
 
+public static Builder newBuilder() {

Review Comment:
   curious why this is better than just having a public Builder constructor (I 
don't feel strongly, I guess...)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295074841


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1200,6 +1235,16 @@ public static List 
generateActivationRecords(
 throw new RuntimeException("Should not have ZK migrations 
enabled on a cluster running metadata.version " + 
featureControl.metadataVersion());
 }
 }
+
+if (inTransaction) {
+if 
(!featureControl.metadataVersion().isMetadataTransactionSupported()) {
+throw new RuntimeException("Detected in-progress 
transaction, but the metadata.version " + featureControl.metadataVersion() +
+" does not support transactions. Cannot continue.");
+} else {
+log.warn("Detected in-progress transaction during 
controller activation. Aborting this transaction.");

Review Comment:
   I think we should include the start transaction offset in the log message 
(if you want, pass an OptionalLong to this function rather than a boolean?) 
That helps match up the beginning of the transaction with where it ends.
   
   Also should fill in that "reason" field, presumably with something like 
"controller failover"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295072484


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +909,62 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
+public CompletableFuture beginMigration() {
 log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK Migration"), 
(short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
 }
 
 @Override
 public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture completeMigration() {
 log.info("Completing ZK Migration");
-// TODO use KIP-868 transaction
-ControllerWriteEvent event = new 
ControllerWriteEvent<>("Complete ZK Migration",
+ControllerWriteEvent event = new ControllerWriteEvent<>(
+"Complete ZK Migration",
 new MigrationWriteOperation(
-
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
-EnumSet.of(RUNS_IN_PREMIGRATION));
+Arrays.asList(
+ZkMigrationState.MIGRATION.toRecord(),
+new ApiMessageAndVersion(
+new EndTransactionRecord(), (short) 0)
+)),
+eventFlags);
 queue.append(event);
 return event.future.thenApply(__ -> highestMigrationRecordOffset);
 }
 
 @Override
-public void abortMigration() {
+public CompletableFuture abortMigration() {
 fatalFaultHandler.handleFault("Aborting the ZK migration");
-// TODO use KIP-868 transaction
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(

Review Comment:
   check mv
   
   Set the "reason" field to "aborting ZK migration" ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295073210


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -954,11 +985,14 @@ public void 
handleCommit(BatchReader reader) {
 // so we don't need to do it here.
 log.debug("Completing purgatory items up to offset 
{} and epoch {}.", offset, epoch);
 
-// Complete any events in the purgatory that were 
waiting for this offset.
-deferredEventQueue.completeUpTo(offset);
+// Advance the committed and stable offsets then 
complete any pending purgatory
+// items that were waiting for these offsets.
+offsetControl.handleCommitBatch(batch);
+
deferredEventQueue.completeUpTo(offsetControl.lastStableOffset());
+
deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset());
 
 // The active controller can delete up to the 
current committed offset.
-snapshotRegistry.deleteSnapshotsUpTo(offset);
+
snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset());

Review Comment:
   I guess we need to call this one out as a bug fix.
   
   Good find.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295069992


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -623,7 +624,14 @@ enum ControllerOperationFlag {
  * even though the cluster really does have metadata. Very few 
operations should
  * use this flag.
  */
-RUNS_IN_PREMIGRATION
+RUNS_IN_PREMIGRATION,
+
+/**
+ * This flag signifies that an event will be completed even if it is 
part of an unfinished transaction.
+ * This is needed for metadata transactions so that external callers 
can add records to a transaction

Review Comment:
   Maybe mention ZK migration records as an example ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader

2023-08-15 Thread via GitHub


cmccabe commented on code in PR #14208:
URL: https://github.com/apache/kafka/pull/14208#discussion_r1295071752


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -892,48 +909,62 @@ class MigrationWriteOperation implements 
ControllerWriteOperation {
 }
 @Override
 public ControllerResult generateRecordsAndResult() {
-return ControllerResult.atomicOf(batch, null);
+return ControllerResult.of(batch, null);
 }
 
 public void processBatchEndOffset(long offset) {
 highestMigrationRecordOffset = new OffsetAndEpoch(offset, 
curClaimEpoch);
 }
 }
 @Override
-public void beginMigration() {
+public CompletableFuture beginMigration() {
 log.info("Starting ZK Migration");
-// TODO use KIP-868 transaction
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"Begin ZK Migration Transaction",
+new MigrationWriteOperation(Collections.singletonList(
+new ApiMessageAndVersion(
+new BeginTransactionRecord().setName("ZK Migration"), 
(short) 0))
+), eventFlags);
+queue.append(batchEvent);
+return batchEvent.future;
 }
 
 @Override
 public CompletableFuture acceptBatch(List 
recordBatch) {
-if (queue.size() > 100) { // TODO configure this
-CompletableFuture future = new CompletableFuture<>();
-future.completeExceptionally(new 
NotControllerException("Cannot accept migration record batch. Controller queue 
is too large"));
-return future;
-}
-ControllerWriteEvent batchEvent = new 
ControllerWriteEvent<>("ZK Migration Batch",
-new MigrationWriteOperation(recordBatch), 
EnumSet.of(RUNS_IN_PREMIGRATION));
+ControllerWriteEvent batchEvent = new ControllerWriteEvent<>(
+"ZK Migration Batch",
+new MigrationWriteOperation(recordBatch), eventFlags);
 queue.append(batchEvent);
 return batchEvent.future;
 }
 
 @Override
 public CompletableFuture completeMigration() {
 log.info("Completing ZK Migration");
-// TODO use KIP-868 transaction
-ControllerWriteEvent event = new 
ControllerWriteEvent<>("Complete ZK Migration",
+ControllerWriteEvent event = new ControllerWriteEvent<>(
+"Complete ZK Migration",
 new MigrationWriteOperation(
-
Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())),
-EnumSet.of(RUNS_IN_PREMIGRATION));
+Arrays.asList(

Review Comment:
   This is another case where we have to check the MV I guess



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org