Email from Lynn Chen
Unsubscribe
Flink job keeps failing and retrying: producer id is not assigned to its transactional id
hi, all

A Flink job producing data to Kafka keeps hitting an error that makes the job retry endlessly. What we have observed: after each job starts, it runs normally for roughly 20 days, then this problem appears and the job keeps retrying. We have not found the real root cause yet.

Error:

org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
    at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
    at java.lang.Thread.run(Thread.java:748)

Kafka producer configuration:

    // avoid InstanceAlreadyExistsException
    prop.setProperty("client.id", "")
    // raise the producer transaction timeout (transaction.timeout.ms)
    prop.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
    prop.setProperty("max_in_flight_requests_per_connection", "1")
    // idempotent producer (ENABLE_IDEMPOTENCE_CONFIG)
    prop.setProperty("enable_idempotence_config", "true")
    // RETRIES_CONFIG
    prop.setProperty("retries_config", "5")

    val kafkaSink: FlinkKafkaProducer[String] = new FlinkKafkaProducer[String](
      topic,
      new ResultStringKafkaSerializationSchema(topic),
      prop,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    )

Flink is likewise configured for EXACTLY_ONCE:

    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

Thanks, everyone!
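Two hedged notes on the configuration above, offered as observations rather than a confirmed diagnosis. First, "max_in_flight_requests_per_connection", "enable_idempotence_config" and "retries_config" are not valid Kafka producer keys; the real names are dot-separated (max.in.flight.requests.per.connection, enable.idempotence, retries), so as written those three settings are silently ignored. Using the ProducerConfig constants rules out such typos. Second, the pattern of running fine for ~20 days and then failing in EndTxnResponse with "producer id ... not currently assigned to its transactional id" is commonly associated with the broker expiring idle transactional ids (broker setting transactional.id.expiration.ms, 7 days by default) while Flink's EXACTLY_ONCE producer pool still holds them, so comparing that broker setting against the job's checkpoint and idle intervals is worth a look. A minimal sketch of the same properties using the standard keys (values taken from the mail above):

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig

    val prop = new Properties()
    // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default),
    // otherwise the producer fails on initialization.
    prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (1000 * 60 * 5).toString)
    // Dot-separated names via constants; the underscore variants above are ignored by Kafka.
    prop.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    prop.setProperty(ProducerConfig.RETRIES_CONFIG, "5")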
RichFlatMapFunction => NoClassDefFoundError
hi, dear all:

Why does writing to ListState from a flatMap (extends RichFlatMapFunction) keep throwing java.lang.NoClassDefFoundError?

    stream.map(x => (x._4._1, x._4._2))
      .keyBy(_._2)
      .flatMap(new ReceptionListStateFunction2)

    class ReceptionListStateFunction2 extends RichFlatMapFunction[(String, Int), List[String]] {
      var myState: ListState[String] = _

      override def flatMap(value: (String, Int), out: Collector[List[String]]): Unit = {
        if (value._2 == 1) {
          myState.add(value._1)
        }
        val states = myState.get().iterator()
        val listBuf: ListBuffer[String] = new ListBuffer[String]()
        while (states.hasNext) {
          listBuf.append(states.next())
        }
        out.collect(listBuf.toList)
      }
    }

Error log:

Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler due to java.lang.NoClassDefFoundError: Could not initialize class scala.tools.nsc.Properties$
    at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
    at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
    at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.<init>(TraversableSerializer.scala:41)
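Two hedged observations on the error above. The stack trace points at the classpath rather than the state logic: for an output type of List[String], Flink's Scala TraversableSerializer compiles a CanBuildFrom at runtime through the Scala toolbox, and "Could not initialize class scala.tools.nsc.Properties$" usually means scala-compiler/scala-reflect are absent from the runtime classpath or do not match the project's Scala version (in sbt terms, roughly: libraryDependencies += "org.scala-lang" % "scala-compiler" % scalaVersion.value). Separately, myState is never initialized, so the job would fail with a NullPointerException as soon as it actually ran; keyed state handles belong in open(). A sketch with that fixed (the state name "reception-list" is an arbitrary choice):

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector
    import scala.collection.JavaConverters._

    class ReceptionListStateFunction2 extends RichFlatMapFunction[(String, Int), List[String]] {

      @transient private var myState: ListState[String] = _

      // State handles must be created once the runtime context exists, i.e. in open().
      override def open(parameters: Configuration): Unit = {
        val descriptor = new ListStateDescriptor[String]("reception-list", classOf[String])
        myState = getRuntimeContext.getListState(descriptor)
      }

      override def flatMap(value: (String, Int), out: Collector[List[String]]): Unit = {
        if (value._2 == 1) {
          myState.add(value._1)
        }
        // Snapshot the current state contents into an immutable List for downstream use.
        out.collect(myState.get().asScala.toList)
      }
    }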
Re: Re: Re: Re: Problem with Flink reading from multiple Kafka partitions
hi, all

My problem is solved. The cause was the following: since the connection goes through port forwarding on a bastion host, bootstrap.servers must list all of the Kafka brokers.

1. Update the Kafka external-listener configuration

>> broker1 configuration:
>> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT

broker2 and broker3 are configured the same way, with ports 9798 and 9799 respectively.

2. Have ops forward all three ports:

xxx-b-1:9797 --> xxx-a-1:9797
xxx-b-1:9798 --> xxx-a-2:9798
xxx-b-1:9799 --> xxx-a-3:9799

3. properties.setProperty("bootstrap.servers", "xxx-b-1:9797,xxx-b-1:9798,xxx-b-1:9799")

Broker nodes were indeed missing from the forwarding, which is why only one node's data could be read. (That node happened to own partition 2, which is what made me realize the missing forwards might be the problem.)

Thanks to zhisheng for the help!

On 2020-10-23 11:56:08, "zhisheng" wrote:
>hi
>
>Since you only ever consume data from one partition, it is clear that the consumer can only reach that one partition. You also mentioned:
>
>> used for local Kafka development/debugging (connected to a bastion host xxx-b-1, forwarding 9797 to xxx-a-1)
>
>I suggest checking whether that forwarding is the problem: only one node is being forwarded.
>
>Best
>zhisheng
>
>Lynn Chen wrote on Fri, Oct 23, 2020, 11:01 AM:
>
>> hi, zhisheng:
>>
>> After parsing the JSON I get:
>> (xxx, xxx, xxx, topic, partition, offset)
>> =>
>> (false,1603420582310,"INSERT","test3.order",2,75)
>> (false,1603421312803,"INSERT","test3.order",2,76)
>> (false,1603421344819,"INSERT","test3.order",2,77)
>> (false,1603421344819,"INSERT","test3.order",2,78)
>>
>> I inserted a dozen or so new rows; everything I receive is from partition 2 (4 records), nothing from partitions 1 and 3.
>>
>> My guess:
>>
>> I opened an external port 9797 for local Kafka development/debugging (connected to a bastion host xxx-b-1, forwarding 9797 to xxx-a-1).
>>
>> broker1 configuration:
>>
>> listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>> broker2 configuration:
>>
>> listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>> broker3 configuration:
>>
>> listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
>> advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
>> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>> security.inter.broker.protocol=PLAINTEXT
>>
>> Connecting from my machine:
>> properties.setProperty("bootstrap.servers", "xxx-b-1:9797")
>>
>> Is this related to that configuration?
>>
>> On 2020-10-23 08:37:14, "zhisheng" wrote:
>> >hi
>> >
>> >If you want to track this down, consume from Kafka with JSONKeyValueDeserializationSchema so you also get each record's metadata (topic/partition/offset). That lets you see exactly which partitions your data comes from, so there is no more guessing.
>> >
>> >e.g.:
>> >
>> > env.addSource(new FlinkKafkaConsumer011<>(
>> >         parameters.get("topic"),
>> >         new JSONKeyValueDeserializationSchema(true),
>> >         buildKafkaProps(parameters)))
>> >     .flatMap(new FlatMapFunction<ObjectNode, ObjectNode>() {
>> >         @Override
>> >         public void flatMap(ObjectNode jsonNodes, Collector<ObjectNode> collector) throws Exception {
>> >             System.out.println(jsonNodes.get("value"));
>> >             System.out.println(jsonNodes.get("metadata").get("topic").asText());
>> >             System.out.println(jsonNodes.get("metadata").get("offset").asText());
>> >             System.out.println(jsonNodes.get("metadata").get("partition").asText());
>> >             collector.collect(jsonNodes);
>> >         }
>> >     }).print();
>> >
>> >Best
>> >
>> >zhisheng
>> >
>> >Lynn Chen wrote on Fri, Oct 23, 2020, 12:13 AM:
>> >
>> >> hi, Qijun Feng:
>> >>
>> >> I ran into a similar problem. How did you end up solving it?
>> >>
>> >> On 2020-04-03 09:27:52, "LakeShen" wrote:
>> >> >Hi Qijun,
>> >> >
>> >> >Check whether all Kafka partitions actually have data, or whether only partition 3 was written to after the timestamp 158567040L. Just my guess.
>> >> >
>> >> >Best,
>> >> >LakeShen
>> >> >
>> >> >Qijun Feng wrote on Thu, Apr 2, 2020, 5:44 PM:
>> >> >
>> >> >> Dear All,
>> >> >>
>> >> >> My Kafka cluster has three machines and the topic has three partitions, but when Flink reads from Kafka it seems to consume from only one partition. At first my bootstrap.servers listed only one address; I have since changed it to all addresses and also switched the group.id.
>> >> >>
>> >> >> Properties properties = new Properties();
>> >> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,10.216.77.170:9092,10.216.77.188:9092");
>> >> >> properties.setProperty("group.id", "behavior-logs-aggregator");
>> >> >>
>> >> >> FlinkKafkaConsumer010 kafkaConsumer010 =
>> >> >>     new FlinkKafkaConsumer010("behavior-logs_dev", new BehaviorLogDeserializationSchema(), properties);
>> >> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>> >> >>
>> >> >> The processed data is written to a database, and it looks incomplete. The logs say the same: there is only partition=3, never partition=1 or 2:
>> >> >>
>> >> >> 2020-04-02 14:54:58,532 INFO
>> >> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase -
>> >> >> Consumer subtask 0 creating fetcher with offsets
>> >> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>> >> >>
>> >> >> Where is the problem?
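One more hedged note for anyone reproducing this setup: Kafka clients use bootstrap.servers only for the first metadata request; after that they connect to whatever address each broker advertises for the matched listener. So besides forwarding all three ports, each broker's EXTERNAL listener has to advertise an address the client can actually reach through the bastion. A sketch for broker1, assuming xxx-b-1 is resolvable from the client and following the port scheme above (brokers 2 and 3 would advertise xxx-b-1:9798 and xxx-b-1:9799):

    listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
    advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://xxx-b-1:9797
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
    security.inter.broker.protocol=PLAINTEXT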
Re: Re: Re: Problem with Flink reading from multiple Kafka partitions
hi, zhisheng:

After parsing the JSON I get:
(xxx, xxx, xxx, topic, partition, offset)
=>
(false,1603420582310,"INSERT","test3.order",2,75)
(false,1603421312803,"INSERT","test3.order",2,76)
(false,1603421344819,"INSERT","test3.order",2,77)
(false,1603421344819,"INSERT","test3.order",2,78)

I inserted a dozen or so new rows; everything I receive is from partition 2 (4 records), nothing from partitions 1 and 3.

My guess:

I opened an external port 9797 for local Kafka development/debugging (connected to a bastion host xxx-b-1, forwarding 9797 to xxx-a-1).

broker1 configuration:

listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-1:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT

broker2 configuration:

listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-2:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT

broker3 configuration:

listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
advertised.listeners=PLAINTEXT://xxx-a-3:9092,EXTERNAL://:9797
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
security.inter.broker.protocol=PLAINTEXT

Connecting from my machine:
properties.setProperty("bootstrap.servers", "xxx-b-1:9797")

Is this related to that configuration?

On 2020-10-23 08:37:14, "zhisheng" wrote:
>hi
>
>If you want to track this down, consume from Kafka with JSONKeyValueDeserializationSchema so you also get each record's metadata (topic/partition/offset). That lets you see exactly which partitions your data comes from, so there is no more guessing.
>
>e.g.:
>
> env.addSource(new FlinkKafkaConsumer011<>(
>         parameters.get("topic"),
>         new JSONKeyValueDeserializationSchema(true),
>         buildKafkaProps(parameters)))
>     .flatMap(new FlatMapFunction<ObjectNode, ObjectNode>() {
>         @Override
>         public void flatMap(ObjectNode jsonNodes, Collector<ObjectNode> collector) throws Exception {
>             System.out.println(jsonNodes.get("value"));
>             System.out.println(jsonNodes.get("metadata").get("topic").asText());
>             System.out.println(jsonNodes.get("metadata").get("offset").asText());
>             System.out.println(jsonNodes.get("metadata").get("partition").asText());
>             collector.collect(jsonNodes);
>         }
>     }).print();
>
>Best
>
>zhisheng
>
>Lynn Chen wrote on Fri, Oct 23, 2020, 12:13 AM:
>
>> hi, Qijun Feng:
>>
>> I ran into a similar problem. How did you end up solving it?
>>
>> On 2020-04-03 09:27:52, "LakeShen" wrote:
>> >Hi Qijun,
>> >
>> >Check whether all Kafka partitions actually have data, or whether only partition 3 was written to after the timestamp 158567040L. Just my guess.
>> >
>> >Best,
>> >LakeShen
>> >
>> >Qijun Feng wrote on Thu, Apr 2, 2020, 5:44 PM:
>> >
>> >> Dear All,
>> >>
>> >> My Kafka cluster has three machines and the topic has three partitions, but when Flink reads from Kafka it seems to consume from only one partition. At first my bootstrap.servers listed only one address; I have since changed it to all addresses and also switched the group.id.
>> >>
>> >> Properties properties = new Properties();
>> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,10.216.77.170:9092,10.216.77.188:9092");
>> >> properties.setProperty("group.id", "behavior-logs-aggregator");
>> >>
>> >> FlinkKafkaConsumer010 kafkaConsumer010 =
>> >>     new FlinkKafkaConsumer010("behavior-logs_dev", new BehaviorLogDeserializationSchema(), properties);
>> >> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>> >>
>> >> The processed data is written to a database, and it looks incomplete. The logs say the same: there is only partition=3, never partition=1 or 2:
>> >>
>> >> 2020-04-02 14:54:58,532 INFO
>> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase -
>> >> Consumer subtask 0 creating fetcher with offsets
>> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>> >>
>> >> Where is the problem?
Re: Re: Problem with Flink reading from multiple Kafka partitions
hi, Qijun Feng:

I ran into a similar problem. How did you end up solving it?

On 2020-04-03 09:27:52, "LakeShen" wrote:
>Hi Qijun,
>
>Check whether all Kafka partitions actually have data, or whether only partition 3 was written to after the timestamp 158567040L. Just my guess.
>
>Best,
>LakeShen
>
>Qijun Feng wrote on Thu, Apr 2, 2020, 5:44 PM:
>
>> Dear All,
>>
>> My Kafka cluster has three machines and the topic has three partitions, but when Flink reads from Kafka it seems to consume from only one partition. At first my bootstrap.servers listed only one address; I have since changed it to all addresses and also switched the group.id.
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,10.216.77.170:9092,10.216.77.188:9092");
>> properties.setProperty("group.id", "behavior-logs-aggregator");
>>
>> FlinkKafkaConsumer010 kafkaConsumer010 =
>>     new FlinkKafkaConsumer010("behavior-logs_dev", new BehaviorLogDeserializationSchema(), properties);
>> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>>
>> The processed data is written to a database, and it looks incomplete. The logs say the same: there is only partition=3, never partition=1 or 2:
>>
>> 2020-04-02 14:54:58,532 INFO
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase -
>> Consumer subtask 0 creating fetcher with offsets
>> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>>
>> Where is the problem?
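A small hedged aside on the snippet above: setStartFromTimestamp expects epoch milliseconds, and 158567040L falls in 1970 rather than on 2020/04/01 (roughly 1585670400000L in CST), so the posted value looks like it lost digits somewhere. That alone would not pin the consumer to a single partition, but it is worth checking alongside the connectivity angle:

    // Hypothetical corrected value for 2020-04-01 00:00 CST, in milliseconds:
    kafkaConsumer010.setStartFromTimestamp(1585670400000L)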
Re: Problems building Flink 1.10 from source
<mirror>
  <id>aliyun-mapr-public</id>
  <mirrorOf>mapr-releases</mirrorOf>
  <name>mapr-releases</name>
  <url>https://maven.aliyun.com/repository/mapr-public</url>
</mirror>
<mirror>
  <id>confluent-packages-maven</id>
  <mirrorOf>confluent</mirrorOf>
  <name>confluent</name>
  <url>https://packages.confluent.io/maven</url>
</mirror>
<mirror>
  <id>aliyun-public</id>
  <mirrorOf>*</mirrorOf>
  <name>aliyun public</name>
  <url>https://maven.aliyun.com/repository/public</url>
</mirror>

Lynn Chen

On 2020-08-21 19:44:20, "魏烽" wrote:
>Hi all:
>
>When building the Flink source at 1.10, many jars fail to download, e.g. flink-test-utils, …
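With those mirrors added to the <mirrors> section of ~/.m2/settings.xml, a typical rebuild from the Flink source root is (a sketch of the standard Flink build invocation, not taken from the original mail):

    mvn clean install -DskipTests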