bq. record = new ProducerRecord<>("test", 0, (long)0, Long.toString(0));

What was the rationale for passing 0 as the third parameter in the second
transaction?
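For reference (an editorial aside, not from the thread): the four-argument overload used there is ProducerRecord(topic, partition, key, value), so the third argument is the record key, not a timestamp. A plain-Java sketch of how the arguments map, with no Kafka dependency:

```java
public class RecordArgsSketch {
    public static void main(String[] args) {
        // new ProducerRecord<>("test", 0, (long) 0, Long.toString(0)) maps positionally:
        String topic = "test";           // first argument: the topic
        int partition = 0;               // second argument: the partition
        long key = 0L;                   // third argument: the record key
        String value = Long.toString(0); // fourth argument: the value
        System.out.println(topic + "-" + partition + " key=" + key + " value=" + value);
        // prints "test-0 key=0 value=0"
    }
}
```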

Cheers

On Wed, Dec 20, 2017 at 5:08 AM, HKT <dushukuang...@163.com> wrote:

> Hi, I have run the program for over 10 hours! It doesn't stop.
> It generated 800+ MB of logs, but almost all of them are
> 2017-12-20 20:04:28 [kafka-producer-network-thread | producer-1] DEBUG
> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
> transactionalId=hello] Sending transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
> producerEpoch=0, partitions=[test-0]) to node 192.168.245.1:9092 (id: 0
> rack: null)
> 2017-12-20 20:04:28 [kafka-producer-network-thread | producer-1] DEBUG
> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
> transactionalId=hello] Enqueuing transactional request
> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
> producerEpoch=0, partitions=[test-0])
>
> I debugged into the client library and found this code in
> Sender.run(long now) at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214):
>
> > 214 } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
> > 215     // as long as there are outstanding transactional requests, we simply wait for them to return
> > 216     client.poll(retryBackoffMs, now);
> > 217     return;
> > 218 }
>
> and I found that Sender.run(long now) calls
> Sender.maybeSendTransactionalRequest(), which in turn calls
> TransactionManager.nextRequestHandler().
> The TransactionManager keeps a pendingRequests queue;
> nextRequestHandler() polls a TransactionManager$AddPartitionsToTxnHandler
> or a TransactionManager$FindCoordinatorHandler from it.
> This behavior makes maybeSendTransactionalRequest() return true; here
> is the stack:
>   at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:360)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> So it calls client.poll(), and client.poll() calls
> NetworkClient.completeResponses(), which finally calls enqueueRequest():
>   at org.apache.kafka.clients.producer.internals.TransactionManager.enqueueRequest(TransactionManager.java:799)
>   at org.apache.kafka.clients.producer.internals.TransactionManager.access$700(TransactionManager.java:64)
>   at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.reenqueue(TransactionManager.java:880)
>   - locked <0x7d1> (a org.apache.kafka.clients.producer.internals.TransactionManager)
>   at org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1024)
>   at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905)
>   at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
>   at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> enqueueRequest() then adds another TransactionManager$AddPartitionsToTxnHandler
> or TransactionManager$FindCoordinatorHandler to pendingRequests,
> causing the infinite loop.
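To illustrate the cycle described above, here is a simplified stand-alone model (hypothetical names, not the real Kafka classes): a handler polled from pendingRequests is put straight back whenever the response carries a retriable error, so the queue never drains while the error persists:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReenqueueLoopSketch {
    public static void main(String[] args) {
        Deque<String> pendingRequests = new ArrayDeque<>();
        pendingRequests.add("AddPartitionsToTxnHandler");

        int attempts = 0;
        // Cap the iterations for the demo; the real Sender loop has no such cap.
        while (!pendingRequests.isEmpty() && attempts < 5) {
            String handler = pendingRequests.pollFirst();   // like nextRequestHandler()
            attempts++;
            boolean concurrentTransactions = true;          // broker keeps answering with the same error
            if (concurrentTransactions) {
                pendingRequests.addFirst(handler);          // like reenqueue(): the queue never drains
            }
        }
        System.out.println("attempts=" + attempts + ", stillPending=" + !pendingRequests.isEmpty());
        // prints "attempts=5, stillPending=true"
    }
}
```

This matches the alternating "Enqueuing transactional request" / "Sending transactional request" pairs repeating in the DEBUG log.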
>
> I think the problem is in
> org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1024).
> The code is:
> > 1022 } else if (error == Errors.CONCURRENT_TRANSACTIONS) {
> > 1023     maybeOverrideRetryBackoffMs();
> > 1024     reenqueue();
> > 1025     return;
> > 1026 }
>
> The error is Errors.CONCURRENT_TRANSACTIONS, but I don't have two
> transactions.
>
> I think there is something I didn't configure correctly. Here is my complete
> code:
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.LongSerializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.Properties;
>
> public class ApacheKafkaHello
> {
>     public static final String Home;
>
>     static
>     {
>         final String homeEnv = "APACHE_KAFKA_HELLO_HOME";
>         String home = System.getenv(homeEnv);
>         if (home == null) {
>             System.err.printf("HOME ENVIRONMENT IS UNDEFINED: name=%s\n",
> homeEnv);
>             System.exit(-1);
>         }
>         Home = home;
>     }
>
>     private static final Logger logger = LoggerFactory.getLogger(ApacheKafkaHello.class);
>
>     public static void main(final String[] args)
>     {
>         logger.info("apache kafka hello starts: Home={}", Home);
>         for (int i = 0; i != args.length; ++i) {
>             logger.info("    args[{}]={}", i, args[i]);
>         }
>
>         Properties kafkaProps = new Properties();
>         kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>         kafkaProps.setProperty("key.serializer",
> LongSerializer.class.getName());
>         kafkaProps.setProperty("value.serializer",
> StringSerializer.class.getName());
>         kafkaProps.setProperty("transactional.id", "hello");
>         try (KafkaProducer<Long, String> producer = new
> KafkaProducer<>(kafkaProps)) {
>             producer.initTransactions();
>             producer.beginTransaction();
>             ProducerRecord<Long, String> record = new
> ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
>             producer.send(record);
>             producer.commitTransaction();
>             producer.beginTransaction();
>             record = new ProducerRecord<>("test", 0, (long)0,
> Long.toString(0));
>             producer.send(record);
>             producer.commitTransaction();
>         }
>     }
> }
> On 2017/12/19 20:54, HKT wrote:
>
>> Here is the server side DEBUG log.
>>
>> JDK Version is
>> > java -version
>> java version "1.8.0_121"
>> Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
>> Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
>>
>> Thanks
>>
>> On 2017/12/19 11:13, Ted Yu wrote:
>>
>>> For the server log, is it possible to enable DEBUG logging ?
>>>
>>> Thanks
>>>
>>> On Mon, Dec 18, 2017 at 4:35 PM, HKT <dushukuang...@163.com> wrote:
>>>
>>>> Thanks for the reply.
>>>>
>>>> Here is the client-side log:
>>>>
>>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
>>>> [Producer clientId=producer-1, transactionalId=hello] Transition from
>>>> state
>>>> READY to IN_TRANSACTION
>>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
>>>> [Producer clientId=producer-1, transactionalId=hello] Begin adding new
>>>> partition test-0 to transaction
>>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
>>>> [Producer clientId=producer-1, transactionalId=hello] Transition from
>>>> state
>>>> IN_TRANSACTION to COMMITTING_TRANSACTION
>>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
>>>> [Producer clientId=producer-1, transactionalId=hello] Enqueuing
>>>> transactional request (type=AddPartitionsToTxnRequest,
>>>> transactionalId=hello, producerId=0, producerEpoch=0,
>>>> partitions=[test-0])
>>>> 2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
>>>> [Producer clientId=producer-1, transactionalId=hello] Enqueuing
>>>> transactional request (type=EndTxnRequest, transactionalId=hello,
>>>> producerId=0, producerEpoch=0, result=COMMIT)
>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
>>>> transactionalId=hello] Sending transactional request
>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack:
>>>> null)
>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
>>>> transactionalId=hello] Enqueuing transactional request
>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>> producerEpoch=0, partitions=[test-0])
>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
>>>> transactionalId=hello] Sending transactional request
>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack:
>>>> null)
>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
>>>> transactionalId=hello] Enqueuing transactional request
>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>> producerEpoch=0, partitions=[test-0])
>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
>>>> transactionalId=hello] Sending transactional request
>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack:
>>>> null)
>>>> 2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
>>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
>>>> transactionalId=hello] Enqueuing transactional request
>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>> producerEpoch=0, partitions=[test-0])
>>>> ... // duplicate messages
>>>> 2017-12-19 08:26:14 [kafka-producer-network-thread | producer-1] DEBUG
>>>> o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
>>>> transactionalId=hello] Enqueuing transactional request
>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>> producerEpoch=0, partitions=[test-0])
>>>> 2017-12-19 08:26:14 [kafka-producer-network-thread | producer-1] DEBUG
>>>> o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
>>>> transactionalId=hello] Sending transactional request
>>>> (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
>>>> producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack:
>>>> null)
>>>>
>>>> and the server.log:
>>>> [2017-12-19 08:26:08,408] INFO Completed load of log
>>>> __transaction_state-9
>>>> with 1 log segments, log start offset 0 and log end offset 0 in 15 ms
>>>> (kafka.log.Log)
>>>> [2017-12-19 08:26:08,408] INFO Created log for partition
>>>> [__transaction_state,9] in D:\tmp\kafka-logs with properties
>>>> {compression.type -> uncompressed, message.format.version -> 1.0-IV0,
>>>> file.delete.delay.ms -> 60000, max.message.bytes -> 1000012,
>>>> min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime,
>>>> min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false,
>>>> min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
>>>> unclean.leader.election.enable -> false, retention.bytes -> -1,
>>>> delete.retention.ms -> 86400000, cleanup.policy -> compact, flush.ms ->
>>>> 9223372036854775807, segment.ms -> 604800000, segment.bytes ->
>>>> 104857600,
>>>> retention.ms -> 604800000, message.timestamp.difference.max.ms ->
>>>> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages ->
>>>> 9223372036854775807}. (kafka.log.LogManager)
>>>> [2017-12-19 08:26:08,408] INFO [Partition __transaction_state-9
>>>> broker=0]
>>>> No checkpointed highwatermark is found for partition
>>>> __transaction_state-9
>>>> (kafka.cluster.Partition)
>>>> [2017-12-19 08:26:08,408] INFO Replica loaded for partition
>>>> __transaction_state-9 with initial high watermark 0
>>>> (kafka.cluster.Replica)
>>>> [2017-12-19 08:26:08,408] INFO [Partition __transaction_state-9
>>>> broker=0]
>>>> __transaction_state-9 starts at Leader Epoch 0 from offset 0. Previous
>>>> Leader Epoch was: -1 (kafka.cluster.Partition)
>>>> [2017-12-19 08:26:08,408] INFO [Transaction State Manager 0]: Loading
>>>> transaction metadata from __transaction_state-1
>>>> (kafka.coordinator.transaction.TransactionStateManager)
>>>> ... // similar "Loading transaction metadata from __transaction_state-N"
>>>> lines for the remaining partitions
>>>> [2017-12-19 08:26:08,627] INFO Updated PartitionLeaderEpoch. New:
>>>> {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition:
>>>> __transaction_state-22. Cache now contains 0 entries.
>>>> (kafka.server.epoch.LeaderEpochFileCache)
>>>> [2017-12-19 08:26:08,658] INFO [TransactionCoordinator id=0] Initialized
>>>> transactionalId hello with producerId 0 and producer epoch 0 on
>>>> partition
>>>> __transaction_state-22 (kafka.coordinator.transaction
>>>> .TransactionCoordinator)
>>>> [2017-12-19 08:26:08,705] INFO Updated PartitionLeaderEpoch. New:
>>>> {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition:
>>>> test-0.
>>>> Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
>>>>
>>>>
>>>> On 2017/12/19 0:42, Ted Yu wrote:
>>>>
>>>>> Can you capture a stack trace on the broker and pastebin it?
>>>>>
>>>>> Broker log may also provide some clue.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Dec 18, 2017 at 4:46 AM, HKT <dushukuang...@163.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I was testing transactional messages on Kafka,
>>>>>> but I ran into a problem:
>>>>>> the producer always blocks at the second commitTransaction().
>>>>>> Here is my code:
>>>>>>
>>>>>>           Properties kafkaProps = new Properties();
>>>>>>           kafkaProps.setProperty("bootstrap.servers",
>>>>>> "localhost:9092");
>>>>>>           kafkaProps.setProperty("key.serializer",
>>>>>> LongSerializer.class.getName());
>>>>>>           kafkaProps.setProperty("value.serializer",
>>>>>> StringSerializer.class.getName());
>>>>>>           kafkaProps.setProperty("transactional.id", "hello");
>>>>>>           try (KafkaProducer<Long, String> producer = new
>>>>>> KafkaProducer<>(kafkaProps)) {
>>>>>>               producer.initTransactions();
>>>>>>               producer.beginTransaction();
>>>>>>               ProducerRecord<Long, String> record = new
>>>>>> ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
>>>>>>               producer.send(record);
>>>>>>               producer.sendOffsetsToTransaction(new HashMap<>(), "");
>>>>>>               producer.commitTransaction();
>>>>>>               producer.beginTransaction();
>>>>>>               record = new ProducerRecord<>("test", 0, (long)0,
>>>>>> Long.toString(0));
>>>>>>               producer.send(record);
>>>>>>               producer.commitTransaction(); // blocking here
>>>>>>           }
>>>>>>
>>>>>> Environment:
>>>>>> Kafka broker: 1.0.0
>>>>>> broker count: 1
>>>>>> Kafka Client: 1.0.0
>>>>>> and I use the default server.properties in config/
>>>>>>
>>>>>> broker.id=0
>>>>>> num.network.threads=3
>>>>>> num.io.threads=8
>>>>>> socket.send.buffer.bytes=102400
>>>>>> socket.receive.buffer.bytes=102400
>>>>>> socket.request.max.bytes=104857600
>>>>>> log.dirs=/tmp/kafka-logs
>>>>>> num.partitions=1
>>>>>> num.recovery.threads.per.data.dir=1
>>>>>> offsets.topic.replication.factor=1
>>>>>> transaction.state.log.replication.factor=1
>>>>>> transaction.state.log.min.isr=1
>>>>>> log.retention.hours=168
>>>>>> log.segment.bytes=1073741824
>>>>>> log.retention.check.interval.ms=300000
>>>>>> zookeeper.connect=localhost:2181
>>>>>> zookeeper.connection.timeout.ms=6000
>>>>>> group.initial.rebalance.delay.ms=0
>>>>>>
>>>>>> I have run the program on Windows 7 and CentOS 6.9,
>>>>>> but it blocks at the second commitTransaction().
