chia7712 commented on code in PR #17562: URL: https://github.com/apache/kafka/pull/17562#discussion_r1817933302
########## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/ProducerIdManager.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.NodeToControllerChannelManager; + +import java.util.function.Supplier; + +/** + * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way + * such that the same producerId will not be assigned twice across multiple transaction coordinators. + * <p> + * ProducerIds are managed by the controller. When requesting a new range of IDs, we are guaranteed to receive + * a unique block. + */ +public interface ProducerIdManager { + + Long generateProducerId() throws Exception; Review Comment: `Long` -> `long` ########## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ########## @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.coordinator.transaction - -import kafka.coordinator.transaction.ProducerIdManager.{IterationLimit, NoRetry, RetryBackoffMs} -import kafka.utils.Logging -import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode} -import org.apache.kafka.clients.ClientResponse -import org.apache.kafka.common.KafkaException -import org.apache.kafka.common.message.AllocateProducerIdsRequestData -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{AllocateProducerIdsRequest, AllocateProducerIdsResponse} -import org.apache.kafka.common.utils.Time -import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} -import org.apache.kafka.server.common.ProducerIdsBlock - -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} -import scala.jdk.OptionConverters.RichOptional -import scala.util.{Failure, Success, Try} - -/** - * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way - * such that the same producerId will not be assigned twice across multiple transaction coordinators. - * - * ProducerIds are managed by the controller. When requesting a new range of IDs, we are guaranteed to receive - * a unique block. 
- */ - -object ProducerIdManager { - // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block - val PidPrefetchThreshold: Double = 0.90 - val IterationLimit: Int = 3 - val RetryBackoffMs: Int = 50 - val NoRetry: Long = -1L - - // Creates a ProducerIdGenerate that directly interfaces with ZooKeeper, IBP < 3.0-IV0 - def zk(brokerId: Int, zkClient: KafkaZkClient): ZkProducerIdManager = { - new ZkProducerIdManager(brokerId, zkClient) - } - - // Creates a ProducerIdGenerate that uses AllocateProducerIds RPC, IBP >= 3.0-IV0 - def rpc(brokerId: Int, - time: Time, - brokerEpochSupplier: () => Long, - controllerChannel: NodeToControllerChannelManager): RPCProducerIdManager = { - - new RPCProducerIdManager(brokerId, time, brokerEpochSupplier, controllerChannel) - } -} - -trait ProducerIdManager { - def generateProducerId(): Try[Long] - def shutdown() : Unit = {} - - // For testing purposes - def hasValidBlock: Boolean -} - -object ZkProducerIdManager { - def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = { - // Get or create the existing PID block from ZK and attempt to update it. 
We retry in a loop here since other - // brokers may be generating PID blocks during a rolling upgrade - var zkWriteComplete = false - while (!zkWriteComplete) { - // refresh current producerId block from zookeeper again - val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path) - - // generate the new producerId block - val newProducerIdBlock = dataOpt match { - case Some(data) => - val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data) - logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion") - - if (currProducerIdBlock.lastProducerId > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) { - // we have exhausted all producerIds (wow!), treat it as a fatal error - logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.lastProducerId})") - throw new KafkaException("Have exhausted all producerIds.") - } - - new ProducerIdsBlock(brokerId, currProducerIdBlock.nextBlockFirstId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) - case None => - logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block") - new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) - } - - val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock) - - // try to write the new producerId block into zookeeper - val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None) - zkWriteComplete = succeeded - - if (zkWriteComplete) { - logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version") - return newProducerIdBlock - } - } - throw new IllegalStateException() - } -} - -class ZkProducerIdManager(brokerId: Int, zkClient: KafkaZkClient) extends ProducerIdManager with Logging { - - 
this.logIdent = "[ZK ProducerId Manager " + brokerId + "]: " - - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = _ - - // grab the first block of producerIds - this synchronized { - allocateNewProducerIdBlock() - nextProducerId = currentProducerIdBlock.firstProducerId - } - - private def allocateNewProducerIdBlock(): Unit = { - this synchronized { - currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this) - } - } - - def generateProducerId(): Try[Long] = { - this synchronized { - // grab a new block of producerIds if this block has been exhausted - if (nextProducerId > currentProducerIdBlock.lastProducerId) { - try { - allocateNewProducerIdBlock() - } catch { - case t: Throwable => - return Failure(t) - } - nextProducerId = currentProducerIdBlock.firstProducerId - } - nextProducerId += 1 - Success(nextProducerId - 1) - } - } - - override def hasValidBlock: Boolean = { - this synchronized { - !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) - } - } -} - -/** - * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests Review Comment: Could you please keep this comment? ########## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AllocateProducerIdsRequest; +import org.apache.kafka.common.requests.AllocateProducerIdsResponse; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.ControllerRequestCompletionHandler; +import org.apache.kafka.server.common.NodeToControllerChannelManager; +import org.apache.kafka.server.common.ProducerIdsBlock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public class RPCProducerIdManager implements ProducerIdManager { + + public static final int RETRY_BACKOFF_MS = 50; + // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block + protected static final double PID_PREFETCH_THRESHOLD = 0.90; Review Comment: private ########## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AllocateProducerIdsRequest; +import org.apache.kafka.common.requests.AllocateProducerIdsResponse; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.ControllerRequestCompletionHandler; +import org.apache.kafka.server.common.NodeToControllerChannelManager; +import org.apache.kafka.server.common.ProducerIdsBlock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public class RPCProducerIdManager implements ProducerIdManager { + + public static final int RETRY_BACKOFF_MS = 50; + // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block + protected static final double PID_PREFETCH_THRESHOLD = 0.90; + protected static final int ITERATION_LIMIT = 3; + protected static 
final long NO_RETRY = -1L; + + private static final Logger log = LoggerFactory.getLogger(RPCProducerIdManager.class); + private final String logPrefix; + + private final int brokerId; + private final Time time; + private final Supplier<Long> brokerEpochSupplier; + private final NodeToControllerChannelManager controllerChannel; + + // Visible for testing + final AtomicReference<ProducerIdsBlock> nextProducerIdBlock = new AtomicReference<>(null); + private final AtomicReference<ProducerIdsBlock> currentProducerIdBlock = new AtomicReference<>(ProducerIdsBlock.EMPTY); + private final AtomicBoolean requestInFlight = new AtomicBoolean(false); + private final AtomicLong backoffDeadlineMs = new AtomicLong(NO_RETRY); + + public RPCProducerIdManager(int brokerId, + Time time, + Supplier<Long> brokerEpochSupplier, + NodeToControllerChannelManager controllerChannel + ) { + this.brokerId = brokerId; + this.time = time; + this.brokerEpochSupplier = brokerEpochSupplier; + this.controllerChannel = controllerChannel; + this.logPrefix = "[RPC ProducerId Manager " + brokerId + "]: "; + } + + + @Override + public Long generateProducerId() { + var iteration = 0; + while (iteration <= ITERATION_LIMIT) { + var claimNextId = currentProducerIdBlock.get().claimNextId(); + if (claimNextId.isPresent()) { + var nextProducerId = claimNextId.get(); + // Check if we need to prefetch the next block + var prefetchTarget = currentProducerIdBlock.get().firstProducerId() + + (long) (currentProducerIdBlock.get().size() * PID_PREFETCH_THRESHOLD); + if (nextProducerId == prefetchTarget) { + maybePrefetchNextBlock(); + } + return nextProducerId; + } else { + // Check the next block if current block is full + var block = nextProducerIdBlock.getAndSet(null); + if (block == null) { + // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal + // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS. 
+ maybeRequestNextBlock(); + throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"); + } else { + currentProducerIdBlock.set(block); + requestInFlight.set(false); + iteration++; + } + } + } + throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"); + } + + @Override + public void shutdown() { + + } + + private void maybePrefetchNextBlock() { + var retryTimestamp = backoffDeadlineMs.get(); + if (retryTimestamp == NO_RETRY || time.milliseconds() >= retryTimestamp) { + // Send a request only if we reached the retry deadline, or if no deadline was set. + if (nextProducerIdBlock.get() == null && + requestInFlight.compareAndSet(false, true)) { + backoffDeadlineMs.set(NO_RETRY); + sendRequest(); + // Reset backoff after a successful send. + backoffDeadlineMs.set(NO_RETRY); + } + } + } + + private void maybeRequestNextBlock() { + var retryTimestamp = backoffDeadlineMs.get(); + if (retryTimestamp == NO_RETRY || time.milliseconds() >= retryTimestamp) { + // Send a request only if we reached the retry deadline, or if no deadline was set. + if (nextProducerIdBlock.get() == null && + requestInFlight.compareAndSet(false, true)) { + sendRequest(); + // Reset backoff after a successful send. 
+ backoffDeadlineMs.set(NO_RETRY); + } + } + } + + protected void sendRequest() { + var message = new AllocateProducerIdsRequestData() + .setBrokerEpoch(brokerEpochSupplier.get()) + .setBrokerId(brokerId); + var request = new AllocateProducerIdsRequest.Builder(message); + controllerChannel.sendRequest(request, new ControllerRequestCompletionHandler() { + + @Override + public void onComplete(ClientResponse response) { + if (response.responseBody() instanceof AllocateProducerIdsResponse) { + handleAllocateProducerIdsResponse((AllocateProducerIdsResponse) response.responseBody()); + } + } + + @Override + public void onTimeout() { + handleTimeout(); Review Comment: Could you please inline `handleTimeout`? ########## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.transaction; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AllocateProducerIdsRequest; +import org.apache.kafka.common.requests.AllocateProducerIdsResponse; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.ControllerRequestCompletionHandler; +import org.apache.kafka.server.common.NodeToControllerChannelManager; +import org.apache.kafka.server.common.ProducerIdsBlock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public class RPCProducerIdManager implements ProducerIdManager { + + public static final int RETRY_BACKOFF_MS = 50; + // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block + protected static final double PID_PREFETCH_THRESHOLD = 0.90; + protected static final int ITERATION_LIMIT = 3; Review Comment: private ########## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AllocateProducerIdsRequest; +import org.apache.kafka.common.requests.AllocateProducerIdsResponse; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.ControllerRequestCompletionHandler; +import org.apache.kafka.server.common.NodeToControllerChannelManager; +import org.apache.kafka.server.common.ProducerIdsBlock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public class RPCProducerIdManager implements ProducerIdManager { + + public static final int RETRY_BACKOFF_MS = 50; Review Comment: package-private ########## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AllocateProducerIdsRequest; +import org.apache.kafka.common.requests.AllocateProducerIdsResponse; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.ControllerRequestCompletionHandler; +import org.apache.kafka.server.common.NodeToControllerChannelManager; +import org.apache.kafka.server.common.ProducerIdsBlock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public class RPCProducerIdManager implements ProducerIdManager { + + public static final int RETRY_BACKOFF_MS = 50; + // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block + protected static final double PID_PREFETCH_THRESHOLD = 0.90; + protected static final int ITERATION_LIMIT = 3; + protected static final long NO_RETRY = -1L; Review Comment: private ########## 
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AllocateProducerIdsRequest; +import org.apache.kafka.common.requests.AllocateProducerIdsResponse; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.ControllerRequestCompletionHandler; +import org.apache.kafka.server.common.NodeToControllerChannelManager; +import org.apache.kafka.server.common.ProducerIdsBlock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public class RPCProducerIdManager implements ProducerIdManager { + + public static final int RETRY_BACKOFF_MS = 50; 
+ // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block + protected static final double PID_PREFETCH_THRESHOLD = 0.90; + protected static final int ITERATION_LIMIT = 3; + protected static final long NO_RETRY = -1L; + + private static final Logger log = LoggerFactory.getLogger(RPCProducerIdManager.class); + private final String logPrefix; + + private final int brokerId; + private final Time time; + private final Supplier<Long> brokerEpochSupplier; + private final NodeToControllerChannelManager controllerChannel; + + // Visible for testing + final AtomicReference<ProducerIdsBlock> nextProducerIdBlock = new AtomicReference<>(null); + private final AtomicReference<ProducerIdsBlock> currentProducerIdBlock = new AtomicReference<>(ProducerIdsBlock.EMPTY); + private final AtomicBoolean requestInFlight = new AtomicBoolean(false); + private final AtomicLong backoffDeadlineMs = new AtomicLong(NO_RETRY); + + public RPCProducerIdManager(int brokerId, + Time time, + Supplier<Long> brokerEpochSupplier, + NodeToControllerChannelManager controllerChannel + ) { + this.brokerId = brokerId; + this.time = time; + this.brokerEpochSupplier = brokerEpochSupplier; + this.controllerChannel = controllerChannel; + this.logPrefix = "[RPC ProducerId Manager " + brokerId + "]: "; + } + + + @Override + public Long generateProducerId() { + var iteration = 0; + while (iteration <= ITERATION_LIMIT) { + var claimNextId = currentProducerIdBlock.get().claimNextId(); + if (claimNextId.isPresent()) { + var nextProducerId = claimNextId.get(); + // Check if we need to prefetch the next block + var prefetchTarget = currentProducerIdBlock.get().firstProducerId() + + (long) (currentProducerIdBlock.get().size() * PID_PREFETCH_THRESHOLD); + if (nextProducerId == prefetchTarget) { + maybePrefetchNextBlock(); + } + return nextProducerId; + } else { + // Check the next block if current block is full + var block = nextProducerIdBlock.getAndSet(null); + if (block 
== null) { + // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal + // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS. + maybeRequestNextBlock(); + throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"); + } else { + currentProducerIdBlock.set(block); + requestInFlight.set(false); + iteration++; + } + } + } + throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"); + } + + @Override + public void shutdown() { + + } + + private void maybePrefetchNextBlock() { Review Comment: Why do we need this method? It is almost the same as `maybeRequestNextBlock` (the only difference is the extra `backoffDeadlineMs.set(NO_RETRY)` call before `sendRequest()`). ########## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ########## @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package kafka.coordinator.transaction - -import kafka.coordinator.transaction.ProducerIdManager.{IterationLimit, NoRetry, RetryBackoffMs} -import kafka.utils.Logging -import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode} -import org.apache.kafka.clients.ClientResponse -import org.apache.kafka.common.KafkaException -import org.apache.kafka.common.message.AllocateProducerIdsRequestData -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{AllocateProducerIdsRequest, AllocateProducerIdsResponse} -import org.apache.kafka.common.utils.Time -import org.apache.kafka.server.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} -import org.apache.kafka.server.common.ProducerIdsBlock - -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} -import scala.jdk.OptionConverters.RichOptional -import scala.util.{Failure, Success, Try} - -/** - * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way - * such that the same producerId will not be assigned twice across multiple transaction coordinators. - * - * ProducerIds are managed by the controller. When requesting a new range of IDs, we are guaranteed to receive - * a unique block. 
- */ - -object ProducerIdManager { - // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block - val PidPrefetchThreshold: Double = 0.90 - val IterationLimit: Int = 3 - val RetryBackoffMs: Int = 50 - val NoRetry: Long = -1L - - // Creates a ProducerIdGenerate that directly interfaces with ZooKeeper, IBP < 3.0-IV0 - def zk(brokerId: Int, zkClient: KafkaZkClient): ZkProducerIdManager = { - new ZkProducerIdManager(brokerId, zkClient) - } - - // Creates a ProducerIdGenerate that uses AllocateProducerIds RPC, IBP >= 3.0-IV0 - def rpc(brokerId: Int, - time: Time, - brokerEpochSupplier: () => Long, - controllerChannel: NodeToControllerChannelManager): RPCProducerIdManager = { - - new RPCProducerIdManager(brokerId, time, brokerEpochSupplier, controllerChannel) - } -} - -trait ProducerIdManager { - def generateProducerId(): Try[Long] - def shutdown() : Unit = {} - - // For testing purposes - def hasValidBlock: Boolean -} - -object ZkProducerIdManager { - def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = { - // Get or create the existing PID block from ZK and attempt to update it. 
We retry in a loop here since other - // brokers may be generating PID blocks during a rolling upgrade - var zkWriteComplete = false - while (!zkWriteComplete) { - // refresh current producerId block from zookeeper again - val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path) - - // generate the new producerId block - val newProducerIdBlock = dataOpt match { - case Some(data) => - val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data) - logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion") - - if (currProducerIdBlock.lastProducerId > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) { - // we have exhausted all producerIds (wow!), treat it as a fatal error - logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.lastProducerId})") - throw new KafkaException("Have exhausted all producerIds.") - } - - new ProducerIdsBlock(brokerId, currProducerIdBlock.nextBlockFirstId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) - case None => - logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block") - new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) - } - - val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock) - - // try to write the new producerId block into zookeeper - val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None) - zkWriteComplete = succeeded - - if (zkWriteComplete) { - logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version") - return newProducerIdBlock - } - } - throw new IllegalStateException() - } -} - -class ZkProducerIdManager(brokerId: Int, zkClient: KafkaZkClient) extends ProducerIdManager with Logging { - - 
this.logIdent = "[ZK ProducerId Manager " + brokerId + "]: " - - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = _ - - // grab the first block of producerIds - this synchronized { - allocateNewProducerIdBlock() - nextProducerId = currentProducerIdBlock.firstProducerId - } - - private def allocateNewProducerIdBlock(): Unit = { - this synchronized { - currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this) - } - } - - def generateProducerId(): Try[Long] = { - this synchronized { - // grab a new block of producerIds if this block has been exhausted - if (nextProducerId > currentProducerIdBlock.lastProducerId) { - try { - allocateNewProducerIdBlock() - } catch { - case t: Throwable => - return Failure(t) - } - nextProducerId = currentProducerIdBlock.firstProducerId - } - nextProducerId += 1 - Success(nextProducerId - 1) - } - } - - override def hasValidBlock: Boolean = { - this synchronized { - !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) - } - } -} - -/** - * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests - * for producers to retry if it does not have an available producer id and is waiting on a new block. 
- */ -class RPCProducerIdManager(brokerId: Int, - time: Time, - brokerEpochSupplier: () => Long, - controllerChannel: NodeToControllerChannelManager) extends ProducerIdManager with Logging { - - this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - - // Visible for testing - private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) - private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) - private val requestInFlight = new AtomicBoolean(false) - private val backoffDeadlineMs = new AtomicLong(NoRetry) - - override def hasValidBlock: Boolean = { - nextProducerIdBlock.get != null - } - - override def generateProducerId(): Try[Long] = { - var result: Try[Long] = null - var iteration = 0 - while (result == null) { - currentProducerIdBlock.get.claimNextId().toScala match { - case None => - // Check the next block if current block is full - val block = nextProducerIdBlock.getAndSet(null) - if (block == null) { - // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal - // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS. - maybeRequestNextBlock() - result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block")) - } else { - currentProducerIdBlock.set(block) - requestInFlight.set(false) - iteration = iteration + 1 - } - - case Some(nextProducerId) => - // Check if we need to prefetch the next block - val prefetchTarget = currentProducerIdBlock.get.firstProducerId + (currentProducerIdBlock.get.size * ProducerIdManager.PidPrefetchThreshold).toLong - if (nextProducerId == prefetchTarget) { - maybeRequestNextBlock() - } - result = Success(nextProducerId) - } - if (iteration == IterationLimit) { - result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. 
Waiting for next block")) - } - } - result - } - - - private def maybeRequestNextBlock(): Unit = { - val retryTimestamp = backoffDeadlineMs.get() - if (retryTimestamp == NoRetry || time.milliseconds() >= retryTimestamp) { - // Send a request only if we reached the retry deadline, or if no deadline was set. - - if (nextProducerIdBlock.get == null && - requestInFlight.compareAndSet(false, true)) { - - sendRequest() - // Reset backoff after a successful send. - backoffDeadlineMs.set(NoRetry) - } - } - } - - // Visible for testing - private[transaction] def sendRequest(): Unit = { - val message = new AllocateProducerIdsRequestData() - .setBrokerEpoch(brokerEpochSupplier.apply()) - .setBrokerId(brokerId) - - val request = new AllocateProducerIdsRequest.Builder(message) - debug("Requesting next Producer ID block") Review Comment: Why is this debug message being removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
