You can add the config *props.put("metadata.max.age.ms <http://metadata.max.age.ms>", 5000);* to your cases, and re-test it.
飞翔的加菲猫 <526564...@qq.com> 于2018年10月18日周四 上午10:47写道: > Sorry for bothering. I don't know whether it is a bug. Maybe something > wrong in my test or there is explanation for it. Could any Kafka master > help take a look? Thanks lot. > > > Ruiping Li > > > ------------------ 原始邮件 ------------------ > 发件人: "526564746"<526564...@qq.com>; > 发送时间: 2018年10月12日(星期五) 下午4:42 > 收件人: "users"<users@kafka.apache.org>; > > 主题: New increased partitions could not be rebalance, until stop all > consumers and start them > > > > Hi Kafka team, > > > I meet a strange thing about Kafka rebalance. If I increase partitions of > a topic which subscribed by some java consumers(in same one group), there > is no rebalance occur. Furthermore, if I start a new consumer (or stop one) > to cause a rebalance, the increased partitions could not be assigned, until > I stop all consumers and start them. Is that normal? > > > Thanks, > Ruiping Li > > > > -------------------------------------------------------------------------------- > Below is my test: > 1. Start Kafka, ZK. Create a normal topic(test-topic) with 1 partitions > $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic > test-topic --partitions 1 --replication-factor 1 --config > retention.ms=604800000 > > 2. Start 2 java consumers (C1, C2), subscribe test-topic > 3. Increase 2 partitions of test-topic > $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic > test-topic --partitions 3 > WARNING: If partitions are increased for a topic that has a key, the > partition logic or ordering of the messages will be affected > Adding partitions succeeded! > > Increasing succeeded: > $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic > test-topic > Topic:test-topic PartitionCount:3 ReplicationFactor:1 Configs: > retention.ms=604800000 > Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: > 0 > Topic: test-topic Partition: 1 Leader: 0 Replicas: 0 Isr: > 0 > Topic: test-topic Partition: 2 Leader: 0 Replicas: 0 Isr: > 0 > > There is no rebalance occur in C1, C2. > 4. Start a new consumer C3 to subscribed test-topic. Rebalance occur, but > only partition test-topic-0 involved in reassigned, no test-topic-1 and > test-topic-2. > 5. I try to stop C2, C3, and test-topic-1 and test-topic-2 still not be > assigned. > 6. Stop all running consumers, and then start them. All test-topic-0,1,2 > assigned normally. > > > Environment > kafka & java api version: kafka_2.12-2.0.0 (I also tried kafka_2.11-1.0.0 > and kafka_2.10-0.10.2.1, same result) > zookeeper: 3.4.13 > consumer code: > // consumer > public class KafkaConsumerThread extends Thread { > // consumer settings > public static org.apache.kafka.clients.consumer.KafkaConsumer<String, > String> createNativeConsumer(String groupName, String kafkaBootstrap) { > Properties props = new Properties(); > props.put("bootstrap.servers", kafkaBootstrap); > props.put("group.id", groupName); > props.put("auto.offset.reset", "earliest"); > props.put("enable.auto.commit", true); > > props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); > > > props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); > > > > return new KafkaConsumer<String, String>(props); > } > > > private static final Logger log = > LoggerFactory.getLogger(KafkaConsumerThread.class); > private boolean stop = false; > private KafkaConsumer<String, String> consumer; > private String topicName; > private ConsumerRebalanceListener consumerRebalanceListener; > private AtomicLong receivedRecordNumber = new AtomicLong(0); > > > public KafkaConsumerThread(String topicName, String groupName, > ConsumerRebalanceListener consumerRebalanceListener, String kafkaBootstrap) > { > this.consumer = createNativeConsumer(groupName, kafkaBootstrap); > this.topicName = topicName; > this.consumerRebalanceListener = consumerRebalanceListener; > } > > > @Override > public void run() { > log.info("Start consumer .."); > consumer.subscribe(Collections.singleton(topicName), > consumerRebalanceListener); > while (!stop) { > try { > ConsumerRecords<String, String> records = > consumer.poll(100); > receivedRecordNumber.addAndGet(records.count()); > Iterator<ConsumerRecord<String, String>> iterator = > records.iterator(); > while (iterator.hasNext()) { > ConsumerRecord<String, String> record = > iterator.next(); > log.info("Receive [key:{}][value:{}]", record.key(), > record.value()); > } > } catch (TimeoutException e) { > log.info("no data"); > } > } > consumer.close(); > } > > > public void stopConsumer() { > this.stop = true; > } > }