[ 
https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397789#comment-16397789
 ] 

ASF GitHub Bot commented on KAFKA-6052:
---------------------------------------

vahidhashemian opened a new pull request #4705: KAFKA-6052: Fix the request 
retry issue (on Windows) in InterBrokerSendThread
URL: https://github.com/apache/kafka/pull/4705
 
 
   This resolves the issue detected on Windows. It's a follow-up to the 
investigation done by @hachikuji (details on the JIRA).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Windows: Consumers not polling when isolation.level=read_committed 
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6052
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6052
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, producer 
>    Affects Versions: 0.11.0.0
>         Environment: Windows 10. All processes running in embedded mode.
>            Reporter: Ansel Zandegran
>            Priority: Major
>              Labels: windows
>         Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, 
> logFile.log
>
>
> *The same code runs fine on Linux.* I am trying to send a transactional 
> record with exactly-once semantics. These are my producer, consumer, and 
> broker setups. 
> public void sendWithTTemp(String topic, EHEvent event) {
>     Properties props = new Properties();
>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>               "localhost:9092,localhost:9093,localhost:9094");
> //    props.put("bootstrap.servers", "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
>     props.put(ProducerConfig.ACKS_CONFIG, "all");
>     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>     props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>     props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
>     props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
>     props.put("transactional.id", "TID" + transactionId.incrementAndGet());
>     props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
>     Producer<String, String> producer =
>         new KafkaProducer<>(props,
>                             new StringSerializer(),
>                             new StringSerializer());
>     Logger.log(this, "Initializing transaction...");
>     producer.initTransactions();
>     Logger.log(this, "Initializing done.");
>     try {
>       Logger.log(this, "Begin transaction...");
>       producer.beginTransaction();
>       Logger.log(this, "Begin transaction done.");
>       Logger.log(this, "Sending events...");
>       producer.send(new ProducerRecord<>(topic,
>                                          event.getKey().toString(),
>                                          event.getValue().toString()));
>       Logger.log(this, "Sending events done.");
>       Logger.log(this, "Committing...");
>       producer.commitTransaction();
>       Logger.log(this, "Committing done.");
>     } catch (ProducerFencedException | OutOfOrderSequenceException
>         | AuthorizationException e) {
>       producer.close();
>       e.printStackTrace();
>     } catch (KafkaException e) {
>       producer.abortTransaction();
>       e.printStackTrace();
>     }
>     producer.close();
>   }
> *In Consumer*
> I have set isolation.level=read_committed
> *In 3 Brokers*
> I'm running with the following properties
>       Properties props = new Properties();
>       props.setProperty("broker.id", "" + i);
>       props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
>       props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + i);
>       props.setProperty("num.partitions", "1");
>       props.setProperty("zookeeper.connect", "localhost:2181");
>       props.setProperty("zookeeper.connection.timeout.ms", "6000");
>       props.setProperty("min.insync.replicas", "2");
>       props.setProperty("offsets.topic.replication.factor", "2");
>       props.setProperty("offsets.topic.num.partitions", "1");
>       props.setProperty("transaction.state.log.num.partitions", "2");
>       props.setProperty("transaction.state.log.replication.factor", "2");
>       props.setProperty("transaction.state.log.min.isr", "2");
> I am not getting any records in the consumer. When I set 
> isolation.level=read_uncommitted, I get the records. I assume that the 
> records are not getting committed. What could be the problem? Logs are attached.
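
The report gives the consumer setting only in prose. As a rough illustration, the consumer-side configuration it describes might look like the sketch below. It uses plain string keys so it runs without the Kafka client on the classpath. The class name, the group id, and the choice of String deserializers are assumptions for illustration and are not taken from the reporter's code.

```java
import java.util.Properties;

public class ReadCommittedConsumerConfig {

    // Builds consumer properties with isolation.level=read_committed.
    // Keys are standard Kafka consumer config names; the values passed in
    // (bootstrap servers, group id) are placeholders chosen by the caller.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Only records from committed transactions are returned to the consumer.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // "example-group" is a hypothetical group id.
        Properties props = build("localhost:9092,localhost:9093,localhost:9094",
                                 "example-group");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would then be passed to a `KafkaConsumer` constructor. Switching the last setting to `read_uncommitted` corresponds to the case where the reporter does see records.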



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
