Hi, I have now run the program for over 10 hours and it still does not stop.
It has generated more than 800 MB of log output, almost all of which consists of these two lines:
2017-12-20 20:04:28 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1, transactionalId=hello] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, producerEpoch=0, partitions=[test-0]) to node 192.168.245.1:9092 (id: 0 rack: null)
2017-12-20 20:04:28 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1, transactionalId=hello] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0, producerEpoch=0, partitions=[test-0])
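
To check how completely a large log is dominated by these two messages, the lines can be tallied by action and request type. This is only a minimal sketch: the class name LogTally is made up for illustration, the regular expression assumes the message wording shown above, and the log file path is passed as the first argument.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

// Hypothetical helper, not part of Kafka: counts transactional-request log messages.
final class LogTally {
    // Captures the action ("Sending" or "Enqueuing") and the request type,
    // assuming the DEBUG message wording shown in the log excerpt.
    private static final Pattern REQUEST =
        Pattern.compile("(Sending|Enqueuing) transactional request \\(type=(\\w+)");

    static Map<String, Integer> tally(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = REQUEST.matcher(line);
            // A physical line may contain more than one message, so keep searching.
            while (m.find()) {
                counts.merge(m.group(1) + " " + m.group(2), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // args[0] is the path to the client log file (an assumption of this sketch).
        try (Stream<String> lines = Files.lines(Paths.get(args[0]))) {
            tally(lines::iterator).forEach((key, count) -> System.out.println(count + "  " + key));
        }
    }
}
```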

I debugged into the client library and found this code in Sender.run(long now), at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214):

> 214 } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
> 215     // as long as there are outstanding transactional requests, we simply wait for them to return
> 216     client.poll(retryBackoffMs, now);
> 217     return;
> 218 }

Sender.run(long now) calls Sender.maybeSendTransactionalRequest(), which in turn calls TransactionManager.nextRequestHandler().
The TransactionManager keeps a queue named pendingRequests.
nextRequestHandler() polls a TransactionManager$AddPartitionsToTxnHandler or a TransactionManager$FindCoordinatorHandler from that queue, so maybeSendTransactionalRequest() returns true. Here is the stack:
  at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:360)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
  at java.lang.Thread.run(Thread.java:745)


So it calls client.poll(), and client.poll() calls NetworkClient.completeResponses(), which finally calls enqueueRequest():
  at org.apache.kafka.clients.producer.internals.TransactionManager.enqueueRequest(TransactionManager.java:799)
  at org.apache.kafka.clients.producer.internals.TransactionManager.access$700(TransactionManager.java:64)
  at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.reenqueue(TransactionManager.java:880)
  - locked <0x7d1> (a org.apache.kafka.clients.producer.internals.TransactionManager)
  at org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1024)
  at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905)
  at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
  at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
enqueueRequest() adds another TransactionManager$AddPartitionsToTxnHandler or TransactionManager$FindCoordinatorHandler to pendingRequests,
which causes the infinite loop.
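
The cycle can be reproduced in isolation with a toy model. This is only a sketch under stated assumptions: ReenqueueLoopSketch, FakeCoordinator, and run are invented names and not Kafka classes, and the fake coordinator simply answers CONCURRENT_TRANSACTIONS a fixed number of times. It shows that a handler which re-enqueues itself on every such response never drains the queue if the broker never stops returning that error.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the send / handle / re-enqueue cycle; none of these names are Kafka APIs.
final class ReenqueueLoopSketch {
    enum Error { NONE, CONCURRENT_TRANSACTIONS }

    /** Stand-in for the broker: answers CONCURRENT_TRANSACTIONS for the first busyResponses requests. */
    static final class FakeCoordinator {
        private int remainingBusy;

        FakeCoordinator(int busyResponses) {
            this.remainingBusy = busyResponses;
        }

        Error handleAddPartitions() {
            if (remainingBusy > 0) {
                remainingBusy--;
                return Error.CONCURRENT_TRANSACTIONS;
            }
            return Error.NONE;
        }
    }

    /**
     * Returns the number of requests sent until the queue drained,
     * or -1 if the request was still being retried after maxIterations sends.
     */
    static int run(FakeCoordinator coordinator, int maxIterations) {
        Deque<String> pendingRequests = new ArrayDeque<>();
        pendingRequests.add("AddPartitionsToTxnRequest");
        int sent = 0;
        while (!pendingRequests.isEmpty()) {
            if (sent == maxIterations) {
                return -1; // give up: this is the situation seen in the log
            }
            String request = pendingRequests.poll();
            sent++;
            Error error = coordinator.handleAddPartitions();
            if (error == Error.CONCURRENT_TRANSACTIONS) {
                pendingRequests.add(request); // plays the role of reenqueue()
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        // Coordinator recovers after three busy responses: the loop ends.
        System.out.println(run(new FakeCoordinator(3), 100));
        // Coordinator never recovers: the loop only ends because of the iteration cap.
        System.out.println(run(new FakeCoordinator(Integer.MAX_VALUE), 100));
    }
}
```

The real client has no such iteration cap in this code path, which would be consistent with the log growing without bound.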

I think the problem is in
org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1024).
The code is:
> 1022 } else if (error == Errors.CONCURRENT_TRANSACTIONS) {
> 1023     maybeOverrideRetryBackoffMs();
> 1024     reenqueue();
> 1025     return;
> 1026 }

The error is Errors.CONCURRENT_TRANSACTIONS, but I do not have two concurrent transactions.

I suspect that I have misconfigured something. Here is my complete code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;

public class ApacheKafkaHello
{
    public static final String Home;

    static
    {
        final String homeEnv = "APACHE_KAFKA_HELLO_HOME";
        String home = System.getenv(homeEnv);
        if (home == null) {
            System.err.printf("HOME ENVIRONMENT IS UNDEFINED: name=%s\n", homeEnv);
            System.exit(-1);
        }
        Home = home;
    }

    private static final Logger logger = LoggerFactory.getLogger(ApacheKafkaHello.class);

    public static void main(final String[] args)
    {
        logger.info("apache kafka hello starts: Home={}", Home);
        for (int i = 0; i != args.length; ++i) {
            logger.info("    args[{}]={}", i, args[i]);
        }

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("key.serializer", LongSerializer.class.getName());
        kafkaProps.setProperty("value.serializer", StringSerializer.class.getName());
        kafkaProps.setProperty("transactional.id", "hello");
        try (KafkaProducer<Long, String> producer = new KafkaProducer<>(kafkaProps)) {
            producer.initTransactions();
            producer.beginTransaction();
            ProducerRecord<Long, String> record = new ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
            producer.send(record);
            producer.commitTransaction();
            producer.beginTransaction();
            record = new ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
            producer.send(record);
            producer.commitTransaction();
        }
    }
}

On 2017/12/19 20:54, HKT wrote:
Here is the server side DEBUG log.

JDK Version is
> java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

Thanks

On 2017/12/19 11:13, Ted Yu wrote:
For the server log, is it possible to enable DEBUG logging?

Thanks

On Mon, Dec 18, 2017 at 4:35 PM, HKT <dushukuang...@163.com> wrote:

Thanks for the reply.

Here is the client-side log:

2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
[Producer clientId=producer-1, transactionalId=hello] Transition from state
READY to IN_TRANSACTION
2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
[Producer clientId=producer-1, transactionalId=hello] Begin adding new
partition test-0 to transaction
2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
[Producer clientId=producer-1, transactionalId=hello] Transition from state
IN_TRANSACTION to COMMITTING_TRANSACTION
2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
[Producer clientId=producer-1, transactionalId=hello] Enqueuing
transactional request (type=AddPartitionsToTxnRequest,
transactionalId=hello, producerId=0, producerEpoch=0, partitions=[test-0])
2017-12-19 08:26:08 [main] DEBUG o.a.k.c.p.i.TransactionManager -
[Producer clientId=producer-1, transactionalId=hello] Enqueuing
transactional request (type=EndTxnRequest, transactionalId=hello,
producerId=0, producerEpoch=0, result=COMMIT)
2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
transactionalId=hello] Sending transactional request
(type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: null)
2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
transactionalId=hello] Enqueuing transactional request
(type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
producerEpoch=0, partitions=[test-0])
2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
transactionalId=hello] Sending transactional request
(type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: null)
2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
transactionalId=hello] Enqueuing transactional request
(type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
producerEpoch=0, partitions=[test-0])
2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
transactionalId=hello] Sending transactional request
(type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: null)
2017-12-19 08:26:08 [kafka-producer-network-thread | producer-1] DEBUG
o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
transactionalId=hello] Enqueuing transactional request
(type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
producerEpoch=0, partitions=[test-0])
... // duplicate messages
2017-12-19 08:26:14 [kafka-producer-network-thread | producer-1] DEBUG
o.a.k.c.p.i.TransactionManager - [Producer clientId=producer-1,
transactionalId=hello] Enqueuing transactional request
(type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
producerEpoch=0, partitions=[test-0])
2017-12-19 08:26:14 [kafka-producer-network-thread | producer-1] DEBUG
o.a.k.c.producer.internals.Sender - [Producer clientId=producer-1,
transactionalId=hello] Sending transactional request
(type=AddPartitionsToTxnRequest, transactionalId=hello, producerId=0,
producerEpoch=0, partitions=[test-0]) to node HKT-PC:9092 (id: 0 rack: null)

And here is the server.log:
[2017-12-19 08:26:08,408] INFO Completed load of log __transaction_state-9
with 1 log segments, log start offset 0 and log end offset 0 in 15 ms
(kafka.log.Log)
[2017-12-19 08:26:08,408] INFO Created log for partition
[__transaction_state,9] in D:\tmp\kafka-logs with properties
{compression.type -> uncompressed, message.format.version -> 1.0-IV0,
file.delete.delay.ms -> 60000, max.message.bytes -> 1000012,
min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime,
min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false,
min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
unclean.leader.election.enable -> false, retention.bytes -> -1,
delete.retention.ms -> 86400000, cleanup.policy -> compact, flush.ms ->
9223372036854775807, segment.ms -> 604800000, segment.bytes -> 104857600,
retention.ms -> 604800000, message.timestamp.difference.max.ms ->
9223372036854775807, segment.index.bytes -> 10485760, flush.messages ->
9223372036854775807}. (kafka.log.LogManager)
[2017-12-19 08:26:08,408] INFO [Partition __transaction_state-9 broker=0] No
checkpointed highwatermark is found for partition __transaction_state-9
(kafka.cluster.Partition)
[2017-12-19 08:26:08,408] INFO Replica loaded for partition
__transaction_state-9 with initial high watermark 0 (kafka.cluster.Replica)
[2017-12-19 08:26:08,408] INFO [Partition __transaction_state-9 broker=0]
__transaction_state-9 starts at Leader Epoch 0 from offset 0. Previous
Leader Epoch was: -1 (kafka.cluster.Partition)
[2017-12-19 08:26:08,408] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-1
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-4
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-7
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-10
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-13
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-16
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-19
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-22
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-25
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-28
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-31
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-34
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,424] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-37
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-2
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-5
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-8
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-11
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-14
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-17
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-20
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-23
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-26
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-29
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-32
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-35
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-38
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-41
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-44
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-47
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-21
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-24
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-27
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-30
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,440] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-33
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-36
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-39
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-42
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-45
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-48
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-40
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-43
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-46
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-49
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-0
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-3
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-6
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-9
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,455] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-12
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,471] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-15
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,471] INFO [Transaction State Manager 0]: Loading
transaction metadata from __transaction_state-18
(kafka.coordinator.transaction.TransactionStateManager)
[2017-12-19 08:26:08,627] INFO Updated PartitionLeaderEpoch. New:
{epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition:
__transaction_state-22. Cache now contains 0 entries.
(kafka.server.epoch.LeaderEpochFileCache)
[2017-12-19 08:26:08,658] INFO [TransactionCoordinator id=0] Initialized
transactionalId hello with producerId 0 and producer epoch 0 on partition
__transaction_state-22
(kafka.coordinator.transaction.TransactionCoordinator)
[2017-12-19 08:26:08,705] INFO Updated PartitionLeaderEpoch. New:
{epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: test-0.
Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)


On 2017/12/19 0:42, Ted Yu wrote:

Can you capture a stack trace on the broker and pastebin it?

Broker log may also provide some clue.

Thanks

On Mon, Dec 18, 2017 at 4:46 AM, HKT <dushukuang...@163.com> wrote:

Hello,
I am testing transactional messages on Kafka,
but I have run into a problem:
the producer always blocks at the second commitTransaction.
Here is my code:

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProps.setProperty("key.serializer", LongSerializer.class.getName());
    kafkaProps.setProperty("value.serializer", StringSerializer.class.getName());
    kafkaProps.setProperty("transactional.id", "hello");
    try (KafkaProducer<Long, String> producer = new KafkaProducer<>(kafkaProps)) {
        producer.initTransactions();
        producer.beginTransaction();
        ProducerRecord<Long, String> record = new ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
        producer.send(record);
        producer.sendOffsetsToTransaction(new HashMap<>(), "");
        producer.commitTransaction();
        producer.beginTransaction();
        record = new ProducerRecord<>("test", 0, (long) 0, Long.toString(0));
        producer.send(record);
        producer.commitTransaction(); // blocking here
    }

Environment:
Kafka broker: 1.0.0
broker count: 1
Kafka Client: 1.0.0
I use the default server.properties from config/:

broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

I have run the program on Windows 7 and CentOS 6.9,
and it blocks in the second commitTransaction on both.
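
While diagnosing a call that never returns, it can help to bound the wait so the test program ends and thread dumps and logs stay manageable. The following is a minimal sketch using only java.util.concurrent; BoundedCall and runWithTimeout are made-up names, and the Runnable stands in for the blocking call. It does not fix the underlying problem, and the blocked call may ignore the interrupt and keep running on its daemon thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for diagnosis only; not part of the Kafka client.
final class BoundedCall {
    /**
     * Runs call on a daemon worker thread and waits at most timeoutMs.
     * Returns true if the call finished in time, false if it timed out
     * or the waiting thread was interrupted.
     */
    static boolean runWithTimeout(Runnable call, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // do not keep the JVM alive if the call never returns
            return t;
        });
        Future<?> future = executor.submit(call);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker; the call may ignore it
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("call failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In the program above this could be used as BoundedCall.runWithTimeout(producer::commitTransaction, 30000), where the 30-second limit is an arbitrary choice. After a timeout the state of the transaction is unknown, so the producer should presumably be closed rather than reused.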






