This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 50598191dcd MINOR: Add tests on TxnOffsetCommit and EndTxnMarker
protection against invalid producer epoch when TV2 is used (#20024)
50598191dcd is described below
commit 50598191dcd4763384ef1798fc07db899f731f3d
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Sun Jul 20 18:34:29 2025 -0400
MINOR: Add tests on TxnOffsetCommit and EndTxnMarker protection against
invalid producer epoch when TV2 is used (#20024)
This patch adds API-level integration tests for producer epoch
verification when processing transactional offset commits and end txn
markers.
Reviewers: PoAn Yang <[email protected]>, TengYao Chi
<[email protected]>, Sean Quah <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../server/GroupCoordinatorBaseRequestTest.scala | 56 ++++++-
.../kafka/server/TxnOffsetCommitRequestTest.scala | 141 +++++++++++++----
.../kafka/server/WriteTxnMarkersRequestTest.scala | 172 +++++++++++++++++++++
3 files changed, 338 insertions(+), 31 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 67e4cb8df53..431e431504f 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -19,14 +19,16 @@ package kafka.server
import kafka.network.SocketServer
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord,
RecordMetadata}
+import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.{TopicCollection, TopicIdPartition,
TopicPartition, Uuid}
import
org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult,
DeletableGroupResultCollection}
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData,
AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData,
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData,
DeleteGroupsResponseData, DescribeGroupsRequestData,
DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData,
HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData,
JoinGroupResponseData, LeaveGroupRes [...]
+import
org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker,
WritableTxnMarkerTopic}
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData,
AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData,
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData,
DeleteGroupsResponseData, DescribeGroupsRequestData,
DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData,
HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData,
JoinGroupResponseData, LeaveGroupRes [...]
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest,
ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest,
ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse,
DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse,
HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest,
InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, L [...]
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest,
ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest,
ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse,
DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse,
HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest,
InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, L [...]
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
@@ -352,6 +354,35 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
assertEquals(expectedError,
connectAndReceive[EndTxnResponse](request).error)
}
+ protected def writeTxnMarkers(
+ producerId: Long,
+ producerEpoch: Short,
+ committed: Boolean,
+ expectedError: Errors = Errors.NONE,
+ version: Short =
ApiKeys.WRITE_TXN_MARKERS.latestVersion(isUnstableApiEnabled)
+ ): Unit = {
+ val request = new WriteTxnMarkersRequest.Builder(
+ new WriteTxnMarkersRequestData()
+ .setMarkers(List(
+ new WritableTxnMarker()
+ .setProducerId(producerId)
+ .setProducerEpoch(producerEpoch)
+ .setTransactionResult(committed)
+ .setTopics(List(
+ new WritableTxnMarkerTopic()
+ .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+ .setPartitionIndexes(List[Integer](0).asJava)
+ ).asJava)
+ .setCoordinatorEpoch(0)
+ ).asJava)
+ ).build(version)
+
+ assertEquals(
+ expectedError.code,
+
connectAndReceive[WriteTxnMarkersResponse](request).data.markers.get(0).topics.get(0).partitions.get(0).errorCode
+ )
+ }
+
protected def fetchOffsets(
groups: List[OffsetFetchRequestData.OffsetFetchRequestGroup],
requireStable: Boolean,
@@ -422,6 +453,27 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
groupResponse
}
+ protected def fetchOffset(
+ groupId: String,
+ topic: String,
+ partition: Int
+ ): Long = {
+ val groupIdRecord = fetchOffsets(
+ group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(groupId)
+ .setTopics(List(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName(topic)
+ .setPartitionIndexes(List[Integer](partition).asJava)
+ ).asJava),
+ requireStable = true,
+ version = 9
+ )
+ val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head
+ val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex
== partition).head
+ partitionRecord.committedOffset
+ }
+
protected def deleteOffset(
groupId: String,
topic: String,
diff --git
a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
index b0f1bee2333..aef40390d85 100644
--- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
@@ -16,19 +16,16 @@
*/
package kafka.server
-import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.UnsupportedVersionException
-import org.apache.kafka.common.message.OffsetFetchRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.JoinGroupRequest
+import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest}
import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
-
-import scala.jdk.CollectionConverters._
+import org.junit.jupiter.api.Assertions.{assertNotEquals, assertThrows}
@ClusterTestDefaults(
types = Array(Type.KRAFT),
@@ -51,6 +48,16 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
testTxnOffsetCommit(false)
}
+ @ClusterTest
+ def
testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithNewConsumerGroupProtocol():
Unit = {
+ testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(true)
+ }
+
+ @ClusterTest
+ def
testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithOldConsumerGroupProtocol():
Unit = {
+ testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(false)
+ }
+
private def testTxnOffsetCommit(useNewProtocol: Boolean): Unit = {
val topic = "topic"
val partition = 0
@@ -65,8 +72,8 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
// Join the consumer group. Note that we don't heartbeat here so we must
use
// a session long enough for the duration of the test.
val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId,
useNewProtocol)
- assertTrue(memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID)
- assertTrue(memberEpoch != JoinGroupRequest.UNKNOWN_GENERATION_ID)
+ assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
+ assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
createTopic(topic, 1)
@@ -178,7 +185,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
transactionalId = transactionalId
)
- val originalOffset = fetchOffset(topic, partition, groupId)
+ val originalOffset = fetchOffset(groupId, topic, partition)
commitTxnOffset(
groupId = groupId,
@@ -207,31 +214,107 @@ class
TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
TestUtils.waitUntilTrue(() =>
try {
- fetchOffset(topic, partition, groupId) == expectedOffset
+ fetchOffset(groupId, topic, partition) == expectedOffset
} catch {
case _: Throwable => false
}, "txn commit offset validation failed"
)
}
- private def fetchOffset(
- topic: String,
- partition: Int,
- groupId: String
- ): Long = {
- val groupIdRecord = fetchOffsets(
- group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
- .setGroupId(groupId)
- .setTopics(List(
- new OffsetFetchRequestData.OffsetFetchRequestTopics()
- .setName(topic)
- .setPartitionIndexes(List[Integer](partition).asJava)
- ).asJava),
- requireStable = true,
- version = 9
- )
- val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head
- val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex
== partition).head
- partitionRecord.committedOffset
+ private def
testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(useNewProtocol: Boolean):
Unit = {
+ val topic = "topic"
+ val partition = 0
+ val transactionalId = "txn"
+ val groupId = "group"
+ val offset = 100L
+
+ // Create the __consumer_offsets and __transaction_state topics because they
+ // won't be created automatically in this test, which does not use the
+ // FindCoordinator API.
+ createOffsetsTopic()
+ createTransactionStateTopic()
+
+ // Join the consumer group. Note that we don't heartbeat here so we must
use
+ // a session long enough for the duration of the test.
+ val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId,
useNewProtocol)
+ assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
+ assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
+
+ createTopic(topic, 1)
+
+ for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+ val useTV2 = version >
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
+
+ // Initialize producer. Wait until the coordinator finishes loading.
+ var producerIdAndEpoch: ProducerIdAndEpoch = null
+ TestUtils.waitUntilTrue(() =>
+ try {
+ producerIdAndEpoch = initProducerId(
+ transactionalId = transactionalId,
+ producerIdAndEpoch = ProducerIdAndEpoch.NONE,
+ expectedError = Errors.NONE
+ )
+ true
+ } catch {
+ case _: Throwable => false
+ }, "initProducerId request failed"
+ )
+
+ addOffsetsToTxn(
+ groupId = groupId,
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = producerIdAndEpoch.epoch,
+ transactionalId = transactionalId
+ )
+
+ // Complete the transaction.
+ endTxn(
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = producerIdAndEpoch.epoch,
+ transactionalId = transactionalId,
+ isTransactionV2Enabled = useTV2,
+ committed = true,
+ expectedError = Errors.NONE
+ )
+
+ // Start a new transaction. Wait for the previous transaction to
complete.
+ TestUtils.waitUntilTrue(() =>
+ try {
+ addOffsetsToTxn(
+ groupId = groupId,
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort
else producerIdAndEpoch.epoch,
+ transactionalId = transactionalId
+ )
+ true
+ } catch {
+ case _: Throwable => false
+ }, "addOffsetsToTxn request failed"
+ )
+
+ // Committing offset with old epoch succeeds for TV1 and fails for TV2.
+ commitTxnOffset(
+ groupId = groupId,
+ memberId = if (version >= 3) memberId else
JoinGroupRequest.UNKNOWN_MEMBER_ID,
+ generationId = if (version >= 3) 1 else
JoinGroupRequest.UNKNOWN_GENERATION_ID,
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = producerIdAndEpoch.epoch,
+ transactionalId = transactionalId,
+ topic = topic,
+ partition = partition,
+ offset = offset,
+ expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else
Errors.NONE,
+ version = version.toShort
+ )
+
+ // Complete the transaction.
+ endTxn(
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort
else producerIdAndEpoch.epoch,
+ transactionalId = transactionalId,
+ isTransactionV2Enabled = useTV2,
+ committed = true,
+ expectedError = Errors.NONE
+ )
+ }
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
new file mode 100644
index 00000000000..a68de4dacc0
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest}
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
+import org.apache.kafka.common.utils.ProducerIdAndEpoch
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.junit.jupiter.api.Assertions.assertNotEquals
+
+@ClusterTestDefaults(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ )
+)
+class WriteTxnMarkersRequestTest(cluster:ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
+ @ClusterTest
+ def
testDelayedWriteTxnMarkersShouldNotCommitTxnOffsetWithNewConsumerGroupProtocol():
Unit = {
+ testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(true)
+ }
+
+ @ClusterTest
+ def
testDelayedWriteTxnMarkersShouldNotCommitTxnOffsetWithOldConsumerGroupProtocol():
Unit = {
+ testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(false)
+ }
+
+ private def
testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(useNewProtocol: Boolean):
Unit = {
+ val topic = "topic"
+ val partition = 0
+ val transactionalId = "txn"
+ val groupId = "group"
+ val offset = 100L
+
+ // Create the __consumer_offsets and __transaction_state topics because they
+ // won't be created automatically in this test, which does not use the
+ // FindCoordinator API.
+ createOffsetsTopic()
+ createTransactionStateTopic()
+
+ // Join the consumer group. Note that we don't heartbeat here so we must
use
+ // a session long enough for the duration of the test.
+ val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId,
useNewProtocol)
+ assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
+ assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
+
+ createTopic(topic, 1)
+
+ for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+ val useTV2 = version >
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
+
+ // Initialize producer. Wait until the coordinator finishes loading.
+ var producerIdAndEpoch: ProducerIdAndEpoch = null
+ TestUtils.waitUntilTrue(() =>
+ try {
+ producerIdAndEpoch = initProducerId(
+ transactionalId = transactionalId,
+ producerIdAndEpoch = ProducerIdAndEpoch.NONE,
+ expectedError = Errors.NONE
+ )
+ true
+ } catch {
+ case _: Throwable => false
+ }, "initProducerId request failed"
+ )
+
+ addOffsetsToTxn(
+ groupId = groupId,
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = producerIdAndEpoch.epoch,
+ transactionalId = transactionalId
+ )
+
+ // Complete the transaction.
+ endTxn(
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = producerIdAndEpoch.epoch,
+ transactionalId = transactionalId,
+ isTransactionV2Enabled = useTV2,
+ committed = true,
+ expectedError = Errors.NONE
+ )
+
+ // Start a new transaction. Wait for the previous transaction to
complete.
+ TestUtils.waitUntilTrue(() =>
+ try {
+ addOffsetsToTxn(
+ groupId = groupId,
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort
else producerIdAndEpoch.epoch,
+ transactionalId = transactionalId
+ )
+ true
+ } catch {
+ case _: Throwable => false
+ }, "addOffsetsToTxn request failed"
+ )
+
+ commitTxnOffset(
+ groupId = groupId,
+ memberId = if (version >= 3) memberId else
JoinGroupRequest.UNKNOWN_MEMBER_ID,
+ generationId = if (version >= 3) 1 else
JoinGroupRequest.UNKNOWN_GENERATION_ID,
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort
else producerIdAndEpoch.epoch,
+ transactionalId = transactionalId,
+ topic = topic,
+ partition = partition,
+ offset = offset + version,
+ expectedError = Errors.NONE,
+ version = version.toShort
+ )
+
+ // Delayed txn marker should be accepted for TV1 and rejected for TV2.
+ // Note that, ideally, producer epoch + 1 should also be rejected for TV2;
+ // that fix is still in progress.
+ writeTxnMarkers(
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = producerIdAndEpoch.epoch,
+ committed = true,
+ expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else
Errors.NONE
+ )
+
+ // The offset is committed for TV1 and not committed for TV2.
+ TestUtils.waitUntilTrue(() =>
+ try {
+ fetchOffset(groupId, topic, partition) == (if (useTV2) -1L else
offset + version)
+ } catch {
+ case _: Throwable => false
+ }, "unexpected txn commit offset"
+ )
+
+ // Complete the transaction.
+ endTxn(
+ producerId = producerIdAndEpoch.producerId,
+ producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort
else producerIdAndEpoch.epoch,
+ transactionalId = transactionalId,
+ isTransactionV2Enabled = useTV2,
+ committed = true,
+ expectedError = Errors.NONE
+ )
+
+ // After the transaction completes, the offset is committed for TV2 as well.
+ TestUtils.waitUntilTrue(() =>
+ try {
+ fetchOffset(groupId, topic, partition) == offset + version
+ } catch {
+ case _: Throwable => false
+ }, "txn commit offset validation failed"
+ )
+ }
+ }
+}