[ https://issues.apache.org/jira/browse/KAFKA-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Narendra Kumar resolved KAFKA-8380. ----------------------------------- Resolution: Not A Problem > We can not create a topic, immediately write to it and then read. > ----------------------------------------------------------------- > > Key: KAFKA-8380 > URL: https://issues.apache.org/jira/browse/KAFKA-8380 > Project: Kafka > Issue Type: Bug > Affects Versions: 2.2.0 > Reporter: Darya Merkureva > Priority: Blocker > > We are trying to create a topic, immediately write to it, and then read from it. > For some reason, we read nothing, even though we wait for the > KafkaFuture to complete. > {code:java} > public class main { > private static final String TOPIC_NAME = "topic"; > private static final String KEY_NAME = "key"; > public static void main(String[] args) { > final Properties prodProps = new Properties(); > prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092"); > prodProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); > prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000); > prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > final Producer<String, String> prod = new > KafkaProducer<>(prodProps); > final Properties admProps = new Properties(); > admProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092"); > final AdminClient adm = KafkaAdminClient.create(admProps); > final Properties consProps = new Properties(); > consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092"); > consProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); > consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); > consProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, > "1000"); > consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, > "30000"); > consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > 
"org.apache.kafka.common.serialization.StringDeserializer"); > consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.StringDeserializer"); > final Consumer<String,String> cons = new > KafkaConsumer<>(consProps); > > try { > final NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, > (short)1); > val createTopicsResult = > adm.createTopics(Collections.singleton(newTopic)); > createTopicsResult.values().get(TOPIC_NAME).get(); > } catch (InterruptedException | ExecutionException e) { > if (!(e.getCause() instanceof TopicExistsException)) { > throw new RuntimeException(e.getMessage(), e); > } > } > > final ProducerRecord<String, String> producerRecord = > new ProducerRecord<>(TOPIC_NAME, KEY_NAME, > "data"); > prod.send(producerRecord); > prod.send(producerRecord); > prod.send(producerRecord); > prod.send(producerRecord); > cons.subscribe(Arrays.asList(TOPIC_NAME)); > val records = cons.poll(Duration.ofSeconds(10)); > for(var record: records){ > System.out.println(record.value()); > } > } > } > {code} > > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)