Hi,
When I use AdminClient to delete a topic and then immediately re-create the
topic that was just deleted, the create call always reports that the topic
already exists. However, when I list all existing topics, the topic is not there.
The following is the code I tested.
package org.wzh.three2.kafka.producer;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.serialization.StringSerializer;
public class ProducerClient {
public static final String TOPIC = "topic-three2-kafka";
public static void main(String[] args) throws ExecutionException,
InterruptedException {
initTopic();
// KafkaProducer<String, String> producer = new
KafkaProducer<>(initConfig());
// for (int i = 0; i < Integer.MAX_VALUE; i++) {
// ProducerRecord<String, String> record = new
ProducerRecord<>(TOPIC, String.valueOf(i));
// try {
// Future<RecordMetadata> future = producer.send(record);
// RecordMetadata metadata = future.get();
// System.out.println(metadata.topic() + " - " +
metadata.partition() + ":" + metadata.offset());
// } catch (InterruptedException e) {
// e.printStackTrace();
// } catch (ExecutionException e) {
// e.printStackTrace();
// }
// LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
// }
}
private static void listTopics(AdminClient client) throws
ExecutionException, InterruptedException {
System.out.println("+------Topics-------+");
client.listTopics().names().get().stream().forEach(System.out::println);
System.out.println("+-------------------+");
}
// 创建主题
public static void initTopic() throws ExecutionException,
InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092");
AdminClient client = KafkaAdminClient.create(props);
listTopics(client);
NewTopic newTopic = new NewTopic(TOPIC, 10, (short) 3);
if(client.listTopics().names().get().contains(TOPIC)) {
System.out.println("Will delete topic: " + TOPIC);
try {
DeleteTopicsResult deleteTopicsResult =
client.deleteTopics(Collections.singletonList(newTopic.name()));
deleteTopicsResult.all().get();
deleteTopicsResult.values().forEach((k, v) ->
System.out.println(k + "\t" + v));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
listTopics(client);
try {
System.out.println("Will create topic: " + TOPIC);
CreateTopicsResult result =
client.createTopics(Collections.singletonList(newTopic));
result.all().get();
result.values().forEach((k, v) -> System.out.println(k + "\t" + v));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
client.close();
}
public static Properties initConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
return props;
}
}
Brs/newbie
从 Windows 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>发送