Darya Merkureva created KAFKA-8380:
--------------------------------------

             Summary: I can not create a topic, immediately write to it and 
then read.
                 Key: KAFKA-8380
                 URL: https://issues.apache.org/jira/browse/KAFKA-8380
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.2.0
            Reporter: Darya Merkureva


We are trying to create a topic, immediately write to it and read. 
For some reason, we read nothing in spite of the fact that we are waiting for 
the completion of KafkaFuture. 
{code:java}
public class main {
        private static final String TOPIC_NAME = "topic";
        private static final String KEY_NAME = "key";
        public static void main(String[] args) {

                final Properties prodProps = new Properties();
                prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
                prodProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
                prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 50000);
                prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
                prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
                final Producer<String, String> prod = new 
KafkaProducer<>(prodProps);

                final Properties admProps = new Properties();
                admProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
                final AdminClient adm = KafkaAdminClient.create(admProps);

                final Properties consProps = new Properties();
                consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
                consProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
                consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
                consProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 
"1000");
                consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
"30000");
                consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
                consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
                final Consumer<String,String> cons = new 
KafkaConsumer<>(consProps);
                
                try {
                        final NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, 
(short)1);
                        val createTopicsResult = 
adm.createTopics(Collections.singleton(newTopic));
                        createTopicsResult.values().get(TOPIC_NAME).get();
                } catch (InterruptedException | ExecutionException e) {
                        if (!(e.getCause() instanceof TopicExistsException)) {
                                throw new RuntimeException(e.getMessage(), e);
                        }
                }
                
                final ProducerRecord<String, String> producerRecord =
                                new ProducerRecord<>(TOPIC_NAME, KEY_NAME, 
"data");
                prod.send(producerRecord);
                prod.send(producerRecord);
                prod.send(producerRecord);
                prod.send(producerRecord);
                cons.subscribe(Arrays.asList(TOPIC_NAME));
                val records  = cons.poll(Duration.ofSeconds(10));
                for(var record: records){
                        System.out.println(record.value());
                }
        }
}
{code}
 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to