hachikuji commented on a change in pull request #8725:
URL: https://github.com/apache/kafka/pull/8725#discussion_r490627471



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test tries to test out the EOS robustness on the client side. It 
features a {@link TransactionSimulationCoordinator}
+ * which handles the incoming transactional produce/metadata requests and 
gives basic feedback through {@link MockClient}.
+ *
+ * Each iteration the transaction manager will append one record through 
accumulator and commit offset at the same time. The request
+ * being transmitted is not guaranteed to be processed or processed correctly, 
so a state checking loop is enforced to make the client
+ * and the coordinator interact with each other and ensure the state could be 
eventually clean using {@link TransactionManager#isReady}.
+ * By the end of the test we will check whether all the committed transactions 
are successfully materialized on the coordinator side.
+ *
+ * Features supported:
+ * 
+ * 1. Randomly abort transaction
+ * 2. Fault injection on response
+ * 3. Random message drop
+ */
+public class TransactionEventSimulationTest {
+
+    private TransactionManager transactionManager;
+    private TransactionSimulationCoordinator transactionCoordinator;
+    private Sender sender;
+    private final LogContext logContext = new LogContext();
+
+    private final MockTime time = new MockTime();
+    private final int requestTimeoutMs = 100;
+    private final int retryBackOffMs = 0;
+    private final long apiVersion = 0L;
+
+    private ProducerMetadata metadata = new ProducerMetadata(0, 
Long.MAX_VALUE, 10,
+        new LogContext(), new ClusterResourceListeners(), time);
+    private MockClient client = new MockClient(time, metadata);
+
+    @Before
+    public void setup() {
+        transactionManager = new TransactionManager(logContext, "txn-id",
+            requestTimeoutMs, apiVersion, new ApiVersions(), false);
+        transactionCoordinator = new TransactionSimulationCoordinator(client);
+    }
+
+    @Test
+    public void simulateTxnEvents() throws InterruptedException {
+        final int batchSize = 100;
+        final int lingerMs = 0;
+        final int deliveryTimeoutMs = 10;
+
+        RecordAccumulator accumulator = new RecordAccumulator(logContext, 
batchSize, CompressionType.GZIP,
+            lingerMs, retryBackOffMs, deliveryTimeoutMs, new Metrics(), 
"accumulator", time, new ApiVersions(), transactionManager,

Review comment:
       nit: make `Metrics` a field

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and 
group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction 
turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable 
error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();
+
+        if (incomingRequests.peek() == null) {
+            return;
+        }
+
+        final AbstractResponse response;
+        AbstractRequest nextRequest = 
incomingRequests.peek().requestBuilder().build();
+        if (nextRequest instanceof FindCoordinatorRequest) {
+            response = handleFindCoordinator(faultInject);
+        } else if (nextRequest instanceof InitProducerIdRequest) {
+            response = handleInitProducerId((InitProducerIdRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof AddPartitionsToTxnRequest) {
+            response = handleAddPartitionToTxn((AddPartitionsToTxnRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof AddOffsetsToTxnRequest) {
+            response = handleAddOffsetsToTxn((AddOffsetsToTxnRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof TxnOffsetCommitRequest) {
+            response = handleTxnCommit((TxnOffsetCommitRequest) nextRequest, 
faultInject);
+        } else if (nextRequest instanceof ProduceRequest) {
+            response = handleProduce((ProduceRequest) nextRequest, 
faultInject);
+        } else if (nextRequest instanceof EndTxnRequest) {
+            response = handleEndTxn((EndTxnRequest) nextRequest, faultInject);
+        } else {
+            throw new IllegalArgumentException("Unknown request: " + 
nextRequest);
+        }
+
+        networkClient.respond(response, dropMessage);
+    }
+
+    private FindCoordinatorResponse handleFindCoordinator(final boolean 
faultInject) {
+        if (faultInject) {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+        } else {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHost("localhost")
+                    .setNodeId(0)
+                    .setPort(2211)
+            );
+        }
+    }
+
+    private InitProducerIdResponse handleInitProducerId(InitProducerIdRequest 
request,
+                                                        final boolean 
faultInject) {
+        if (faultInject) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+        } else if (request.data.producerId() != NO_PRODUCER_ID &&
+                    request.data.producerId() != currentProducerId) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != NO_PRODUCER_EPOCH &&
+                    request.data.producerEpoch() != currentEpoch) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            currentProducerId += 1;
+            currentEpoch += 1;
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setProducerId(currentProducerId)
+                    .setProducerEpoch(currentEpoch)
+                    .setErrorCode(Errors.NONE.code())
+            );
+        }
+    }
+
+    private AddPartitionsToTxnResponse 
handleAddPartitionToTxn(AddPartitionsToTxnRequest request,
+                                                               final boolean 
faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.partitions().forEach(topicPartition -> {
+            if (faultInject) {
+                errors.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(topicPartition, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(topicPartition, Errors.INVALID_PRODUCER_EPOCH);
+            } else {
+                errors.put(topicPartition, Errors.NONE);
+            }
+        });
+
+        return new AddPartitionsToTxnResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AddOffsetsToTxnResponse 
handleAddOffsetsToTxn(AddOffsetsToTxnRequest request,
+                                                          final boolean 
faultInject) {
+        if (faultInject) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerId() != currentProducerId) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != currentEpoch) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            offsetsAddedToTxn = true;
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        }
+    }
+
+    private AbstractResponse handleTxnCommit(TxnOffsetCommitRequest request,
+                                             final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.data.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
+            TopicPartition key = new TopicPartition(topic.name(), 
partition.partitionIndex());
+            if (faultInject) {
+                errors.put(key, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(key, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(key, Errors.INVALID_PRODUCER_EPOCH);
+            } else if (offsetsAddedToTxn) {
+                pendingOffsets.put(key, partition.committedOffset());
+                errors.put(key, Errors.NONE);
+            } else {
+                errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION);

Review comment:
       In this case, the client has sent TxnOffsetCommit before the partition 
has been added to the transaction, which is an illegal state transition. Could 
we throw an assertion error or something directly to cause the test to fail?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test tries to test out the EOS robustness on the client side. It 
features a {@link TransactionSimulationCoordinator}
+ * which handles the incoming transactional produce/metadata requests and 
gives basic feedback through {@link MockClient}.
+ *
+ * Each iteration the transaction manager will append one record through 
accumulator and commit offset at the same time. The request
+ * being transmitted is not guaranteed to be processed or processed correctly, 
so a state checking loop is enforced to make the client
+ * and the coordinator interact with each other and ensure the state could be 
eventually clean using {@link TransactionManager#isReady}.
+ * By the end of the test we will check whether all the committed transactions 
are successfully materialized on the coordinator side.
+ *
+ * Features supported:
+ * 
+ * 1. Randomly abort transaction
+ * 2. Fault injection on response
+ * 3. Random message drop
+ */
+public class TransactionEventSimulationTest {
+
+    private TransactionManager transactionManager;
+    private TransactionSimulationCoordinator transactionCoordinator;
+    private Sender sender;
+    private final LogContext logContext = new LogContext();
+
+    private final MockTime time = new MockTime();
+    private final int requestTimeoutMs = 100;
+    private final int retryBackOffMs = 0;
+    private final long apiVersion = 0L;
+
+    private ProducerMetadata metadata = new ProducerMetadata(0, 
Long.MAX_VALUE, 10,
+        new LogContext(), new ClusterResourceListeners(), time);

Review comment:
       nit: use `logContext`

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test tries to test out the EOS robustness on the client side. It 
features a {@link TransactionSimulationCoordinator}
+ * which handles the incoming transactional produce/metadata requests and 
gives basic feedback through {@link MockClient}.
+ *
+ * Each iteration the transaction manager will append one record through 
accumulator and commit offset at the same time. The request
+ * being transmitted is not guaranteed to be processed or processed correctly, 
so a state checking loop is enforced to make the client
+ * and the coordinator interact with each other and ensure the state could be 
eventually clean using {@link TransactionManager#isReady}.
+ * By the end of the test we will check whether all the committed transactions 
are successfully materialized on the coordinator side.
+ *
+ * Features supported:
+ * 
+ * 1. Randomly abort transaction
+ * 2. Fault injection on response
+ * 3. Random message drop
+ */
+public class TransactionEventSimulationTest {
+
+    private TransactionManager transactionManager;
+    private TransactionSimulationCoordinator transactionCoordinator;
+    private Sender sender;
+    private final LogContext logContext = new LogContext();
+
+    private final MockTime time = new MockTime();
+    private final int requestTimeoutMs = 100;
+    private final int retryBackOffMs = 0;
+    private final long apiVersion = 0L;
+
+    private ProducerMetadata metadata = new ProducerMetadata(0, 
Long.MAX_VALUE, 10,
+        new LogContext(), new ClusterResourceListeners(), time);
+    private MockClient client = new MockClient(time, metadata);
+
+    @Before
+    public void setup() {
+        transactionManager = new TransactionManager(logContext, "txn-id",
+            requestTimeoutMs, apiVersion, new ApiVersions(), false);
+        transactionCoordinator = new TransactionSimulationCoordinator(client);
+    }
+
+    @Test
+    public void simulateTxnEvents() throws InterruptedException {
+        final int batchSize = 100;
+        final int lingerMs = 0;
+        final int deliveryTimeoutMs = 10;
+
+        RecordAccumulator accumulator = new RecordAccumulator(logContext, 
batchSize, CompressionType.GZIP,
+            lingerMs, retryBackOffMs, deliveryTimeoutMs, new Metrics(), 
"accumulator", time, new ApiVersions(), transactionManager,
+            new BufferPool(1000, 100, new Metrics(), time, 
"producer-internal-metrics"));
+
+        metadata.add("topic", time.milliseconds());
+        
metadata.update(metadata.newMetadataRequestAndVersion(time.milliseconds()).requestVersion,
+            TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 
2)), true, time.milliseconds());
+
+        sender = new Sender(logContext, client, metadata, accumulator, false, 
100, (short) 1,
+            Integer.MAX_VALUE, new SenderMetricsRegistry(new Metrics()), time, 
requestTimeoutMs, 10, transactionManager, new ApiVersions());
+
+        transactionManager.initializeTransactions();
+        sender.runOnce();
+        resolvePendingRequests();
+        final int numTransactions = 100;
+
+        TopicPartition key = new TopicPartition("topic", 0);
+        long committedOffsets = 0L;
+        Random abortTxn = new Random();
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, 
Collections.singletonMap("topic", 2)));
+        final long timestamp = 0L;
+        final int maxBlockTime = 0;
+
+        for (int i = 0; i < numTransactions; i++) {
+            transactionManager.beginTransaction();
+            transactionManager.maybeAddPartitionToTransaction(key);
+            accumulator.append(key, timestamp, new byte[1], new byte[1],
+                Record.EMPTY_HEADERS, null, maxBlockTime, false, 
time.milliseconds());
+            transactionManager.sendOffsetsToTransaction(
+                Collections.singletonMap(key, new 
OffsetAndMetadata(committedOffsets)),
+                new ConsumerGroupMetadata("group"));
+
+            if (abortTxn.nextBoolean()) {
+                transactionManager.beginCommit();
+                committedOffsets += 1;
+            } else {
+                transactionManager.beginAbort();
+            }
+
+            resolvePendingRequests();
+        }
+
+        
assertTrue(transactionCoordinator.persistentPartitionData().containsKey(key));
+        assertTrue(transactionCoordinator.committedOffsets().containsKey(key));
+        assertEquals(committedOffsets - 1, (long) 
transactionCoordinator.committedOffsets().get(key));
+    }
+
+    private void resolvePendingRequests() {
+        Random dropMessageRandom = new Random();

Review comment:
       It's useful if the simulation test is deterministic. That way failures 
are easy to reproduce. Perhaps we can use a shared `Random` instance (between 
this class and the coordinator) with a defined seed.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and 
group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction 
turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable 
error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();

Review comment:
       Wondering if it would be more useful if we can control the faults more 
explicitly. For example, we could add a hook to make the coordinator 
temporarily unavailable and to restore it later.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and 
group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction 
turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable 
error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {

Review comment:
       nit: maybe `disconnect` is a better name for `dropMessage`, given the actual behavior.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and 
group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction 
turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable 
error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();
+
+        if (incomingRequests.peek() == null) {
+            return;
+        }
+
+        final AbstractResponse response;
+        AbstractRequest nextRequest = 
incomingRequests.peek().requestBuilder().build();
+        if (nextRequest instanceof FindCoordinatorRequest) {
+            response = handleFindCoordinator(faultInject);
+        } else if (nextRequest instanceof InitProducerIdRequest) {
+            response = handleInitProducerId((InitProducerIdRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof AddPartitionsToTxnRequest) {
+            response = handleAddPartitionToTxn((AddPartitionsToTxnRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof AddOffsetsToTxnRequest) {
+            response = handleAddOffsetsToTxn((AddOffsetsToTxnRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof TxnOffsetCommitRequest) {
+            response = handleTxnCommit((TxnOffsetCommitRequest) nextRequest, 
faultInject);
+        } else if (nextRequest instanceof ProduceRequest) {
+            response = handleProduce((ProduceRequest) nextRequest, 
faultInject);
+        } else if (nextRequest instanceof EndTxnRequest) {
+            response = handleEndTxn((EndTxnRequest) nextRequest, faultInject);
+        } else {
+            throw new IllegalArgumentException("Unknown request: " + 
nextRequest);
+        }
+
+        networkClient.respond(response, dropMessage);
+    }
+
+    private FindCoordinatorResponse handleFindCoordinator(final boolean 
faultInject) {
+        if (faultInject) {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+        } else {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHost("localhost")
+                    .setNodeId(0)
+                    .setPort(2211)
+            );
+        }
+    }
+
+    private InitProducerIdResponse handleInitProducerId(InitProducerIdRequest 
request,
+                                                        final boolean 
faultInject) {
+        if (faultInject) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+        } else if (request.data.producerId() != NO_PRODUCER_ID &&
+                    request.data.producerId() != currentProducerId) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != NO_PRODUCER_EPOCH &&
+                    request.data.producerEpoch() != currentEpoch) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            currentProducerId += 1;
+            currentEpoch += 1;
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setProducerId(currentProducerId)
+                    .setProducerEpoch(currentEpoch)
+                    .setErrorCode(Errors.NONE.code())
+            );
+        }
+    }
+
+    private AddPartitionsToTxnResponse 
handleAddPartitionToTxn(AddPartitionsToTxnRequest request,
+                                                               final boolean 
faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.partitions().forEach(topicPartition -> {
+            if (faultInject) {
+                errors.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(topicPartition, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(topicPartition, Errors.INVALID_PRODUCER_EPOCH);
+            } else {
+                errors.put(topicPartition, Errors.NONE);
+            }
+        });
+
+        return new AddPartitionsToTxnResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AddOffsetsToTxnResponse 
handleAddOffsetsToTxn(AddOffsetsToTxnRequest request,
+                                                          final boolean 
faultInject) {
+        if (faultInject) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerId() != currentProducerId) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != currentEpoch) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            offsetsAddedToTxn = true;
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        }
+    }
+
+    private AbstractResponse handleTxnCommit(TxnOffsetCommitRequest request,
+                                             final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.data.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
+            TopicPartition key = new TopicPartition(topic.name(), 
partition.partitionIndex());
+            if (faultInject) {
+                errors.put(key, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(key, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(key, Errors.INVALID_PRODUCER_EPOCH);
+            } else if (offsetsAddedToTxn) {
+                pendingOffsets.put(key, partition.committedOffset());
+                errors.put(key, Errors.NONE);
+            } else {
+                errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+            }
+        }));
+
+        return new TxnOffsetCommitResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AbstractResponse handleProduce(ProduceRequest request,

Review comment:
       To make this really interesting, we would need to add some sequence 
number bookkeeping. Really, it's the sequence/epoch bookkeeping that makes the 
implementation so complex.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and 
group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction 
turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable 
error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();
+
+        if (incomingRequests.peek() == null) {
+            return;
+        }
+
+        final AbstractResponse response;
+        AbstractRequest nextRequest = 
incomingRequests.peek().requestBuilder().build();
+        if (nextRequest instanceof FindCoordinatorRequest) {
+            response = handleFindCoordinator(faultInject);
+        } else if (nextRequest instanceof InitProducerIdRequest) {
+            response = handleInitProducerId((InitProducerIdRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof AddPartitionsToTxnRequest) {
+            response = handleAddPartitionToTxn((AddPartitionsToTxnRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof AddOffsetsToTxnRequest) {
+            response = handleAddOffsetsToTxn((AddOffsetsToTxnRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof TxnOffsetCommitRequest) {
+            response = handleTxnCommit((TxnOffsetCommitRequest) nextRequest, 
faultInject);
+        } else if (nextRequest instanceof ProduceRequest) {
+            response = handleProduce((ProduceRequest) nextRequest, 
faultInject);
+        } else if (nextRequest instanceof EndTxnRequest) {
+            response = handleEndTxn((EndTxnRequest) nextRequest, faultInject);
+        } else {
+            throw new IllegalArgumentException("Unknown request: " + 
nextRequest);
+        }
+
+        networkClient.respond(response, dropMessage);
+    }
+
+    private FindCoordinatorResponse handleFindCoordinator(final boolean 
faultInject) {
+        if (faultInject) {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+        } else {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHost("localhost")
+                    .setNodeId(0)
+                    .setPort(2211)
+            );
+        }
+    }
+
+    private InitProducerIdResponse handleInitProducerId(InitProducerIdRequest 
request,
+                                                        final boolean 
faultInject) {
+        if (faultInject) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+        } else if (request.data.producerId() != NO_PRODUCER_ID &&
+                    request.data.producerId() != currentProducerId) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != NO_PRODUCER_EPOCH &&
+                    request.data.producerEpoch() != currentEpoch) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            currentProducerId += 1;
+            currentEpoch += 1;
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setProducerId(currentProducerId)
+                    .setProducerEpoch(currentEpoch)
+                    .setErrorCode(Errors.NONE.code())
+            );
+        }
+    }
+
+    private AddPartitionsToTxnResponse 
handleAddPartitionToTxn(AddPartitionsToTxnRequest request,
+                                                               final boolean 
faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.partitions().forEach(topicPartition -> {
+            if (faultInject) {
+                errors.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(topicPartition, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(topicPartition, Errors.INVALID_PRODUCER_EPOCH);
+            } else {
+                errors.put(topicPartition, Errors.NONE);
+            }
+        });
+
+        return new AddPartitionsToTxnResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AddOffsetsToTxnResponse 
handleAddOffsetsToTxn(AddOffsetsToTxnRequest request,
+                                                          final boolean 
faultInject) {
+        if (faultInject) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerId() != currentProducerId) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != currentEpoch) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            offsetsAddedToTxn = true;
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        }
+    }
+
+    private AbstractResponse handleTxnCommit(TxnOffsetCommitRequest request,
+                                             final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.data.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
+            TopicPartition key = new TopicPartition(topic.name(), 
partition.partitionIndex());
+            if (faultInject) {
+                errors.put(key, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(key, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(key, Errors.INVALID_PRODUCER_EPOCH);
+            } else if (offsetsAddedToTxn) {
+                pendingOffsets.put(key, partition.committedOffset());
+                errors.put(key, Errors.NONE);
+            } else {
+                errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+            }
+        }));
+
+        return new TxnOffsetCommitResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AbstractResponse handleProduce(ProduceRequest request,
+                                           final boolean faultInject) {
+        Map<TopicPartition, PartitionResponse> errors = new HashMap<>();
+        Map<TopicPartition, MemoryRecords> partitionRecords = 
request.partitionRecordsOrFail();
+
+        partitionRecords.forEach((topicPartition, records) -> {
+            if (faultInject) {
+                // Trigger KIP-360 path.
+                errors.put(topicPartition, new 
PartitionResponse(Errors.UNKNOWN_PRODUCER_ID));
+            } else {
+                List<Record> sentRecords = 
pendingPartitionData.getOrDefault(topicPartition, new ArrayList<>());

Review comment:
       Similar to the offset commit path, it would be useful to validate here 
that each partition that was written to was first added to the transaction 
properly.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and 
group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction 
turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable 
error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();
+
+        if (incomingRequests.peek() == null) {
+            return;
+        }
+
+        final AbstractResponse response;
+        AbstractRequest nextRequest = 
incomingRequests.peek().requestBuilder().build();
+        if (nextRequest instanceof FindCoordinatorRequest) {
+            response = handleFindCoordinator(faultInject);
+        } else if (nextRequest instanceof InitProducerIdRequest) {
+            response = handleInitProducerId((InitProducerIdRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof AddPartitionsToTxnRequest) {
+            response = handleAddPartitionToTxn((AddPartitionsToTxnRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof AddOffsetsToTxnRequest) {
+            response = handleAddOffsetsToTxn((AddOffsetsToTxnRequest) 
nextRequest, faultInject);
+        } else if (nextRequest instanceof TxnOffsetCommitRequest) {
+            response = handleTxnCommit((TxnOffsetCommitRequest) nextRequest, 
faultInject);
+        } else if (nextRequest instanceof ProduceRequest) {
+            response = handleProduce((ProduceRequest) nextRequest, 
faultInject);
+        } else if (nextRequest instanceof EndTxnRequest) {
+            response = handleEndTxn((EndTxnRequest) nextRequest, faultInject);
+        } else {
+            throw new IllegalArgumentException("Unknown request: " + 
nextRequest);
+        }
+
+        networkClient.respond(response, dropMessage);
+    }
+
+    private FindCoordinatorResponse handleFindCoordinator(final boolean 
faultInject) {
+        if (faultInject) {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+        } else {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHost("localhost")
+                    .setNodeId(0)
+                    .setPort(2211)
+            );
+        }
+    }
+
+    private InitProducerIdResponse handleInitProducerId(InitProducerIdRequest 
request,
+                                                        final boolean 
faultInject) {
+        if (faultInject) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+        } else if (request.data.producerId() != NO_PRODUCER_ID &&
+                    request.data.producerId() != currentProducerId) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != NO_PRODUCER_EPOCH &&
+                    request.data.producerEpoch() != currentEpoch) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            currentProducerId += 1;
+            currentEpoch += 1;
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setProducerId(currentProducerId)
+                    .setProducerEpoch(currentEpoch)
+                    .setErrorCode(Errors.NONE.code())
+            );
+        }
+    }
+
+    private AddPartitionsToTxnResponse 
handleAddPartitionToTxn(AddPartitionsToTxnRequest request,
+                                                               final boolean 
faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.partitions().forEach(topicPartition -> {
+            if (faultInject) {
+                errors.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(topicPartition, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(topicPartition, Errors.INVALID_PRODUCER_EPOCH);
+            } else {
+                errors.put(topicPartition, Errors.NONE);
+            }
+        });
+
+        return new AddPartitionsToTxnResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AddOffsetsToTxnResponse 
handleAddOffsetsToTxn(AddOffsetsToTxnRequest request,
+                                                          final boolean 
faultInject) {
+        if (faultInject) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerId() != currentProducerId) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != currentEpoch) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            offsetsAddedToTxn = true;
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        }
+    }
+
+    private AbstractResponse handleTxnCommit(TxnOffsetCommitRequest request,
+                                             final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.data.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
+            TopicPartition key = new TopicPartition(topic.name(), 
partition.partitionIndex());
+            if (faultInject) {
+                errors.put(key, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(key, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(key, Errors.INVALID_PRODUCER_EPOCH);
+            } else if (offsetsAddedToTxn) {
+                pendingOffsets.put(key, partition.committedOffset());
+                errors.put(key, Errors.NONE);
+            } else {
+                errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+            }
+        }));
+
+        return new TxnOffsetCommitResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AbstractResponse handleProduce(ProduceRequest request,
+                                           final boolean faultInject) {
+        Map<TopicPartition, PartitionResponse> errors = new HashMap<>();
+        Map<TopicPartition, MemoryRecords> partitionRecords = 
request.partitionRecordsOrFail();
+
+        partitionRecords.forEach((topicPartition, records) -> {
+            if (faultInject) {
+                // Trigger KIP-360 path.
+                errors.put(topicPartition, new 
PartitionResponse(Errors.UNKNOWN_PRODUCER_ID));
+            } else {
+                List<Record> sentRecords = 
pendingPartitionData.getOrDefault(topicPartition, new ArrayList<>());
+                for (Record partitionRecord  : records.records()) {
+                    sentRecords.add(partitionRecord);
+                }
+
+                pendingPartitionData.put(topicPartition, sentRecords);
+                errors.put(topicPartition, new PartitionResponse(Errors.NONE));
+            }
+        });
+
+        return new ProduceResponse(errors, throttleTimeMs);
+    }
+
+    private EndTxnResponse handleEndTxn(EndTxnRequest request, final boolean 
faultInject) {

Review comment:
       Another class of failure that we can simulate is when a request reaches 
the broker and gets handled, but the connection is lost before the response is 
sent.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to