This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5bbc421a13e MINOR: update TransactionLog#readTxnRecordValue to
initialize TransactionMetadata with non-empty topic partitions (#20370)
5bbc421a13e is described below
commit 5bbc421a13ee159bbff1cfabd012988061fc0c47
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Aug 26 10:36:45 2025 +0800
MINOR: update TransactionLog#readTxnRecordValue to initialize
TransactionMetadata with non-empty topic partitions (#20370)
This is a follow-up PR for https://github.com/apache/kafka/pull/19699.
* Update TransactionLog#readTxnRecordValue to initialize
TransactionMetadata with non-empty topic partitions
* Update the `TxnTransitMetadata` comment, because the record is not immutable (its topicPartitions field is mutable).
Reviewers: TengYao Chi <[email protected]>, Justine Olshan
<[email protected]>, Kuan-Po Tseng <[email protected]>, Chia-Ping
Tsai <[email protected]>
---
.../coordinator/transaction/TransactionLog.scala | 23 ++++++++++------------
.../transaction/TransactionMetadata.java | 5 +++++
.../transaction/TxnTransitMetadata.java | 2 +-
3 files changed, 16 insertions(+), 14 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index 75baa98da15..a3e9eacb66f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -115,7 +115,13 @@ object TransactionLog {
val version = buffer.getShort
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version
<= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
val value = new TransactionLogValue(new ByteBufferAccessor(buffer),
version)
- val transactionMetadata = new TransactionMetadata(
+ val state = TransactionState.fromId(value.transactionStatus)
+ val tps: util.Set[TopicPartition] = new util.HashSet[TopicPartition]()
+ if (!state.equals(TransactionState.EMPTY))
+ value.transactionPartitions.forEach(partitionsSchema => {
+ partitionsSchema.partitionIds.forEach(partitionId => tps.add(new
TopicPartition(partitionsSchema.topic, partitionId.intValue())))
+ })
+ Some(new TransactionMetadata(
transactionalId,
value.producerId,
value.previousProducerId,
@@ -123,20 +129,11 @@ object TransactionLog {
value.producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH,
value.transactionTimeoutMs,
- TransactionState.fromId(value.transactionStatus),
- util.Set.of(),
+ state,
+ tps,
value.transactionStartTimestampMs,
value.transactionLastUpdateTimestampMs,
- TransactionVersion.fromFeatureLevel(value.clientTransactionVersion))
-
- if (!transactionMetadata.state.equals(TransactionState.EMPTY))
- value.transactionPartitions.forEach(partitionsSchema => {
- transactionMetadata.addPartitions(partitionsSchema.partitionIds
- .stream
- .map(partitionId => new TopicPartition(partitionsSchema.topic,
partitionId.intValue()))
- .toList)
- })
- Some(transactionMetadata)
+ TransactionVersion.fromFeatureLevel(value.clientTransactionVersion)))
} else throw new IllegalStateException(s"Unknown version $version from
the transaction log message value")
}
}
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
index 96a92dd01c9..8efeedc3ec4 100644
---
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
@@ -117,6 +117,7 @@ public class TransactionMetadata {
}
}
+ // VisibleForTesting
public void addPartitions(Collection<TopicPartition> partitions) {
topicPartitions.addAll(partitions);
}
@@ -500,6 +501,7 @@ public class TransactionMetadata {
return transactionalId;
}
+ // VisibleForTesting
public void setProducerId(long producerId) {
this.producerId = producerId;
}
@@ -507,6 +509,7 @@ public class TransactionMetadata {
return producerId;
}
+ // VisibleForTesting
public void setPrevProducerId(long prevProducerId) {
this.prevProducerId = prevProducerId;
}
@@ -534,6 +537,7 @@ public class TransactionMetadata {
return txnTimeoutMs;
}
+ // VisibleForTesting
public void state(TransactionState state) {
this.state = state;
}
@@ -550,6 +554,7 @@ public class TransactionMetadata {
return txnStartTimestamp;
}
+ // VisibleForTesting
public void txnLastUpdateTimestamp(long txnLastUpdateTimestamp) {
this.txnLastUpdateTimestamp = txnLastUpdateTimestamp;
}
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java
index 452c168687e..b2e78d45da3 100644
---
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java
@@ -22,7 +22,7 @@ import org.apache.kafka.server.common.TransactionVersion;
import java.util.HashSet;
/**
- * Immutable object representing the target transition of the transaction
metadata
+ * Represent the target transition of the transaction metadata. The
topicPartitions field is mutable.
*/
public record TxnTransitMetadata(
long producerId,