Hi,
When I use AdminClient to delete the topic, and immediately create the topic 
that was just deleted, it always indicates that the topic already exists, but I 
try to get all the existing topics, the topic does not exist.
Following is the code I tested.

package org.wzh.three2.kafka.producer;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class ProducerClient {

    public static final String TOPIC = "topic-three2-kafka";

    public static void main(String[] args) throws ExecutionException, 
InterruptedException {
        initTopic();

//        KafkaProducer<String, String> producer = new 
KafkaProducer<>(initConfig());
//        for (int i = 0; i < Integer.MAX_VALUE; i++) {
//            ProducerRecord<String, String> record = new 
ProducerRecord<>(TOPIC, String.valueOf(i));
//            try {
//                Future<RecordMetadata> future = producer.send(record);
//                RecordMetadata metadata = future.get();
//                System.out.println(metadata.topic() + " - " + 
metadata.partition() + ":" + metadata.offset());
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            } catch (ExecutionException e) {
//                e.printStackTrace();
//            }
//            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
//        }
    }

    private static void listTopics(AdminClient client) throws 
ExecutionException, InterruptedException {
        System.out.println("+------Topics-------+");
        client.listTopics().names().get().stream().forEach(System.out::println);
        System.out.println("+-------------------+");
    }

    // 创建主题
    public static void initTopic() throws ExecutionException, 
InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092");
        AdminClient client = KafkaAdminClient.create(props);

        listTopics(client);

        NewTopic newTopic = new NewTopic(TOPIC, 10, (short) 3);
        if(client.listTopics().names().get().contains(TOPIC)) {
            System.out.println("Will delete topic: " + TOPIC);
            try {
                DeleteTopicsResult deleteTopicsResult = 
client.deleteTopics(Collections.singletonList(newTopic.name()));
                deleteTopicsResult.all().get();
                deleteTopicsResult.values().forEach((k, v) -> 
System.out.println(k + "\t" + v));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        listTopics(client);

        try {
            System.out.println("Will create topic: " + TOPIC);
            CreateTopicsResult result = 
client.createTopics(Collections.singletonList(newTopic));
            result.all().get();
            result.values().forEach((k, v) -> System.out.println(k + "\t" + v));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        client.close();
    }

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"192.168.111.101:9092,192.168.111.102:9092,192.168.111.103:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }
}

Brs/newbie
从 Windows 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>发送

Reply via email to