hi, zhisheng:


我解析 json 后:
(xxx, xxx, xxx, topic, partition, offset)
=>


(false,1603420582310,"INSERT","test3.order",2,75)
(false,1603421312803,"INSERT","test3.order",2,76)
(false,1603421344819,"INSERT","test3.order",2,77)
(false,1603421344819,"INSERT","test3.order",2,78)


我增加十几条数据,   拿到的都是 partition 2 的数据(4 条),  1跟 3 的没有拿到


我的猜想:


我做了一个 9797 外网端口, 用于本地开发调试 kafka(连到一个堡垒机xxx-b-1,转发 9797 到 xxx-a-1)


broker1 配置:


listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT


broker2 配置:


listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT







broker3 配置:


listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT


本机连接kafka:
properties.setProperty("bootstrap.servers", "xxx-b-1:9797")


是跟这个配置有关吗? 










在 2020-10-23 08:37:14,"zhisheng" <zhisheng2...@gmail.com> 写道:
>hi
>
>如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
>来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。
>
>eg:
>
>      env.addSource(new FlinkKafkaConsumer011<>(
>parameters.get("topic"),                new
>JSONKeyValueDeserializationSchema(true),
>buildKafkaProps(parameters)))                .flatMap(new
>FlatMapFunction<ObjectNode, ObjectNode>() {
>@Override                    public void flatMap(ObjectNode jsonNodes,
>Collector<ObjectNode> collector) throws Exception {
>    System.out.println(jsonNodes.get("value"));
>System.out.println(jsonNodes.get("metadata").get("topic").asText());
>
>System.out.println(jsonNodes.get("metadata").get("offset").asText());
>
>System.out.println(jsonNodes.get("metadata").get("partition").asText());
>                       collector.collect(jsonNodes);
>    }                })                .print();
>
>Best
>
>zhisheng
>
>
>Lynn Chen <alynnc...@163.com> 于2020年10月23日周五 上午12:13写道:
>
>>
>>
>>
>>
>>
>>
>> hi,  Qijun Feng:
>>
>>
>> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-04-03 09:27:52,"LakeShen" <shenleifight...@gmail.com> 写道:
>> >Hi Qijun,
>> >
>> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:1585670400000L,后面是不是只有分区3写入数据,个人的想法。
>> >
>> >Best,
>> >LakeShen
>> >
>> >Qijun Feng <jun1st.f...@gmail.com> 于2020年4月2日周四 下午5:44写道:
>> >
>> >> Dear All,
>> >>
>> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
>> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
>> >>  现在改成了所有地址,也换了 group.id
>> >>
>> >>
>> >> Properties properties = new Properties();
>> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> >> 10.216.77.170:9092,10.216.77.188:9092");
>> >> properties.setProperty("group.id", "behavior-logs-aggregator");
>> >>
>> >> FlinkKafkaConsumer010<BehaviorLog> kafkaConsumer010 =
>> >>        new FlinkKafkaConsumer010<BehaviorLog>("behavior-logs_dev", new
>> >> BehaviorLogDeserializationSchema(), properties);
>> >> kafkaConsumer010.setStartFromTimestamp(1585670400000L); //2020/04/01
>> >>
>> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者
>> 2
>> >> 的,
>> >>
>> >> 2020-04-02 14:54:58,532 INFO
>> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> >> Consumer subtask 0 creating fetcher with offsets
>> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>> >>
>> >>
>> >> 是哪里有问题吗?
>> >>
>> >>
>>

回复