[
https://issues.apache.org/jira/browse/KAFKA-8401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
leishuiyu updated KAFKA-8401:
-----------------------------
Description:
# this is code
{code:java}
public class Consumer extends Thread {
KafkaConsumer<Integer, String> consumer;
public Consumer() {
Properties props = new Properties();
//47.105.201.137 is public network Ip
props.put("bootstrap.servers", "47.105.201.137:9092"); //连接地址
props.put("group.id", "lsy_test");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<Integer, String>(props);
}
@Override
public void run() {
consumer.subscribe(Arrays.asList("flink_order"));
while (true) {
ConsumerRecords<Integer, String> poll =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Integer, String> record : poll) {
System.out.println(record.key() + "-------" + record.value());
}
}
}
public static void main(String[] args) {
Consumer sumer = new Consumer();
sumer.start();
}
}
{code}
# Configured hosts for remote machines
{code:java}
xx.xx.xx.xx centos-7{code}
# when my code running in local machines,the
bootstrap.servers=47.105.201.137:9092 the consumer poll is blocking ,howerver
in my mac set /etc/hosts 47.105.201.137 centos-7 and
boostrap.servers=centos-7:9092 the consumer can poll message,The previous
methods consumer.listTopics() is successful,only poll message is blocking ,I
feel very confused
was:
# this is code
{code:java}
//public class Consumer extends Thread {
KafkaConsumer<Integer, String> consumer;
public Consumer() {
Properties props = new Properties();
//47.105.201.137 is public network Ip
props.put("bootstrap.servers", "47.105.201.137:9092"); //连接地址
props.put("group.id", "lsy_test");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<Integer, String>(props);
}
@Override
public void run() {
consumer.subscribe(Arrays.asList("flink_order"));
while (true) {
ConsumerRecords<Integer, String> poll =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Integer, String> record : poll) {
System.out.println(record.key() + "-------" + record.value());
}
}
}
public static void main(String[] args) {
Consumer sumer = new Consumer();
sumer.start();
}
}
{code}
# Configured hosts for remote machines
{code:java}
//xx.xx.xx.xx centos-7{code}
# when my code running in local machines,the
bootstrap.servers=47.105.201.137:9092 the consumer poll is blocking ,howerver
in my mac set /etc/hosts 47.105.201.137 centos-7 and
boostrap.servers=centos-7:9092 the consumer can poll message,The previous
methods consumer.listTopics() is successful,only poll message is blocking ,I
feel very confused
> consumer.poll(Duration.ofMillis(100)) blocking
> -----------------------------------------------
>
> Key: KAFKA-8401
> URL: https://issues.apache.org/jira/browse/KAFKA-8401
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 1.1.0
> Environment: kafka 1.1.0
> zk 3.4.11
> Reporter: leishuiyu
> Priority: Major
> Labels: blocking, kafka
>
> # this is code
> {code:java}
> public class Consumer extends Thread {
> KafkaConsumer<Integer, String> consumer;
> public Consumer() {
> Properties props = new Properties();
> //47.105.201.137 is public network Ip
> props.put("bootstrap.servers", "47.105.201.137:9092"); //连接地址
> props.put("group.id", "lsy_test");
> props.put("zookeeper.session.timeout.ms", "400");
> props.put("zookeeper.sync.time.ms", "200");
> props.put("auto.commit.interval.ms", "1000");
> props.put("key.deserializer",
> "org.apache.kafka.common.serialization.IntegerDeserializer");
> props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> this.consumer = new KafkaConsumer<Integer, String>(props);
> }
> @Override
> public void run() {
> consumer.subscribe(Arrays.asList("flink_order"));
> while (true) {
> ConsumerRecords<Integer, String> poll =
> consumer.poll(Duration.ofMillis(100));
> for (ConsumerRecord<Integer, String> record : poll) {
> System.out.println(record.key() + "-------" + record.value());
> }
> }
> }
> public static void main(String[] args) {
> Consumer sumer = new Consumer();
> sumer.start();
> }
> }
> {code}
> # Configured hosts for remote machines
> {code:java}
> xx.xx.xx.xx centos-7{code}
> # when my code running in local machines,the
> bootstrap.servers=47.105.201.137:9092 the consumer poll is blocking ,howerver
> in my mac set /etc/hosts 47.105.201.137 centos-7 and
> boostrap.servers=centos-7:9092 the consumer can poll message,The previous
> methods consumer.listTopics() is successful,only poll message is blocking ,I
> feel very confused
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)