[ 
https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375380#comment-16375380
 ] 

Jason Gustafson commented on KAFKA-6052:
----------------------------------------

It is clear that the transaction markers are failing to be written. I see in 
one of the posted logs that the transaction coordinator continually retries 
sending, but it always fails. The most puzzling part is that I see it initiate 
the connection to the destination broker (which happens to be itself), but I 
never see it complete the connection, nor do I see it disconnect or attempt to 
reconnect. It is as if the connection is being established successfully but the 
notification is never reaching the NetworkClient. That made me think of 
KAFKA-3378, but that was fixed well before 0.11, so I'm not sure.

[~vmallavarapu] Could you post broker logs as well? It would be helpful to 
confirm whether it looks like the same problem.

> Windows: Consumers not polling when isolation.level=read_committed 
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6052
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6052
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, producer 
>    Affects Versions: 0.11.0.0
>         Environment: Windows 10. All processes running in embedded mode.
>            Reporter: Ansel Zandegran
>            Priority: Major
>              Labels: windows
>         Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, 
> logFile.log
>
>
> *The same code is running fine in Linux.* I am trying to send a transactional 
> record with exactly once schematics. These are my producer, consumer and 
> broker setups. 
> public void sendWithTTemp(String topic, EHEvent event) {
>     Properties props = new Properties();
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>               "localhost:9092,localhost:9093,localhost:9094");
> //    props.put("bootstrap.servers", 
> "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
>     props.put(ProducerConfig.ACKS_CONFIG, "all");
>     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>     props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>     props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
>     props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
>     props.put("transactional.id", "TID" + transactionId.incrementAndGet());
>     props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
>     Producer<String, String> producer =
>         new KafkaProducer<>(props,
>                             new StringSerializer(),
>                             new StringSerializer());
>     Logger.log(this, "Initializing transaction...");
>     producer.initTransactions();
>     Logger.log(this, "Initializing done.");
>     try {
>       Logger.log(this, "Begin transaction...");
>       producer.beginTransaction();
>       Logger.log(this, "Begin transaction done.");
>       Logger.log(this, "Sending events...");
>       producer.send(new ProducerRecord<>(topic,
>                                          event.getKey().toString(),
>                                          event.getValue().toString()));
>       Logger.log(this, "Sending events done.");
>       Logger.log(this, "Committing...");
>       producer.commitTransaction();
>       Logger.log(this, "Committing done.");
>     } catch (ProducerFencedException | OutOfOrderSequenceException
>         | AuthorizationException e) {
>       producer.close();
>       e.printStackTrace();
>     } catch (KafkaException e) {
>       producer.abortTransaction();
>       e.printStackTrace();
>     }
>     producer.close();
>   }
> *In Consumer*
> I have set isolation.level=read_committed
> *In 3 Brokers*
> I'm running with the following properties
>       Properties props = new Properties();
>       props.setProperty("broker.id", "" + i);
>       props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
>       props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + 
> i);
>       props.setProperty("num.partitions", "1");
>       props.setProperty("zookeeper.connect", "localhost:2181");
>       props.setProperty("zookeeper.connection.timeout.ms", "6000");
>       props.setProperty("min.insync.replicas", "2");
>       props.setProperty("offsets.topic.replication.factor", "2");
>       props.setProperty("offsets.topic.num.partitions", "1");
>       props.setProperty("transaction.state.log.num.partitions", "2");
>       props.setProperty("transaction.state.log.replication.factor", "2");
>       props.setProperty("transaction.state.log.min.isr", "2");
> I am not getting any records in the consumer. When I set 
> isolation.level=read_uncommitted, I get the records. I assume that the 
> records are not getting commited. What could be the problem? log attached



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to