Emails from Lynn Chen

2022-12-21 Thread Lynn Chen
Unsubscribe

Flink job keeps failing and retrying: the producer id is not assigned to its transactional id

2021-02-28 Thread Lynn Chen
Hi all,


Flink throws an error while producing data to Kafka, which causes the job to retry continuously.


What I have observed: after each job starts, it runs normally for roughly 20 days before this problem appears, and from then on the task keeps retrying. I have not been able to find the real root cause.




Error message:


org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
at java.lang.Thread.run(Thread.java:748)
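
A note that is not a confirmed diagnosis: two broker-side settings are often checked together with this error. transaction.max.timeout.ms (15 minutes by default) caps the producer's transaction.timeout.ms, and transactional.id.expiration.ms (7 days by default) drops the producer-id mapping of a transactional id that has stayed idle for too long, after which the next transaction can fail exactly like this. A minimal sanity-check sketch, assuming unchanged broker defaults:

// Hedged sketch; the broker values are Kafka defaults and are assumptions, not read from this cluster.
val producerTransactionTimeoutMs = 5 * 60 * 1000L    // transaction.timeout.ms used by the producer config below
val brokerTransactionMaxTimeoutMs = 15 * 60 * 1000L  // broker default for transaction.max.timeout.ms
require(producerTransactionTimeoutMs <= brokerTransactionMaxTimeoutMs,
  "transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms")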




Kafka producer configuration:


// client.id left empty (related to InstanceAlreadyExistsException from duplicate client.id MBean registration)
prop.setProperty("client.id", "")
// producer transaction timeout: transaction.timeout.ms = 5 minutes
prop.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
prop.setProperty("max.in.flight.requests.per.connection", "1")
// idempotent producer (ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)
prop.setProperty("enable.idempotence", "true")
// ProducerConfig.RETRIES_CONFIG
prop.setProperty("retries", "5")

val kafkaSink: FlinkKafkaProducer[String] = new FlinkKafkaProducer[String](
  topic,
  new ResultStringKafkaSerializationSchema(topic),
  prop,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)


Flink is likewise configured with EXACTLY_ONCE checkpointing:
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
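
For EXACTLY_ONCE the checkpoint cadence and the Kafka transaction timeout need to fit together: a Kafka transaction stays open until the checkpoint that covers it completes, so checkpoints have to finish well within transaction.timeout.ms. A minimal sketch of that alignment, assuming a 60-second checkpoint interval (the interval is an assumption, not taken from this job):

// Hedged sketch: keep checkpoints well inside the 5-minute transaction timeout configured above.
env.enableCheckpointing(60 * 1000)                          // assumed interval: 60 s
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(4 * 60 * 1000) // stays below transaction.timeout.ms (5 min)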


Thanks, everyone!











RichFlatMapFunction => NoClassDefFoundError

2020-10-31 Thread Lynn Chen
Hi all,


Why do I keep getting java.lang.NoClassDefFoundError when I write to ListState from a flatMap that extends RichFlatMapFunction?


stream.map(x => (x._4._1, x._4._2))
  .keyBy(_._2)
  .flatMap(new ReceptionListStateFunction2)


import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer

class ReceptionListStateFunction2 extends RichFlatMapFunction[(String, Int), List[String]] {
  var myState: ListState[String] = _

  // the state handle must be initialized in open(); without this, flatMap fails with a NullPointerException
  override def open(parameters: Configuration): Unit = {
    val descriptor = new ListStateDescriptor[String]("receptionListState", classOf[String]) // state name is arbitrary
    myState = getRuntimeContext.getListState(descriptor)
  }

  override def flatMap(value: (String, Int), out: Collector[List[String]]): Unit = {
    // keep only records flagged with 1
    if (value._2 == 1) {
      myState.add(value._1)
    }

    // emit the full contents of the list state
    val states = myState.get().iterator()
    val listBuf: ListBuffer[String] = new ListBuffer[String]()
    while (states.hasNext) {
      listBuf.append(states.next())
    }
    out.collect(listBuf.toList)
  }
}
Error log:

Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler due to java.lang.NoClassDefFoundError: Could not initialize class scala.tools.nsc.Properties$
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
at org.apache.flink.api.scala.typeutils.TraversableSerializer.<init>(TraversableSerializer.scala:41)
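
Not part of the original question, but for context: the trace ends in TraversableSerializer.compileCbf, which compiles a CanBuildFrom for the List[String] output type through the Scala ToolBox at runtime, and the ToolBox needs scala-reflect and scala-compiler on the job classpath in exactly the Scala version the job was built with. "Could not initialize class scala.tools.nsc.Properties$" often points at these being missing or mismatched. A hedged sbt sketch of the matching dependencies (the sbt build itself is an assumption):

// Hedged sketch for an sbt build; for Maven the equivalent org.scala-lang artifacts apply.
// scala-reflect and scala-compiler must match scalaVersion exactly.
libraryDependencies ++= Seq(
  "org.scala-lang" % "scala-reflect"  % scalaVersion.value,
  "org.scala-lang" % "scala-compiler" % scalaVersion.value
)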











Re: Re: Re: Re: Flink reading from multiple Kafka partitions

2020-10-23 Thread Lynn Chen



Hi all,


My problem is solved. The cause was the following:


Because the connection goes through port forwarding on a bastion host, bootstrap.servers has to list forwarded addresses for all Kafka brokers.


1. Change the Kafka external-network configuration
>> broker1 configuration:
>> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT

broker2 and broker3 are changed in the same way, using ports 9798 and 9799 respectively.



2. Have the ops team map and forward all three ports:
xxx-b-1:9797 --> xxx-a-1:9797
xxx-b-1:9798 --> xxx-a-2:9798
xxx-b-1:9799 --> xxx-a-3:9799



3. properties.setProperty("bootstrap.servers", "xxx-b-1:9797,xxx-b-1:9798,xxx-b-1:9799")





It really was that some broker nodes were not forwarded, which is why data could only be read from one node. (That node happened to own partition 2, which is what made me realize the missing forwarding might be the problem.)
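
A quick way to verify this kind of setup from the development machine is to ask the cluster for its partition metadata and check that every partition leader advertises an address reachable through the bastion host. A minimal sketch with the plain Kafka consumer API; the forwarded addresses are the ones from step 3, everything else is an assumption rather than part of the original job:

// Hedged sketch: print the advertised leader of every partition of the topic.
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.setProperty("bootstrap.servers", "xxx-b-1:9797,xxx-b-1:9798,xxx-b-1:9799")
props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
props.setProperty("value.deserializer", classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
// every leader host:port printed here must be reachable from this machine,
// otherwise only the reachable broker's partitions are actually consumed
consumer.partitionsFor("test3.order").asScala.foreach { p =>
  println(s"partition=${p.partition()} leader=${p.leader().host()}:${p.leader().port()}")
}
consumer.close()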


Thanks to zhisheng for the help!



On 2020-10-23 11:56:08, "zhisheng" wrote:
>hi
>
>Since you can only consume data from one partition, it is clear that the consumer really only reaches one partition. You also mentioned:
>
>> used for local development and debugging of Kafka (connected to a bastion host xxx-b-1, forwarding 9797 to xxx-a-1)
>
>I suggest checking whether that forwarding is the problem, i.e. only one node is being forwarded.
>
>Best
>zhisheng
>
>Lynn Chen wrote on Fri, Oct 23, 2020 at 11:01 AM:
>
>>
>>
>>
>> Hi zhisheng,
>>
>>
>> After parsing the JSON:
>> (xxx, xxx, xxx, topic, partition, offset)
>> =>
>>
>>
>> (false,1603420582310,"INSERT","test3.order",2,75)
>> (false,1603421312803,"INSERT","test3.order",2,76)
>> (false,1603421344819,"INSERT","test3.order",2,77)
>> (false,1603421344819,"INSERT","test3.order",2,78)
>>
>>
>> I inserted a dozen or so new records, but everything I received came from partition 2 (4 records); nothing from partitions 1 and 3.
>>
>>
>> My guess:
>>
>>
>> I opened an external port 9797 for local development and debugging of Kafka (connecting to a bastion host xxx-b-1, which forwards 9797 to xxx-a-1).
>>
>>
>> broker1 configuration:
>>
>>
>> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>>
>> broker2 configuration:
>>
>>
>> listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>>
>>
>>
>>
>>
>>
>> broker3 configuration:
>>
>>
>> listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>>
>> Connecting to Kafka from my local machine:
>> properties.setProperty("bootstrap.servers", "xxx-b-1:9797")
>>
>>
>> Is this related to that configuration?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On 2020-10-23 08:37:14, "zhisheng" wrote:
>> >hi
>> >
>> >To troubleshoot, consume from Kafka with JSONKeyValueDeserializationSchema so that each record's metadata (topic/partition/offset) is exposed. That lets you check which partitions your data actually comes from, and removes the guesswork.
>> >
>> >eg:
>> >
>> >  env.addSource(new FlinkKafkaConsumer011<>(
>> >          parameters.get("topic"),
>> >          new JSONKeyValueDeserializationSchema(true),
>> >          buildKafkaProps(parameters)))
>> >      .flatMap(new FlatMapFunction<ObjectNode, ObjectNode>() {
>> >          @Override
>> >          public void flatMap(ObjectNode jsonNodes, Collector<ObjectNode> collector) throws Exception {
>> >              System.out.println(jsonNodes.get("value"));
>> >              System.out.println(jsonNodes.get("metadata").get("topic").asText());
>> >              System.out.println(jsonNodes.get("metadata").get("offset").asText());
>> >              System.out.println(jsonNodes.get("metadata").get("partition").asText());
>> >              collector.collect(jsonNodes);
>> >          }
>> >      }).print();
>> >
>> >Best
>> >
>> >zhisheng
>> >
>> >
>> >Lynn Chen wrote on Fri, Oct 23, 2020 at 12:13 AM:
>> >
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> Hi Qijun Feng,
>> >>
>> >>
>> >> I ran into a similar problem; how did you end up solving it?
>> >>
>> >>
>> >> On 2020-04-03 09:27:52, "LakeShen" wrote:
>> >> >Hi Qijun,
>> >> >
>> >> >Check whether every Kafka partition actually has data, or whether after the timestamp 158567040L only partition 3 received writes. Just my thought.
>> >> >
>> >> >Best,
>> >> >LakeShen
>> >> >
>> >> >Qijun Feng wrote on Thu, Apr 2, 2020 at 5:44 PM:
>> >> >
>> >> >> Dear All,
>> >> >>
>> >> >> My Kafka cluster has three machines and the topic has three partitions, but when Flink reads from Kafka it seems to consume from only one partition. At first my bootstrap.servers listed only one address; I have since changed it to all the addresses and also switched the group.id.
>> >> >>
>> >> >>
>> >> >> Properties properties = new Properties();
>> >> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> >> >> 10.216.77.170:9092,10.216.77.188:9092");
>> >> >> properties.setProperty("group.id", "behavior-logs-aggregator");
>> >> >>
>> >> >> FlinkKafkaConsumer010 kafkaConsumer010 =
>> >> >>     new FlinkKafkaConsumer010("behavior-logs_dev", new BehaviorLogDeserializationSchema(), properties);
>> >> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>> >> >>
>> >> >> The processed data is written to a database, and it looks like some data is missing. The logs confirm it: there is only partition=3, nothing for partition=1 or 2.
>> >> >>
>> >> >> 2020-04-02 14:54:58,532 INFO
>> >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> >> >> Consumer subtask 0 creating fetcher with offsets
>> >> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>> >> >>
>> >> >>
>> >> >> Is something misconfigured somewhere?
>> >> >>
>> >> >>
>> >>
>>


Re: Re: Re: Flink reading from multiple Kafka partitions

2020-10-22 Thread Lynn Chen



Hi zhisheng,


After parsing the JSON:
(xxx, xxx, xxx, topic, partition, offset)
=>


(false,1603420582310,"INSERT","test3.order",2,75)
(false,1603421312803,"INSERT","test3.order",2,76)
(false,1603421344819,"INSERT","test3.order",2,77)
(false,1603421344819,"INSERT","test3.order",2,78)


I inserted a dozen or so new records, but everything I received came from partition 2 (4 records); nothing from partitions 1 and 3.


My guess:


I opened an external port 9797 for local development and debugging of Kafka (connecting to a bastion host xxx-b-1, which forwards 9797 to xxx-a-1).


broker1 configuration:


listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT


broker2 configuration:


listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT







broker3 configuration:


listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT


Connecting to Kafka from my local machine:
properties.setProperty("bootstrap.servers", "xxx-b-1:9797")


Is this related to that configuration?










On 2020-10-23 08:37:14, "zhisheng" wrote:
>hi
>
>To troubleshoot, consume from Kafka with JSONKeyValueDeserializationSchema so that each record's metadata (topic/partition/offset) is exposed. That lets you check which partitions your data actually comes from, and removes the guesswork.
>
>eg:
>
>  env.addSource(new FlinkKafkaConsumer011<>(
>          parameters.get("topic"),
>          new JSONKeyValueDeserializationSchema(true),
>          buildKafkaProps(parameters)))
>      .flatMap(new FlatMapFunction<ObjectNode, ObjectNode>() {
>          @Override
>          public void flatMap(ObjectNode jsonNodes, Collector<ObjectNode> collector) throws Exception {
>              System.out.println(jsonNodes.get("value"));
>              System.out.println(jsonNodes.get("metadata").get("topic").asText());
>              System.out.println(jsonNodes.get("metadata").get("offset").asText());
>              System.out.println(jsonNodes.get("metadata").get("partition").asText());
>              collector.collect(jsonNodes);
>          }
>      }).print();
>
>Best
>
>zhisheng
>
>
>Lynn Chen wrote on Fri, Oct 23, 2020 at 12:13 AM:
>
>>
>>
>>
>>
>>
>>
>> Hi Qijun Feng,
>>
>>
>> I ran into a similar problem; how did you end up solving it?
>>
>>
>> On 2020-04-03 09:27:52, "LakeShen" wrote:
>> >Hi Qijun,
>> >
>> >Check whether every Kafka partition actually has data, or whether after the timestamp 158567040L only partition 3 received writes. Just my thought.
>> >
>> >Best,
>> >LakeShen
>> >
>> >Qijun Feng wrote on Thu, Apr 2, 2020 at 5:44 PM:
>> >
>> >> Dear All,
>> >>
>> >> My Kafka cluster has three machines and the topic has three partitions, but when Flink reads from Kafka it seems to consume from only one partition. At first my bootstrap.servers listed only one address; I have since changed it to all the addresses and also switched the group.id.
>> >>
>> >>
>> >> Properties properties = new Properties();
>> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> >> 10.216.77.170:9092,10.216.77.188:9092");
>> >> properties.setProperty("group.id", "behavior-logs-aggregator");
>> >>
>> >> FlinkKafkaConsumer010 kafkaConsumer010 =
>> >>new FlinkKafkaConsumer010("behavior-logs_dev", new
>> >> BehaviorLogDeserializationSchema(), properties);
>> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>> >>
>> >> The processed data is written to a database, and it looks like some data is missing. The logs confirm it: there is only partition=3, nothing for partition=1 or 2.
>> >>
>> >> 2020-04-02 14:54:58,532 INFO
>> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> >> Consumer subtask 0 creating fetcher with offsets
>> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>> >>
>> >>
>> >> Is something misconfigured somewhere?
>> >>
>> >>
>>


Re: Re: Flink reading from multiple Kafka partitions

2020-10-22 Thread Lynn Chen






Hi Qijun Feng,


I ran into a similar problem; how did you end up solving it?

















On 2020-04-03 09:27:52, "LakeShen" wrote:
>Hi Qijun,
>
>Check whether every Kafka partition actually has data, or whether after the timestamp 158567040L only partition 3 received writes. Just my thought.
>
>Best,
>LakeShen
>
>Qijun Feng wrote on Thu, Apr 2, 2020 at 5:44 PM:
>
>> Dear All,
>>
>> My Kafka cluster has three machines and the topic has three partitions, but when Flink reads from Kafka it seems to consume from only one partition. At first my bootstrap.servers listed only one address; I have since changed it to all the addresses and also switched the group.id.
>>
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
>> 10.216.77.170:9092,10.216.77.188:9092");
>> properties.setProperty("group.id", "behavior-logs-aggregator");
>>
>> FlinkKafkaConsumer010 kafkaConsumer010 =
>>new FlinkKafkaConsumer010("behavior-logs_dev", new
>> BehaviorLogDeserializationSchema(), properties);
>> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>>
>> The processed data is written to a database, and it looks like some data is missing. The logs confirm it: there is only partition=3, nothing for partition=1 or 2.
>>
>> 2020-04-02 14:54:58,532 INFO
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> Consumer subtask 0 creating fetcher with offsets
>> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>>
>>
>> Is something misconfigured somewhere?
>>
>>


Re: Issue building Flink 1.10 from source

2020-08-26 Thread Lynn Chen





<mirror>
  <id>aliyun-mapr-public</id>
  <mirrorOf>mapr-releases</mirrorOf>
  <name>mapr-releases</name>
  <url>https://maven.aliyun.com/repository/mapr-public</url>
</mirror>

<mirror>
  <id>confluent-packages-maven</id>
  <mirrorOf>confluent</mirrorOf>
  <name>confluent</name>
  <url>https://packages.confluent.io/maven</url>
</mirror>

<mirror>
  <id>aliyun-public</id>
  <mirrorOf>*</mirrorOf>
  <name>aliyun public</name>
  <url>https://maven.aliyun.com/repository/public</url>
</mirror>














Lynn Chen









On 2020-08-21 19:44:20, "魏烽" wrote:
>Hi all,
>
>When I build Flink from the 1.10 source, many jars cannot be downloaded, for example flink-test-utils,