Hi, When I use AdminClient to delete the topic, and immediately create the topic that was just deleted, it always indicates that the topic already exists, but I try to get all the existing topics, the topic does not exist. Following is the code I tested.
package org.wzh.three2.kafka.producer; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Collections; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; public class ProducerClient { public static final String TOPIC = "topic-three2-kafka"; public static void main(String[] args) throws ExecutionException, InterruptedException { initTopic(); // KafkaProducer<String, String> producer = new KafkaProducer<>(initConfig()); // for (int i = 0; i < Integer.MAX_VALUE; i++) { // ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, String.valueOf(i)); // try { // Future<RecordMetadata> future = producer.send(record); // RecordMetadata metadata = future.get(); // System.out.println(metadata.topic() + " - " + metadata.partition() + ":" + metadata.offset()); // } catch (InterruptedException e) { // e.printStackTrace(); // } catch (ExecutionException e) { // e.printStackTrace(); // } // LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3)); // } } private static void listTopics(AdminClient client) throws ExecutionException, InterruptedException { System.out.println("+------Topics-------+"); client.listTopics().names().get().stream().forEach(System.out::println); System.out.println("+-------------------+"); } // 创建主题 public static void initTopic() throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092"); AdminClient client = KafkaAdminClient.create(props); listTopics(client); NewTopic newTopic = new NewTopic(TOPIC, 10, (short) 3); if(client.listTopics().names().get().contains(TOPIC)) { System.out.println("Will delete topic: " + TOPIC); try { DeleteTopicsResult deleteTopicsResult = client.deleteTopics(Collections.singletonList(newTopic.name())); deleteTopicsResult.all().get(); deleteTopicsResult.values().forEach((k, v) -> System.out.println(k + "\t" + v)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } listTopics(client); try { System.out.println("Will create topic: " + TOPIC); CreateTopicsResult result = client.createTopics(Collections.singletonList(newTopic)); result.all().get(); result.values().forEach((k, v) -> System.out.println(k + "\t" + v)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } client.close(); } public static Properties initConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo"); return props; } } Brs/newbie 从 Windows 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>发送