[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1298865134 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { +// No in-flight transaction +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} + +// If no records have been replayed, we need to write out the bootstrap records. 
+// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", +bootstrapMetadata.records().size(), +metadataVersion, +bootstrapMetadata.source()); +records.addAll(bootstrapMetadata.records()); + +if (metadataVersion.isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed " + +"until the ZK metadata has been migrated"); +records.add(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +records.add(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} + +if (metadataVersion.isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +return ControllerResult.of(records, null); +} else { +return ControllerResult.atomicOf(records, null); +} +} + +static ControllerResult recordsForNonEmptyLog( +Logger log, +
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1297536422 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { +// No in-flight transaction +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} + +// If no records have been replayed, we need to write out the bootstrap records. 
+// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", +bootstrapMetadata.records().size(), +metadataVersion, +bootstrapMetadata.source()); +records.addAll(bootstrapMetadata.records()); + +if (metadataVersion.isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed " + +"until the ZK metadata has been migrated"); +records.add(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +records.add(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} + +if (metadataVersion.isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +return ControllerResult.of(records, null); +} else { +return ControllerResult.atomicOf(records, null); +} +} + +static ControllerResult recordsForNonEmptyLog( +Logger log, +
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1297536422 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { +// No in-flight transaction +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} + +// If no records have been replayed, we need to write out the bootstrap records. 
+// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", +bootstrapMetadata.records().size(), +metadataVersion, +bootstrapMetadata.source()); +records.addAll(bootstrapMetadata.records()); + +if (metadataVersion.isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed " + +"until the ZK metadata has been migrated"); +records.add(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +records.add(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} + +if (metadataVersion.isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +return ControllerResult.of(records, null); +} else { +return ControllerResult.atomicOf(records, null); +} +} + +static ControllerResult recordsForNonEmptyLog( +Logger log, +
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1297533375 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { +// No in-flight transaction +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} + +// If no records have been replayed, we need to write out the bootstrap records. 
+// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", +bootstrapMetadata.records().size(), +metadataVersion, +bootstrapMetadata.source()); +records.addAll(bootstrapMetadata.records()); + +if (metadataVersion.isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed " + +"until the ZK metadata has been migrated"); +records.add(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +records.add(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} + +if (metadataVersion.isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +return ControllerResult.of(records, null); +} else { +return ControllerResult.atomicOf(records, null); +} +} + +static ControllerResult recordsForNonEmptyLog( +Logger log, +
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1297531045 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { +// No in-flight transaction +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} + +// If no records have been replayed, we need to write out the bootstrap records. 
+// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", +bootstrapMetadata.records().size(), +metadataVersion, +bootstrapMetadata.source()); +records.addAll(bootstrapMetadata.records()); + +if (metadataVersion.isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed " + +"until the ZK metadata has been migrated"); +records.add(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +records.add(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} + +if (metadataVersion.isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +return ControllerResult.of(records, null); +} else { +return ControllerResult.atomicOf(records, null); +} +} + +static ControllerResult recordsForNonEmptyLog( +Logger log, +
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1297529651 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { +// No in-flight transaction +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} + +// If no records have been replayed, we need to write out the bootstrap records. 
+// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", +bootstrapMetadata.records().size(), +metadataVersion, +bootstrapMetadata.source()); +records.addAll(bootstrapMetadata.records()); + +if (metadataVersion.isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed " + +"until the ZK metadata has been migrated"); +records.add(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +records.add(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} + +if (metadataVersion.isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +return ControllerResult.of(records, null); +} else { +return ControllerResult.atomicOf(records, null); +} +} + +static ControllerResult recordsForNonEmptyLog( +Logger log, +
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1297529002 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { +// No in-flight transaction +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} + +// If no records have been replayed, we need to write out the bootstrap records. 
+// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", +bootstrapMetadata.records().size(), +metadataVersion, +bootstrapMetadata.source()); +records.addAll(bootstrapMetadata.records()); + +if (metadataVersion.isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed " + +"until the ZK metadata has been migrated"); +records.add(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +records.add(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} + +if (metadataVersion.isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +return ControllerResult.of(records, null); +} else { +return ControllerResult.atomicOf(records, null); +} +} + +static ControllerResult recordsForNonEmptyLog( +Logger log, +
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1297526521 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { +// No in-flight transaction +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} + +// If no records have been replayed, we need to write out the bootstrap records. 
+// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + +"at metadata.version {} from {}.", +bootstrapMetadata.records().size(), +metadataVersion, +bootstrapMetadata.source()); +records.addAll(bootstrapMetadata.records()); + +if (metadataVersion.isMigrationSupported()) { +if (zkMigrationEnabled) { +log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed " + +"until the ZK metadata has been migrated"); +records.add(ZkMigrationState.PRE_MIGRATION.toRecord()); +} else { +log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster."); +records.add(ZkMigrationState.NONE.toRecord()); +} +} else { +if (zkMigrationEnabled) { +throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + +" does not support ZK migrations. Cannot continue with ZK migrations enabled."); +} +} + +if (metadataVersion.isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +return ControllerResult.of(records, null); +} else { +return ControllerResult.atomicOf(records, null); +} +} + +static ControllerResult recordsForNonEmptyLog( +Logger log, +
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1297525659 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { +// No in-flight transaction +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} + +// If no records have been replayed, we need to write out the bootstrap records. 
+// This will include the new metadata.version, as well as things like SCRAM +// initialization, etc. +log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) " + Review Comment: I think it would be good to consolidate these log messages into one message that says: - whether we had to abort a bootstrap transaction - whether we're de-novo or migrating - MV - # of bootstrap records and I guess that log message can be WARN. It is quite important, after all ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.End
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1297521425 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { Review Comment: seems like we could get rid of the "else" here and only have one place where we add the BeginTransactionRecord ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1297521425 ## metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.AbortTransactionRecord; +import org.apache.kafka.common.metadata.BeginTransactionRecord; +import org.apache.kafka.common.metadata.EndTransactionRecord; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.migration.ZkMigrationState; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + +public class ActivationRecordsGenerator { + +static ControllerResult recordsForEmptyLog( +Logger log, +long transactionStartOffset, +boolean zkMigrationEnabled, +BootstrapMetadata bootstrapMetadata, +MetadataVersion metadataVersion +) { +List records = new ArrayList<>(); + +if (transactionStartOffset != -1L) { +// In-flight bootstrap transaction +if (!metadataVersion.isMetadataTransactionSupported()) { +throw new RuntimeException("Detected partial bootstrap records transaction at " + +transactionStartOffset + ", but the metadata.version " + metadataVersion + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected partial bootstrap records transaction at " + transactionStartOffset + +" during controller activation. Aborting this transaction and re-committing bootstrap records"); +records.add(new ApiMessageAndVersion( +new AbortTransactionRecord().setReason("Controller failover"), (short) 0)); +records.add(new ApiMessageAndVersion( +new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); +} +} else if (metadataVersion.isMetadataTransactionSupported()) { Review Comment: seems like we could get rid of the "else" here and only have one place where we add the BeginTransactionRecord ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296512550 ## metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java: ## @@ -212,10 +213,14 @@ long lastStableOffset() { } /** - * @return the transaction start offset, or -1 if there is no transaction. + * @return the transaction start offset if present, or empty if there is no transaction. */ -long transactionStartOffset() { -return transactionStartOffset; +OptionalLong transactionStartOffset() { Review Comment: thanks ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; +} else { +log.warn("Starting ZK Migration without metadata transactions enabled. 
This is not safe since " + +"a controller failover or processing error may lead to partially migrated metadata."); +return CompletableFuture.completedFuture(null); +} } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); -// TODO use KIP-868 transaction -ControllerWriteEvent event = new ControllerWriteEvent<>("Complete ZK Migration", -new MigrationWriteOperation( - Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())), -EnumSet.of(RUNS_IN_PREMIGRATION)); +List records = new ArrayList<>(2); +records.add(ZkMigrationState.MIGRATION.toRecord()); +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +} +ControllerWriteEvent event = new ControllerWriteEvent<>( +"Complete ZK Migration", +new MigrationWriteOperation(records), +eventFlags); queue.append(event); return event.future.thenApply(__ -> highestMigrationRecordOffset); } @Override -public void abortMigration() { +public CompletableFuture abortMigration() { fatalFaultHandler.handleFault("Aborting the ZK migration"); Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296172460 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; +} else { +log.warn("Starting ZK Migration without metadata transactions enabled. This is not safe since " + +"a controller failover or processing error may lead to partially migrated metadata."); +return CompletableFuture.completedFuture(null); +} } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. 
Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); Review Comment: I think "Completing ZK Migration" is a misleading log message since the migration is not truly complete at this point. How about a message like "Completing initial ZooKeeper state loading process" ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); Review Comment: How about a message like "Starting initial ZooKeeper state loading process" ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296200794 ## metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java: ## @@ -66,6 +119,10 @@ public LogDeltaManifest( this.numBytes = numBytes; } +public static Builder newBuilder() { Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r129629 ## metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java: ## @@ -212,10 +213,14 @@ long lastStableOffset() { } /** - * @return the transaction start offset, or -1 if there is no transaction. + * @return the transaction start offset if present, or empty if there is no transaction. */ -long transactionStartOffset() { -return transactionStartOffset; +OptionalLong transactionStartOffset() { Review Comment: I’m not loving this idea of OffsetControlManager.transactionStartOffset returning an OptionalLong. Seems inconsistent with the other accessors. Maybe let’s just return a regular long? I had some vague efficiency-related reasons for preferring the negative / non-negative convention (to be fair, haven’t benchmarked) ## metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java: ## @@ -212,10 +213,14 @@ long lastStableOffset() { } /** - * @return the transaction start offset, or -1 if there is no transaction. + * @return the transaction start offset if present, or empty if there is no transaction. */ -long transactionStartOffset() { -return transactionStartOffset; +OptionalLong transactionStartOffset() { Review Comment: I’m not loving this idea of OffsetControlManager.transactionStartOffset returning an OptionalLong. Seems inconsistent with the other accessors. Maybe let’s just return a regular long? I had some vague efficiency-related reasons for preferring the negative / non-negative convention (to be fair, haven’t benchmarked) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296178891 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; +} else { +log.warn("Starting ZK Migration without metadata transactions enabled. This is not safe since " + +"a controller failover or processing error may lead to partially migrated metadata."); +return CompletableFuture.completedFuture(null); +} } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. 
Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); -// TODO use KIP-868 transaction -ControllerWriteEvent event = new ControllerWriteEvent<>("Complete ZK Migration", -new MigrationWriteOperation( - Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())), -EnumSet.of(RUNS_IN_PREMIGRATION)); +List records = new ArrayList<>(2); +records.add(ZkMigrationState.MIGRATION.toRecord()); +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +} +ControllerWriteEvent event = new ControllerWriteEvent<>( +"Complete ZK Migration", +new MigrationWriteOperation(records), +eventFlags); queue.append(event); return event.future.thenApply(__ -> highestMigrationRecordOffset); } @Override -public void abortMigration() { +public CompletableFuture abortMigration() { fatalFaultHandler.handleFault("Aborting the ZK migration"); Review Comment: If you abort the process here by calling `fatalFaultHandler.handleFault`, the code you have below to add an AbortTransaction will never be executed. One approach would be to just abort the process, and rely on the new controller that comes up to put an initial AbortTransaction into the log to clear the current pending transaction. We have to be able to handle that case anyway, since crashing during initial ZK state load is always possible. If you take this approach, then basically it means you can delete the code below to add an AbortTransaction. 
Maybe add a comment here saying that the new controller will abort the transaction. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296178891 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; +} else { +log.warn("Starting ZK Migration without metadata transactions enabled. This is not safe since " + +"a controller failover or processing error may lead to partially migrated metadata."); +return CompletableFuture.completedFuture(null); +} } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. 
Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); -// TODO use KIP-868 transaction -ControllerWriteEvent event = new ControllerWriteEvent<>("Complete ZK Migration", -new MigrationWriteOperation( - Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())), -EnumSet.of(RUNS_IN_PREMIGRATION)); +List records = new ArrayList<>(2); +records.add(ZkMigrationState.MIGRATION.toRecord()); +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +} +ControllerWriteEvent event = new ControllerWriteEvent<>( +"Complete ZK Migration", +new MigrationWriteOperation(records), +eventFlags); queue.append(event); return event.future.thenApply(__ -> highestMigrationRecordOffset); } @Override -public void abortMigration() { +public CompletableFuture abortMigration() { fatalFaultHandler.handleFault("Aborting the ZK migration"); Review Comment: If you abort the process here by calling `fatalFaultHandler.handleFault`, the code you have below to add an AbortTransaction will never be executed. One approach would be to just abort the process, and rely on the new controller that comes up to put an initial AbortTransaction into the log to clear the current pending transaction. We have to be able to handle that case anyway, since crashing during initial ZK state load is always possible. If you take this approach, then basically it means you can delete the code below to add an AbortTransaction. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296178891 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; +} else { +log.warn("Starting ZK Migration without metadata transactions enabled. This is not safe since " + +"a controller failover or processing error may lead to partially migrated metadata."); +return CompletableFuture.completedFuture(null); +} } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. 
Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); -// TODO use KIP-868 transaction -ControllerWriteEvent event = new ControllerWriteEvent<>("Complete ZK Migration", -new MigrationWriteOperation( - Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())), -EnumSet.of(RUNS_IN_PREMIGRATION)); +List records = new ArrayList<>(2); +records.add(ZkMigrationState.MIGRATION.toRecord()); +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); +} +ControllerWriteEvent event = new ControllerWriteEvent<>( +"Complete ZK Migration", +new MigrationWriteOperation(records), +eventFlags); queue.append(event); return event.future.thenApply(__ -> highestMigrationRecordOffset); } @Override -public void abortMigration() { +public CompletableFuture abortMigration() { fatalFaultHandler.handleFault("Aborting the ZK migration"); Review Comment: If you abort the process here by calling `fatalFaultHandler.handleFault`, the code you have below to add an AbortTransaction will never be executed. One approach would be to just abort the process, and rely on the new controller that comes up to put an initial AbortTransaction into the log to clear the current pending transaction. We have to be able to handle that case anyway, since crashing during initial ZK state load is always possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296173796 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); Review Comment: How about a message like "Starting initial ZooKeeper state rehosting process"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296172460 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; +} else { +log.warn("Starting ZK Migration without metadata transactions enabled. This is not safe since " + +"a controller failover or processing error may lead to partially migrated metadata."); +return CompletableFuture.completedFuture(null); +} } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. 
Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); Review Comment: I think "Completing ZK Migration" is a misleading log message since the migration is not truly complete at this point. How about a message like "Completing initial ZooKeeper state rehosting process" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296173796 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); Review Comment: How about a message like "Starting initial ZooKeeper state lift and shift process" ? ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; +} else { 
+log.warn("Starting ZK Migration without metadata transactions enabled. This is not safe since " + +"a controller failover or processing error may lead to partially migrated metadata."); +return CompletableFuture.completedFuture(null); +} } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); Review Comment: I think "Completing ZK Migration" is a misleading log message since the migration is not truly complete at this point. How about a message like "Completing initial ZooKeeper state lift and shift process" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1296172460 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +915,73 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { -log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +public CompletableFuture beginMigration() { +if (featureControl.metadataVersion().isMetadataTransactionSupported()) { +log.info("Starting ZK Migration"); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; +} else { +log.warn("Starting ZK Migration without metadata transactions enabled. This is not safe since " + +"a controller failover or processing error may lead to partially migrated metadata."); +return CompletableFuture.completedFuture(null); +} } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. 
Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); Review Comment: I think "Completing ZK Migration" is a misleading log message since the migration is not truly complete at this point. How about a message like "Completing initial ZooKeeper state loading process" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077023 ## metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java: ## @@ -80,6 +137,7 @@ public LeaderAndEpoch leaderAndEpoch() { return leaderAndEpoch; } + Review Comment: whitespace not needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295076740 ## metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java: ## @@ -27,6 +27,59 @@ * Contains information about a set of changes that were loaded from the metadata log. */ public class LogDeltaManifest implements LoaderManifest { + +public static class Builder { +private MetadataProvenance provenance; +private LeaderAndEpoch leaderAndEpoch; +private Integer numBatches; Review Comment: It seems a bit weird to use boxed primitives here; they are quite inefficient in Java. If you want these fields to start with invalid values, just make them negative (that works for numBatches, elapsedNs, and numBytes, since negative values don't make sense for those). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295091800 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader; + +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.fault.FaultHandler; +import org.slf4j.Logger; + +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Loads batches of metadata updates from Raft commits into MetadataDelta-s. Multiple batches from a commit + * are buffered into a MetadataDelta to achieve batching of records and reduce the number of times + * MetadataPublishers must be updated. 
This class also supports metadata transactions (KIP-866). + * + * + */ +public class MetadataBatchLoader { + +enum TransactionState { +NO_TRANSACTION, +STARTED_TRANSACTION, +CONTINUED_TRANSACTION, +ENDED_TRANSACTION, +ABORTED_TRANSACTION; +} + +@FunctionalInterface +public interface MetadataUpdater { +void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest manifest); +} + +private final Logger log; +private final Time time; +private final FaultHandler faultHandler; +private final Supplier leaderAndEpochSupplier; +private final MetadataUpdater callback; + +private MetadataImage image; +private MetadataDelta delta; +private long lastOffset; +private int lastEpoch; +private long lastContainedLogTimeMs; +private long numBytes; +private int numBatches; +private long totalBatchElapsedNs; +private TransactionState transactionState; + +public MetadataBatchLoader( +LogContext logContext, +Time time, +FaultHandler faultHandler, +Supplier leaderAndEpochSupplier, +MetadataUpdater callback +) { +this.log = logContext.logger(MetadataBatchLoader.class); +this.time = time; +this.faultHandler = faultHandler; +this.leaderAndEpochSupplier = leaderAndEpochSupplier; +this.callback = callback; +} + +/** + * Reset the state of this batch loader to the given image. Any un-flushed state will be + * discarded. + * + * @param image Metadata image to reset this batch loader's state to. + */ +public void resetToImage(MetadataImage image) { +this.image = image; +this.delta = new MetadataDelta.Builder().setImage(image).build(); +this.transactionState = TransactionState.NO_TRANSACTION; +this.lastOffset = image.provenance().lastContainedOffset(); +this.lastEpoch = image.provenance().lastContainedEpoch(); +this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs(); +this.numBytes = 0; +this.numBatches = 0; +this.totalBatchElapsedNs = 0; +} + +/** + * Load a batch of records from the log. 
We have to do some bookkeeping here to + * translate between batch offsets and record offsets, and track the number of bytes we + * have read. Additionally, there is the chance that one of the records is a metadata + * version change which needs to be handled differently. + * + * If this batch starts a transaction, any records preceding the transaction in this + * batch will be implicitly added to the transaction. + * + * @param batch The reader which yields the batches. + * @return The time in nanoseconds that elapsed while loading this batch + */ + +public long loadBatch(Batch batch) { +long startNs = time.nanoseconds(); +int indexWithinBatch = 0; + +lastContainedLogTimeMs = batch
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295084331 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader; + +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.fault.FaultHandler; +import org.slf4j.Logger; + +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Loads batches of metadata updates from Raft commits into MetadataDelta-s. Multiple batches from a commit + * are buffered into a MetadataDelta to achieve batching of records and reduce the number of times + * MetadataPublishers must be updated. 
This class also supports metadata transactions (KIP-866). + * + * + */ +public class MetadataBatchLoader { + +enum TransactionState { +NO_TRANSACTION, +STARTED_TRANSACTION, +CONTINUED_TRANSACTION, +ENDED_TRANSACTION, +ABORTED_TRANSACTION; +} + +@FunctionalInterface +public interface MetadataUpdater { +void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest manifest); +} + +private final Logger log; +private final Time time; +private final FaultHandler faultHandler; +private final Supplier leaderAndEpochSupplier; +private final MetadataUpdater callback; + +private MetadataImage image; +private MetadataDelta delta; +private long lastOffset; +private int lastEpoch; +private long lastContainedLogTimeMs; +private long numBytes; +private int numBatches; +private long totalBatchElapsedNs; +private TransactionState transactionState; + +public MetadataBatchLoader( +LogContext logContext, +Time time, +FaultHandler faultHandler, +Supplier leaderAndEpochSupplier, +MetadataUpdater callback +) { +this.log = logContext.logger(MetadataBatchLoader.class); +this.time = time; +this.faultHandler = faultHandler; +this.leaderAndEpochSupplier = leaderAndEpochSupplier; +this.callback = callback; +} + +/** + * Reset the state of this batch loader to the given image. Any un-flushed state will be + * discarded. + * + * @param image Metadata image to reset this batch loader's state to. + */ +public void resetToImage(MetadataImage image) { +this.image = image; +this.delta = new MetadataDelta.Builder().setImage(image).build(); +this.transactionState = TransactionState.NO_TRANSACTION; +this.lastOffset = image.provenance().lastContainedOffset(); +this.lastEpoch = image.provenance().lastContainedEpoch(); +this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs(); +this.numBytes = 0; +this.numBatches = 0; +this.totalBatchElapsedNs = 0; +} + +/** + * Load a batch of records from the log. 
We have to do some bookkeeping here to + * translate between batch offsets and record offsets, and track the number of bytes we + * have read. Additionally, there is the chance that one of the records is a metadata + * version change which needs to be handled differently. + * + * If this batch starts a transaction, any records preceding the transaction in this + * batch will be implicitly added to the transaction. + * + * @param batch The reader which yields the batches. + * @return The time in nanoseconds that elapsed while loading this batch + */ + +public long loadBatch(Batch batch) { +long startNs = time.nanoseconds(); +int indexWithinBatch = 0; + +lastContainedLogTimeMs = batch
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295090413 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java: ## @@ -183,6 +183,8 @@ public MetadataLoader build() { */ private MetadataImage image; +private MetadataBatchLoader batchLoader; Review Comment: It seems like, in the current implementation, this field could be declared `final`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295084521 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader; + +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.fault.FaultHandler; +import org.slf4j.Logger; + +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Loads batches of metadata updates from Raft commits into MetadataDelta-s. Multiple batches from a commit + * are buffered into a MetadataDelta to achieve batching of records and reduce the number of times + * MetadataPublishers must be updated. 
This class also supports metadata transactions (KIP-866). + * + * + */ +public class MetadataBatchLoader { + +enum TransactionState { +NO_TRANSACTION, +STARTED_TRANSACTION, +CONTINUED_TRANSACTION, +ENDED_TRANSACTION, +ABORTED_TRANSACTION; +} + +@FunctionalInterface +public interface MetadataUpdater { +void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest manifest); +} + +private final Logger log; +private final Time time; +private final FaultHandler faultHandler; +private final Supplier leaderAndEpochSupplier; +private final MetadataUpdater callback; + +private MetadataImage image; +private MetadataDelta delta; +private long lastOffset; +private int lastEpoch; +private long lastContainedLogTimeMs; +private long numBytes; +private int numBatches; +private long totalBatchElapsedNs; +private TransactionState transactionState; + +public MetadataBatchLoader( +LogContext logContext, +Time time, +FaultHandler faultHandler, +Supplier leaderAndEpochSupplier, +MetadataUpdater callback +) { +this.log = logContext.logger(MetadataBatchLoader.class); +this.time = time; +this.faultHandler = faultHandler; +this.leaderAndEpochSupplier = leaderAndEpochSupplier; +this.callback = callback; +} + +/** + * Reset the state of this batch loader to the given image. Any un-flushed state will be + * discarded. + * + * @param image Metadata image to reset this batch loader's state to. + */ +public void resetToImage(MetadataImage image) { +this.image = image; +this.delta = new MetadataDelta.Builder().setImage(image).build(); +this.transactionState = TransactionState.NO_TRANSACTION; +this.lastOffset = image.provenance().lastContainedOffset(); +this.lastEpoch = image.provenance().lastContainedEpoch(); +this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs(); +this.numBytes = 0; +this.numBatches = 0; +this.totalBatchElapsedNs = 0; +} + +/** + * Load a batch of records from the log. 
We have to do some bookkeeping here to + * translate between batch offsets and record offsets, and track the number of bytes we + * have read. Additionally, there is the chance that one of the records is a metadata + * version change which needs to be handled differently. + * + * If this batch starts a transaction, any records preceding the transaction in this + * batch will be implicitly added to the transaction. + * + * @param batch The reader which yields the batches. + * @return The time in nanoseconds that elapsed while loading this batch + */ + +public long loadBatch(Batch batch) { +long startNs = time.nanoseconds(); +int indexWithinBatch = 0; + +lastContainedLogTimeMs = batch
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077023 ## metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java: ## @@ -80,6 +137,7 @@ public LeaderAndEpoch leaderAndEpoch() { return leaderAndEpoch; } + Review Comment: whitespace change not needed? ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -786,6 +798,7 @@ public void complete(Throwable exception) { } } + Review Comment: looks like extra whitespace? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295081634 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader; + +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.fault.FaultHandler; +import org.slf4j.Logger; + +import java.util.function.Supplier; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * Loads batches of metadata updates from Raft commits into MetadataDelta-s. Multiple batches from a commit + * are buffered into a MetadataDelta to achieve batching of records and reduce the number of times + * MetadataPublishers must be updated. 
This class also supports metadata transactions (KIP-868). + * + * + */ +public class MetadataBatchLoader { + +enum TransactionState { +NO_TRANSACTION, +STARTED_TRANSACTION, +CONTINUED_TRANSACTION, +ENDED_TRANSACTION, +ABORTED_TRANSACTION; +} + +@FunctionalInterface +public interface MetadataUpdater { +void update(MetadataDelta delta, MetadataImage image, LogDeltaManifest manifest); +} + +private final Logger log; +private final Time time; +private final FaultHandler faultHandler; +private final Supplier leaderAndEpochSupplier; +private final MetadataUpdater callback; + +private MetadataImage image; +private MetadataDelta delta; +private long lastOffset; +private int lastEpoch; +private long lastContainedLogTimeMs; +private long numBytes; +private int numBatches; +private long totalBatchElapsedNs; +private TransactionState transactionState; + +public MetadataBatchLoader( +LogContext logContext, +Time time, +FaultHandler faultHandler, +Supplier leaderAndEpochSupplier, +MetadataUpdater callback +) { +this.log = logContext.logger(MetadataBatchLoader.class); +this.time = time; +this.faultHandler = faultHandler; +this.leaderAndEpochSupplier = leaderAndEpochSupplier; +this.callback = callback; +} + +/** + * Reset the state of this batch loader to the given image. Any un-flushed state will be + * discarded. + * + * @param image Metadata image to reset this batch loader's state to. + */ +public void resetToImage(MetadataImage image) { +this.image = image; +this.delta = new MetadataDelta.Builder().setImage(image).build(); +this.transactionState = TransactionState.NO_TRANSACTION; +this.lastOffset = image.provenance().lastContainedOffset(); +this.lastEpoch = image.provenance().lastContainedEpoch(); +this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs(); +this.numBytes = 0; +this.numBatches = 0; +this.totalBatchElapsedNs = 0; +} + +/** + * Load a batch of records from the log. 
We have to do some bookkeeping here to + * translate between batch offsets and record offsets, and track the number of bytes we + * have read. Additionally, there is the chance that one of the records is a metadata + * version change which needs to be handled differently. + * + * If this batch starts a transaction, any records preceding the transaction in this + * batch will be implicitly added to the transaction. + * + * @param batch The reader which yields the batches. + * @return The time in nanoseconds that elapsed while loading this batch + */ + +public long loadBatch(Batch batch) { +long startNs = time.nanoseconds(); +int indexWithinBatch = 0; + +lastContainedLogTimeMs = batch
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295077448 ## metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java: ## @@ -66,6 +119,10 @@ public LogDeltaManifest( this.numBytes = numBytes; } +public static Builder newBuilder() { Review Comment: curious why this is better than just having a public Builder constructor (I don't feel strongly, I guess...) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295074841 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1200,6 +1235,16 @@ public static List generateActivationRecords( throw new RuntimeException("Should not have ZK migrations enabled on a cluster running metadata.version " + featureControl.metadataVersion()); } } + +if (inTransaction) { +if (!featureControl.metadataVersion().isMetadataTransactionSupported()) { +throw new RuntimeException("Detected in-progress transaction, but the metadata.version " + featureControl.metadataVersion() + +" does not support transactions. Cannot continue."); +} else { +log.warn("Detected in-progress transaction during controller activation. Aborting this transaction."); Review Comment: I think we should include the start transaction offset in the log message (if you want, pass an OptionalLong to this function rather than a boolean?) That helps match up the beginning of the transaction with where it ends. Also should fill in that "reason" field, presumably with something like "controller failover" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295072484 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +909,62 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { +public CompletableFuture beginMigration() { log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. 
Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); -// TODO use KIP-868 transaction -ControllerWriteEvent event = new ControllerWriteEvent<>("Complete ZK Migration", +ControllerWriteEvent event = new ControllerWriteEvent<>( +"Complete ZK Migration", new MigrationWriteOperation( - Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())), -EnumSet.of(RUNS_IN_PREMIGRATION)); +Arrays.asList( +ZkMigrationState.MIGRATION.toRecord(), +new ApiMessageAndVersion( +new EndTransactionRecord(), (short) 0) +)), +eventFlags); queue.append(event); return event.future.thenApply(__ -> highestMigrationRecordOffset); } @Override -public void abortMigration() { +public CompletableFuture abortMigration() { fatalFaultHandler.handleFault("Aborting the ZK migration"); -// TODO use KIP-868 transaction +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( Review Comment: Check the MV (metadata.version) here. Also, set the "reason" field to "aborting ZK migration"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295073210 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -954,11 +985,14 @@ public void handleCommit(BatchReader reader) { // so we don't need to do it here. log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch); -// Complete any events in the purgatory that were waiting for this offset. -deferredEventQueue.completeUpTo(offset); +// Advance the committed and stable offsets then complete any pending purgatory +// items that were waiting for these offsets. +offsetControl.handleCommitBatch(batch); + deferredEventQueue.completeUpTo(offsetControl.lastStableOffset()); + deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset()); // The active controller can delete up to the current committed offset. -snapshotRegistry.deleteSnapshotsUpTo(offset); + snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset()); Review Comment: I guess we need to call this one out as a bug fix. Good find. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295069992 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -623,7 +624,14 @@ enum ControllerOperationFlag { * even though the cluster really does have metadata. Very few operations should * use this flag. */ -RUNS_IN_PREMIGRATION +RUNS_IN_PREMIGRATION, + +/** + * This flag signifies that an event will be completed even if it is part of an unfinished transaction. + * This is needed for metadata transactions so that external callers can add records to a transaction Review Comment: Maybe mention ZK migration records as an example ... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #14208: KAFKA-14538 Metadata transactions in MetadataLoader
cmccabe commented on code in PR #14208: URL: https://github.com/apache/kafka/pull/14208#discussion_r1295071752 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -892,48 +909,62 @@ class MigrationWriteOperation implements ControllerWriteOperation { } @Override public ControllerResult generateRecordsAndResult() { -return ControllerResult.atomicOf(batch, null); +return ControllerResult.of(batch, null); } public void processBatchEndOffset(long offset) { highestMigrationRecordOffset = new OffsetAndEpoch(offset, curClaimEpoch); } } @Override -public void beginMigration() { +public CompletableFuture beginMigration() { log.info("Starting ZK Migration"); -// TODO use KIP-868 transaction +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"Begin ZK Migration Transaction", +new MigrationWriteOperation(Collections.singletonList( +new ApiMessageAndVersion( +new BeginTransactionRecord().setName("ZK Migration"), (short) 0)) +), eventFlags); +queue.append(batchEvent); +return batchEvent.future; } @Override public CompletableFuture acceptBatch(List recordBatch) { -if (queue.size() > 100) { // TODO configure this -CompletableFuture future = new CompletableFuture<>(); -future.completeExceptionally(new NotControllerException("Cannot accept migration record batch. 
Controller queue is too large")); -return future; -} -ControllerWriteEvent batchEvent = new ControllerWriteEvent<>("ZK Migration Batch", -new MigrationWriteOperation(recordBatch), EnumSet.of(RUNS_IN_PREMIGRATION)); +ControllerWriteEvent batchEvent = new ControllerWriteEvent<>( +"ZK Migration Batch", +new MigrationWriteOperation(recordBatch), eventFlags); queue.append(batchEvent); return batchEvent.future; } @Override public CompletableFuture completeMigration() { log.info("Completing ZK Migration"); -// TODO use KIP-868 transaction -ControllerWriteEvent event = new ControllerWriteEvent<>("Complete ZK Migration", +ControllerWriteEvent event = new ControllerWriteEvent<>( +"Complete ZK Migration", new MigrationWriteOperation( - Collections.singletonList(ZkMigrationState.MIGRATION.toRecord())), -EnumSet.of(RUNS_IN_PREMIGRATION)); +Arrays.asList( Review Comment: This is another case where we have to check the MV I guess -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org