Does flink-connector-jdbc support multiple VALUES?

2022-03-06 Thread payne_z
Does flink-connector-jdbc support writing multiple VALUES tuples in a single statement?
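
For reference, a "multiple values" write presumably means a multi-row INSERT: one statement carrying several VALUES tuples. The thread contains no answer, so the sketch below only illustrates that statement shape in plain Java. The class and method names are hypothetical, and nothing here calls flink-connector-jdbc. As far as I know, the connector's sink buffers rows and flushes them through JDBC batching (addBatch/executeBatch), and some drivers can rewrite such batches into multi-row statements (for example MySQL Connector/J with rewriteBatchedStatements=true in the JDBC URL); treat both points as assumptions to verify against your versions.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: builds a multi-row INSERT statement with JDBC placeholders. */
final class MultiRowInsert {
    private MultiRowInsert() {}

    static String build(String table, List<String> columns, int rowCount) {
        if (columns.isEmpty() || rowCount < 1) {
            throw new IllegalArgumentException("need at least one column and one row");
        }
        // One "(?, ?, ...)" tuple per row, with one placeholder per column.
        String tuple = "(" + String.join(", ", Collections.nCopies(columns.size(), "?")) + ")";
        String values = String.join(", ", Collections.nCopies(rowCount, tuple));
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES " + values;
    }

    public static void main(String[] args) {
        // Prints: INSERT INTO orders (id, amount) VALUES (?, ?), (?, ?), (?, ?)
        System.out.println(build("orders", List.of("id", "amount"), 3));
    }
}
```

The table and column names in main are placeholders. The generated string is what a PreparedStatement would receive if rows were written as one multi-row statement instead of one statement per row.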

Re:Re:Re:Re: FlinkKafkaProducer question

2022-03-06 Thread 潘明文
Hi,

Flink still reports the following error:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send
data to Kafka: Producer attempted an operation with an old epoch. Either there
is a newer producer with the same transactionalId, or the producer's
transaction has been expired by the broker

The code is as follows:
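import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties prop = new Properties();
prop.setProperty("bootstrap.servers", KAFKA_ADDR);
prop.setProperty("acks", "all");
// Enable producer idempotence so that data written to the broker is not duplicated
prop.setProperty("enable.idempotence", "true");
// Transaction timeout used by FlinkKafkaProducer; the broker's default maximum
// transaction timeout is 15 minutes
prop.setProperty("transaction.timeout.ms", String.valueOf(transaction));
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
// Same setting as "enable.idempotence" above
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(
        WRITE_TOPIC,
        serializationSchema,
        prop,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);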
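
The error text above names two possible causes: another producer using the same transactionalId, or a transaction that the broker has expired. The sketch below builds the producer Properties with both in mind, under assumptions that this thread does not confirm. It assumes that FlinkKafkaProducer in EXACTLY_ONCE mode generates and sets its own transactional.id values, so none is set by hand, and that transaction.timeout.ms should be larger than the checkpoint interval while staying at or below the broker's transaction.max.timeout.ms (15 minutes by default). The class name, method name, and the checkpoint-interval parameter are hypothetical; only java.util.Properties and documented Kafka configuration keys are used.

```java
import java.util.Properties;

/** Hypothetical helper: producer settings for an exactly-once Kafka sink. */
final class ExactlyOnceProducerProps {
    // Kafka broker default for transaction.max.timeout.ms (15 minutes).
    static final long BROKER_DEFAULT_MAX_TXN_TIMEOUT_MS = 15 * 60 * 1000L;

    private ExactlyOnceProducerProps() {}

    static Properties build(String bootstrapServers, long txnTimeoutMs,
                            long checkpointIntervalMs, long brokerMaxTxnTimeoutMs) {
        // A timeout above the broker maximum is rejected when the producer initializes.
        if (txnTimeoutMs > brokerMaxTxnTimeoutMs) {
            throw new IllegalArgumentException(
                    "transaction.timeout.ms exceeds the broker's transaction.max.timeout.ms");
        }
        // Assumption: transactions stay open between checkpoints, so the timeout
        // has to outlast at least one checkpoint interval.
        if (txnTimeoutMs <= checkpointIntervalMs) {
            throw new IllegalArgumentException(
                    "transaction.timeout.ms should be larger than the checkpoint interval");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");
        props.setProperty("transaction.timeout.ms", Long.toString(txnTimeoutMs));
        // Assumption: transactional.id is deliberately left unset so the sink can assign its own.
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", 10 * 60 * 1000L, 60 * 1000L,
                BROKER_DEFAULT_MAX_TXN_TIMEOUT_MS);
        System.out.println(props.getProperty("transaction.timeout.ms")); // prints 600000
    }
}
```

The returned Properties would take the place of prop in the FlinkKafkaProducer constructor call above. The broker address and the timeout values in main are placeholders, not recommendations.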

At 2022-03-07 10:06:45, "潘明文" wrote:

The following error currently occurs:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker

Unsubscribe

2022-03-06 Thread 朱福生
Unsubscribe

Sent from my iPhone

Re:Re: Flink job frequently reports errors

2022-03-06 Thread 潘明文
From the logs:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'cdh04/192.168.0.12:45843'. This might indicate that the remote task manager was lost.
at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)

At 2022-03-07 11:01:22, "yue ma" wrote:
>This error means that a TM (TaskManager) disconnected. I would first check the logs of the lost TM 'cdh02/xxx:42892'
>for any exceptions; if there are none, check the monitoring of the corresponding machine for anomalies.
>
>潘明文 wrote on Mon, Mar 7, 2022 at 10:21:
>
>> Hi, the job reads from Kafka and writes to HBase and Kafka.
>> The Flink job frequently reports this error:
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'.
>> This might indicate that the remote task manager was lost.


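Re: Flink job frequently reports errors

2022-03-06 Thread yue ma
This error means that a TM (TaskManager) disconnected. I would first check the logs of the lost TM 'cdh02/xxx:42892'
for any exceptions; if there are none, check the monitoring of the corresponding machine for anomalies.

潘明文 wrote on Mon, Mar 7, 2022 at 10:21:

> Hi, the job reads from Kafka and writes to HBase and Kafka.
> The Flink job frequently reports this error: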
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'.
> This might indicate that the remote task manager was lost.


Flink job frequently reports errors

2022-03-06 Thread 潘明文
Hi, the job reads from Kafka and writes to HBase and Kafka.
The Flink job frequently reports this error:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. This 
might indicate that the remote task manager was lost.

io.network.netty.exception

2022-03-06 Thread 潘明文
Hi, the job reads from Kafka and writes to HBase and Kafka.
The Flink job frequently reports this error:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. This 
might indicate that the remote task manager was lost.

Re:Re:Re: FlinkKafkaProducer question

2022-03-06 Thread 潘明文
The following error currently occurs:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker

At 2022-01-21 15:15:51, "潘明文" wrote:
>
>Hi,
>OK, I will test it. I am currently on Flink 1.12.4, and the kafka-connector is flink-connector-kafka_2.11.
>
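>At 2022-01-21 14:36:06, "selves_nan" wrote:
>>Hi, you can simply choose the transaction ID yourself. If the error persists after you set it, could you share the Flink and kafka-connector versions you are using? I don't see the relevant API in the version I am currently using.
>>
>>selves_nan
>>selves_...@163.com
>>
>>On 2022-01-21 13:00, 潘明文 wrote:
>>Hi,
>>How do I obtain the "producer transaction ID"?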
>>
>>At 2022-01-21 10:41:37, "selves_nan" wrote:
>>Hi, I think this is caused by prop missing some of the configuration items for a Kafka transactional producer. You could try adding the following settings.
>>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer transaction ID");
>>// enable idempotence
>>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>>
>>selves_nan
>>selves_...@163.com
>>
>>On 2022-01-20 14:39, 潘明文 wrote:
>>Hi,
>>When I create a FlinkKafkaProducer as shown below, the following error sometimes occurs at runtime, and I don't know why.
>>
>>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
>>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>
>>
>>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>>at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>>at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>>at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>>at java.lang.Thread.run(Thread.java:748)
>>Suppressed: java.lang.NullPointerException
>>