Question: does flink-connector-jdbc support multiple values?
Does flink-connector-jdbc support writing multiple values at the same time?
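No reply to this question is recorded here. If "multiple values" means a multi-row INSERT ... VALUES (...), (...) statement, the plain-Java sketch below shows only what such a statement looks like. The class name MultiRowInsert, the method sql, and the table and column names are hypothetical. The sketch does not describe what flink-connector-jdbc generates internally; as far as I know the connector batches rows through JDBC addBatch/executeBatch, but treat that as an assumption to verify against the connector documentation.

```java
import java.util.Collections;

/** Hypothetical helper: builds the text of a multi-row INSERT with JDBC placeholders. */
final class MultiRowInsert {

    /** Returns e.g. "INSERT INTO t (a, b) VALUES (?, ?), (?, ?)" for rows = 2. */
    static String sql(String table, String[] columns, int rows) {
        if (rows < 1 || columns.length == 0) {
            throw new IllegalArgumentException("need at least one row and one column");
        }
        // One "(?, ?, ...)" group per row, with one placeholder per column.
        String row = "(" + String.join(", ", Collections.nCopies(columns.length, "?")) + ")";
        return "INSERT INTO " + table
                + " (" + String.join(", ", columns) + ") VALUES "
                + String.join(", ", Collections.nCopies(rows, row));
    }

    public static void main(String[] args) {
        // Table and column names are made up for illustration.
        System.out.println(sql("orders", new String[] {"id", "amount"}, 2));
    }
}
```

With the MySQL driver, a similar effect is often obtained by keeping single-row statements and letting the driver rewrite batches; whether that applies depends on the database and driver in use.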
Re:Re:Re:Re: FlinkKafkaProducer issue
Hi, Flink still reports the following error:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker

The code is as follows:

Properties prop = new Properties();
prop.setProperty("bootstrap.servers", KAFKA_ADDR);
prop.setProperty("acks", "all");
// Enable producer idempotence so that data written to the broker is not duplicated
prop.setProperty("enable.idempotence", "true");
// Set the transaction timeout used by FlinkKafkaProducer; the broker's default maximum transaction timeout is 15 minutes
prop.setProperty("transaction.timeout.ms", transaction + "");
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(
    WRITE_TOPIC, serializationSchema, prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

At 2022-03-07 10:06:45, "潘明文" wrote:

The following error now occurs:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker

At 2022-01-21 15:15:51, "潘明文" wrote:
> Hi,
> OK, I will test it. The Flink version is currently FLINK-1.12.4, and the Kafka connector is flink-connector-kafka_2.11.
>
> At 2022-01-21 14:36:06, "selves_nan" wrote:
>> Hi, you can simply specify this transactional ID yourself. If the error persists after you set it, could you share the Flink and kafka-connector versions you are using? I did not see the relevant API in the version currently in use.
>>
>> On 2022-01-21 at 13:00, 潘明文 wrote:
>> Hi,
>> How do I obtain the "producer's transactional ID"?
>>
>> At 2022-01-21 10:41:37, "selves_nan" wrote:
>> Hi, I think this is caused by prop missing some of the configuration entries for a Kafka transactional producer. You could try adding the entries below.
>> prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer's transactional ID");
>> // Enable idempotence
>> prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>>
>> On 2022-01-20 at 14:39, 潘明文 wrote:
>> Hi,
>> When I create a FlinkKafkaProducer, the following error sometimes occurs at runtime, and I don't know the cause.
>>
>> FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC,
>> new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop,
>> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>
>> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse:
>> The producer attempted to use a producer id which is not currently assigned
>> to its transactional id.
>> at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>> at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>> at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>> at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>> at java.lang.Thread.run(Thread.java:748)
>> Suppressed: java.lang.NullPointerException
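A minimal sketch of the producer settings discussed in this thread, written with java.util.Properties and plain string keys so that it runs without Kafka or Flink on the classpath. The class name ExactlyOnceProducerProps and the method build are hypothetical. The sketch rests on two assumptions that should be checked against the Flink 1.12 Kafka connector documentation: that FlinkKafkaProducer with Semantic.EXACTLY_ONCE assigns its own transactional IDs (so no fixed "my_tx_id" is set here), and that the broker still uses its default transaction.max.timeout.ms of 15 minutes, as the message states.

```java
import java.util.Properties;

/** Hypothetical helper: producer properties for a sink using Semantic.EXACTLY_ONCE. */
final class ExactlyOnceProducerProps {

    // Broker-side default for transaction.max.timeout.ms (15 minutes).
    // Assumption: the broker configuration has not been changed.
    static final long BROKER_DEFAULT_MAX_TXN_TIMEOUT_MS = 15 * 60 * 1000L;

    static Properties build(String bootstrapServers, long requestedTimeoutMs) {
        // Cap the producer's transaction timeout at the broker's assumed maximum.
        long timeoutMs = Math.min(requestedTimeoutMs, BROKER_DEFAULT_MAX_TXN_TIMEOUT_MS);

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", bootstrapServers);
        prop.setProperty("acks", "all");
        prop.setProperty("enable.idempotence", "true");
        prop.setProperty("transaction.timeout.ms", Long.toString(timeoutMs));
        // "transactional.id" is deliberately left unset: this sketch assumes the
        // Flink sink manages transactional IDs per subtask itself.
        return prop;
    }

    public static void main(String[] args) {
        // Requesting one hour yields the capped value of 15 minutes.
        Properties prop = build("localhost:9092", 60 * 60 * 1000L);
        System.out.println(prop.getProperty("transaction.timeout.ms")); // prints 900000
    }
}
```

The resulting Properties would take the place of prop in the FlinkKafkaProducer constructor call shown in the message. If the error persists, it may be worth checking that the checkpoint interval is well below the transaction timeout and that no other job writes with the same transactional IDs; both suggestions are guesses based on the wording of the error message.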
Unsubscribe
Unsubscribe (sent from my iPhone)
Re:Re: Flink job frequently reports errors
Here is the log:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'cdh04/192.168.0.12:45843'. This might indicate that the remote task manager was lost.
at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)

At 2022-03-07 11:01:22, "yue ma" wrote:
> This error means that a TM dropped its connection. I suggest first checking the log of the lost TM 'cdh02/xxx:42892' for any exceptions; if there are none, also check the monitoring of the corresponding machine for anomalies.
>
> 潘明文 wrote on Mon, Mar 7, 2022 at 10:21:
>
>> Hi, the job reads from Kafka and writes to HBase and Kafka.
>> The Flink job frequently reports this error:
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'.
>> This might indicate that the remote task manager was lost.
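Re: Flink job frequently reports errors
This error means that a TM dropped its connection. I suggest first checking the log of the lost TM 'cdh02/xxx:42892' for any exceptions; if there are none, also check the monitoring of the corresponding machine for anomalies.

潘明文 wrote on Mon, Mar 7, 2022 at 10:21:

> Hi, the job reads from Kafka and writes to HBase and Kafka.
> The Flink job frequently reports this error:
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'.
> This might indicate that the remote task manager was lost.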
Flink job frequently reports errors
Hi, the job reads from Kafka and writes to HBase and Kafka. The Flink job frequently reports this error:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. This might indicate that the remote task manager was lost.
io.network.netty.exception
Hi, the job reads from Kafka and writes to HBase and Kafka. The Flink job frequently reports this error:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. This might indicate that the remote task manager was lost.
Re:Re:Re: FlinkKafkaProducer issue
The following error now occurs:

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker

At 2022-01-21 15:15:51, "潘明文" wrote:
> Hi,
> OK, I will test it. The Flink version is currently FLINK-1.12.4, and the Kafka connector is flink-connector-kafka_2.11.
>
> At 2022-01-21 14:36:06, "selves_nan" wrote:
>> Hi, you can simply specify this transactional ID yourself. If the error persists after you set it, could you share the Flink and kafka-connector versions you are using? I did not see the relevant API in the version currently in use.
>>
>> On 2022-01-21 at 13:00, 潘明文 wrote:
>> Hi,
>> How do I obtain the "producer's transactional ID"?
>>
>> At 2022-01-21 10:41:37, "selves_nan" wrote:
>> Hi, I think this is caused by prop missing some of the configuration entries for a Kafka transactional producer. You could try adding the entries below.
>> prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer's transactional ID");
>> // Enable idempotence
>> prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>>
>> On 2022-01-20 at 14:39, 潘明文 wrote:
>> Hi,
>> When I create a FlinkKafkaProducer, the following error sometimes occurs at runtime, and I don't know the cause.
>>
>> FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC,
>> new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop,
>> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>
>> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse:
>> The producer attempted to use a producer id which is not currently assigned
>> to its transactional id.
>> at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>> at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>> at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>> at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>> at java.lang.Thread.run(Thread.java:748)
>> Suppressed: java.lang.NullPointerException