bq. record = new ProducerRecord<>("test", 0, (long)0, Long.toString(0));
What was the rationale of passing 0 as the third parameter in the second transaction? Cheers On Wed, Dec 20, 2017 at 5:08 AM, HKT <dushukuang...@163.com> wrote: > Hi, I have run the program for over 10 hours! It doesn't stop. > It generated 800+ MB of log, but almost all of it is > 2017-12-20 20:04:28 [kafka-producer-network-thread | producer-1] DEBUG > o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1, > transactionalId=hello] Sending transactional request > (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, > producerEpoch=0, partitions=[test-0]) to node 192.168.245.1:9092 (id: 0 > rack: null) > 2017-12-20 20:04:28 [kafka-producer-network-thread | producer-1] DEBUG > o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1, > transactionalId=hello] Enqueuing transactional request > (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, > producerEpoch=0, partitions=[test-0]) > > I debugged into the client library and I found the code in > Sender.run(long now) at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214): > > > 214 } else if (transactionManager.hasInFlightTransactionalRequest() || > maybeSendTransactionalRequest(now)) { > > 215 // as long as there are outstanding transactional requests, we > simply wait for them to return > > 216 client.poll(retryBackoffMs, now); > > 217 return; > > 218 } > > and I found that Sender.run(long now) will call > Sender.maybeSendTransactionalRequest() that will call > TransactionManager.nextRequestHandler(). > The TransactionManager has a pendingRequests. > nextRequestHandler will poll a TransactionManager$AddPartitionsToTxnHandler > or a TransactionManager$FindCoordinatorHandler. 
> This behavior results in maybeSendTransactionalRequest returning true; here > is the stack: > at org.apache.kafka.clients.producer.internals.Sender.maybeSend > TransactionalRequest(Sender.java:360) > at org.apache.kafka.clients.producer.internals.Sender.run(Sende > r.java:214) > at org.apache.kafka.clients.producer.internals.Sender.run(Sende > r.java:163) > at java.lang.Thread.run(Thread.java:745) > > > So it will call client.poll, and the client.poll will call > NetworkClient.completeResponses which will call enqueueRequest finally: > at org.apache.kafka.clients.producer.internals.TransactionManag > er.enqueueRequest(TransactionManager.java:799) > at org.apache.kafka.clients.producer.internals.TransactionManag > er.access$700(TransactionManager.java:64) > at org.apache.kafka.clients.producer.internals.TransactionManag > er$TxnRequestHandler.reenqueue(TransactionManager.java:880) > - locked <0x7d1> (a org.apache.kafka.clients.produ > cer.internals.TransactionManager) > at org.apache.kafka.clients.producer.internals.TransactionManag > er$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1024) > at org.apache.kafka.clients.producer.internals.TransactionManag > er$TxnRequestHandler.onComplete(TransactionManager.java:905) > at org.apache.kafka.clients.ClientResponse.onComplete(ClientRes > ponse.java:101) > at org.apache.kafka.clients.NetworkClient.completeResponses(Net > workClient.java:482) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) > at org.apache.kafka.clients.producer.internals.Sender.run(Sende > r.java:216) > enqueueRequest will add another TransactionManager$AddPartitionsToTxnHandler > or a TransactionManager$FindCoordinatorHandler into the pendingRequests. > and causes the infinite loop. 
> > I think the problem is in the > at org.apache.kafka.clients.producer.internals.TransactionManag > er$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1024) > the code is > > 1022 } else if (error == Errors.CONCURRENT_TRANSACTIONS) { > > 1023 maybeOverrideRetryBackoffMs(); > > 1024 reenqueue(); > > 1025 return; > > 1026 } > > the error is Errors.CONCURRENT_TRANSACTIONS, but I don't have two > transactions. > > I think there is something I didn't configure correctly. Here is my complete > code: > > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.Producer; > import org.apache.kafka.clients.producer.ProducerRecord; > import org.apache.kafka.clients.producer.internals.RecordAccumulator; > import org.apache.kafka.common.serialization.LongSerializer; > import org.apache.kafka.common.serialization.StringSerializer; > import org.apache.kafka.streams.kstream.KStream; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > import java.util.HashMap; > import java.util.Properties; > import java.util.UUID; > > public class ApacheKafkaHello > { > public static final String Home; > > static > { > final String homeEnv = "APACHE_KAFKA_HELLO_HOME"; > String home = System.getenv(homeEnv); > if (home == null) { > System.err.printf("HOME ENVIRONMENT IS UNDEFINED: name=%s\n", > homeEnv); > System.exit(-1); > } > Home = home; > } > > private static final Logger logger = LoggerFactory.getLogger(Apache > KafkaHello.class); > > public static void main(final String[] args) > { > logger.info("apache kafka hello starts: Home={}", Home); > for (int i = 0;i != args.length; ++i) { > logger.info(" args[{}]={}", i, args[i]); > } > > Properties kafkaProps = new Properties(); > kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); > kafkaProps.setProperty("key.serializer", > LongSerializer.class.getName()); > kafkaProps.setProperty("value.serializer", > StringSerializer.class.getName()); > 
kafkaProps.setProperty("transactional.id", "hello"); > try (KafkaProducer<Long, String> producer = new > KafkaProducer<>(kafkaProps)) { > producer.initTransactions(); > producer.beginTransaction(); > ProducerRecord<Long, String> record = new > ProducerRecord<>("test", 0, (long) 0, Long.toString(0)); > producer.send(record); > producer.commitTransaction(); > producer.beginTransaction(); > record = new ProducerRecord<>("test", 0, (long)0, > Long.toString(0)); > producer.send(record); > producer.commitTransaction(); > } > } > } > 在 2017/12/19 20:54, HKT 写道: > >> Here is the server side DEBUG log. >> >> JDK Version is >> > java -version >> java version "1.8.0_121" >> Java(TM) SE Runtime Environment (build 1.8.0_121-b13) >> Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode) >> >> Thanks >> >> 在 2017/12/19 11:13, Ted Yu 写道: >> >>> For the server log, is it possible to enable DEBUG logging ? >>> >>> Thanks >>> >>> On Mon, Dec 18, 2017 at 4:35 PM, HKT <dushukuang...@163.com> wrote: >>> >>> Thanks for reply. 
>>>> >>>> here is the client side log: >>>> >>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager - >>>> [Producer clientId=producer-1, transactionalId=hello] Transition from >>>> state >>>> READY to IN_TRANSACTION >>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager - >>>> [Producer clientId=producer-1, transactionalId=hello] Begin adding new >>>> partition test-0 to transaction >>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager - >>>> [Producer clientId=producer-1, transactionalId=hello] Transition from >>>> state >>>> IN_TRANSACTION to COMMITTING_TRANSACTION >>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager - >>>> [Producer clientId=producer-1, transactionalId=hello] Enqueuing >>>> transactional request (type=AddPartitionsToTxnRequest, >>>> transactionalId=hello, producerId=0, producerEpoch=0, >>>> partitions=[test-0]) >>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager - >>>> [Producer clientId=producer-1, transactionalId=hello] Enqueuing >>>> transactional request (type=EndTxnRequest, transactionalId=hello, >>>> producerId=0, producerEpoch=0, result=COMMIT) >>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG >>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1, >>>> transactionalId=hello] Sending transactional request >>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, >>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: >>>> null) >>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG >>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1, >>>> transactionalId=hello] Enqueuing transactional request >>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, >>>> producerEpoch=0, partitions=[test-0]) >>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG >>>> o.a.k.c.producer.internals.Sender - [Producer 
clientId=producer-1, >>>> transactionalId=hello] Sending transactional request >>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, >>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: >>>> null) >>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG >>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1, >>>> transactionalId=hello] Enqueuing transactional request >>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, >>>> producerEpoch=0, partitions=[test-0]) >>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG >>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1, >>>> transactionalId=hello] Sending transactional request >>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, >>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: >>>> null) >>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG >>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1, >>>> transactionalId=hello] Enqueuing transactional request >>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, >>>> producerEpoch=0, partitions=[test-0]) >>>> ... 
// duplicate messages >>>> 2017-12-19 08:26:14 [kafka-producer-network-thread | producer-1] DEBUG >>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1, >>>> transactionalId=hello] Enqueuing transactional request >>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, >>>> producerEpoch=0, partitions=[test-0]) >>>> 2017-12-19 08:26:14 [kafka-producer-network-thread | producer-1] DEBUG >>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1, >>>> transactionalId=hello] Sending transactional request >>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, >>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: >>>> null) >>>> >>>> and the server.log: >>>> [2017-12-19 08:26:08,408] INFO Completed load of log >>>> __transaction_state-9 >>>> with 1 log segments, log start offset 0 and log end offset 0 in 15 ms >>>> (kafka.log.Log) >>>> [2017-12-19 08:26:08,408] INFO Created log for partition >>>> [__transaction_state,9] in D:\tmp\kafka-logs with properties >>>> {compression.type -> uncompressed, message.format.version -> 1.0-IV0, >>>> file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, >>>> min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, >>>> min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, >>>> min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, >>>> unclean.leader.election.enable -> false, retention.bytes -> -1, >>>> delete.retention.ms -> 86400000, cleanup.policy -> compact, flush.ms -> >>>> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> >>>> 104857600, >>>> retention.ms -> 604800000, message.timestamp.difference.max.ms -> >>>> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> >>>> 9223372036854775807}. 
(kafka.log.LogManager) >>>> [2017-12-19 08:26:08,408] INFO [Partition __transaction_state-9 >>>> broker=0] >>>> No checkpointed highwatermark is found for partition >>>> __transaction_state-9 >>>> (kafka.cluster.Partition) >>>> [2017-12-19 08:26:08,408] INFO Replica loaded for partition >>>> __transaction_state-9 with initial high watermark 0 >>>> (kafka.cluster.Replica) >>>> [2017-12-19 08:26:08,408] INFO [Partition __transaction_state-9 >>>> broker=0] >>>> __transaction_state-9 starts at Leader Epoch 0 from offset 0. Previous >>>> Leader Epoch was: -1 (kafka.cluster.Partition) >>>> [2017-12-19 08:26:08,408] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-1 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-4 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-7 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-10 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-13 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-16 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-19 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> 
transaction metadata from __transaction_state-22 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-25 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-28 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-31 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-34 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-37 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-2 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-5 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-8 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-11 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-14 >>>> 
(kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-17 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-20 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-23 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-26 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-29 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-32 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-35 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-38 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-41 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-44 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO 
[Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-47 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-21 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-24 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-27 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-30 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-33 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-36 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-39 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-42 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-45 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-48 >>>> 
(kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-40 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-43 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-46 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-49 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-0 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-3 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-6 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-9 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-12 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,471] INFO [Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-15 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,471] INFO 
[Transaction State Manager 0]: Loading >>>> transaction metadata from __transaction_state-18 >>>> (kafka.coordinator.transaction.TransactionStateManager) >>>> [2017-12-19 08:26:08,627] INFO Updated PartitionLeaderEpoch. New: >>>> {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: >>>> __transaction_state-22. Cache now contains 0 entries. >>>> (kafka.server.epoch.LeaderEpochFileCache) >>>> [2017-12-19 08:26:08,658] INFO [TransactionCoordinator id=0] Initialized >>>> transactionalId hello with producerId 0 and producer epoch 0 on >>>> partition >>>> __transaction_state-22 (kafka.coordinator.transaction >>>> .TransactionCoordinator) >>>> [2017-12-19 08:26:08,705] INFO Updated PartitionLeaderEpoch. New: >>>> {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: >>>> test-0. >>>> Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache) >>>> >>>> >>>> 在 2017/12/19 0:42, Ted Yu 写道: >>>> >>>> Can you capture stack trace on the broker and pastebin it ? >>>>> >>>>> Broker log may also provide some clue. >>>>> >>>>> Thanks >>>>> >>>>> On Mon, Dec 18, 2017 at 4:46 AM, HKT <dushukuang...@163.com> wrote: >>>>> >>>>> Hello, >>>>> >>>>>> I was testing the transactional message on kafka. >>>>>> but I get a problem. >>>>>> the producer always blocking at second commitTransaction. 
>>>>>> Here is my code: >>>>>> >>>>>> Properties kafkaProps = new Properties(); >>>>>> kafkaProps.setProperty("bootstrap.servers", >>>>>> "localhost:9092"); >>>>>> kafkaProps.setProperty("key.serializer", >>>>>> LongSerializer.class.getName()); >>>>>> kafkaProps.setProperty("value.serializer", >>>>>> StringSerializer.class.getName()); >>>>>> kafkaProps.setProperty("transactional.id", "hello"); >>>>>> try (KafkaProducer<Long, String> producer = new >>>>>> KafkaProducer<>(kafkaProps)) { >>>>>> producer.initTransactions(); >>>>>> producer.beginTransaction(); >>>>>> ProducerRecord<Long, String> record = new >>>>>> ProducerRecord<>("test", 0, (long) 0, Long.toString(0)); >>>>>> producer.send(record); >>>>>> producer.sendOffsetsToTransaction(new HashMap<>(), ""); >>>>>> producer.commitTransaction(); >>>>>> producer.beginTransaction(); >>>>>> record = new ProducerRecord<>("test", 0, (long)0, >>>>>> Long.toString(0)); >>>>>> producer.send(record); >>>>>> producer.commitTransaction(); // blocking here >>>>>> } >>>>>> >>>>>> Enviroment: >>>>>> Kafka broker: 1.0.0 >>>>>> broker count: 1 >>>>>> Kafka Client: 1.0.0 >>>>>> and I use the default server.properties in config/ >>>>>> >>>>>> broker.id=0 >>>>>> num.network.threads=3 >>>>>> num.io.threads=8 >>>>>> socket.send.buffer.bytes=102400 >>>>>> socket.receive.buffer.bytes=102400 >>>>>> socket.request.max.bytes=104857600 >>>>>> log.dirs=/tmp/kafka-logs >>>>>> num.partitions=1 >>>>>> num.recovery.threads.per.data.dir=1 >>>>>> offsets.topic.replication.factor=1 >>>>>> transaction.state.log.replication.factor=1 >>>>>> transaction.state.log.min.isr=1 >>>>>> log.retention.hours=168 >>>>>> log.segment.bytes=1073741824 >>>>>> log.retention.check.interval.ms=300000 >>>>>> zookeeper.connect=localhost:2181 >>>>>> zookeeper.connection.timeout.ms=6000 >>>>>> group.initial.rebalance.delay.ms=0 >>>>>> >>>>>> I have run the program in Windows 7 and CentOS 6.9. >>>>>> but it blocking in the second commitTransaction. 
>>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >> > >