Re:Re: Email from 潘明文

2022-06-28 Thread
HI,

  My job is a single source writing to an HBASE SINK and another HBASE SINK at the same time.

On 2022-06-27 23:54:33, "Weihua Hu" wrote:
>Hi,
>The image did not come through. Normally there is no execution-order guarantee between sink operators; they can run in parallel. However, if multiple
>sinks are chained together by the operator chain optimization, records inside a single operator chain are processed one after another.
>Best,
>Weihua
>
>
>On Fri, Jun 24, 2022 at 9:29 PM Lincoln Lee  wrote:
>
>> Hi,
>> Images pasted directly into the email cannot be seen; could you send the text instead?
>>
>> Best,
>> Lincoln Lee
>>
>>
>> 潘明文 wrote on Fri, Jun 24, 2022 at 16:36:
>>
>> > Hi, can the two SINKs below be processed concurrently, or must they run serially, with the second SINK starting only after the first SINK finishes?
>> >
>> >
>>
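A minimal DataStream sketch of the setup under discussion (one source fanned out to two sinks); the source and sink classes here are hypothetical placeholders, not from the thread. By default Flink imposes no ordering between the two sink operators:

```java
// Hypothetical sketch: one source, two independent sinks.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> source = env.addSource(new MySourceFunction()); // placeholder source

// Both sinks subscribe to the same stream; neither waits for the other.
source.addSink(new MyHBaseSink()).name("hbase-sink-1"); // placeholder sink
source.addSink(new MyHBaseSink()).name("hbase-sink-2"); // placeholder sink

env.execute("one-source-two-sinks");
```

If the sinks end up chained with the source into a single task, records still flow through that chain one at a time; calling `disableChaining()` on a sink can break the chain if needed.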


Email from 潘明文

2022-06-24 Thread
Hi, can the two SINKs below be processed concurrently, or must they run serially, with the second SINK starting only after the first SINK finishes?



HBase write performance

2022-03-21 Thread
HI,

The current environment is 4 HBase RegionServer nodes with memory in the 128~256 range.
The current HBase write rate is around 7,000 to 10,000+ records per second. Is this normal, and is there room for tuning? Thanks!


The code is as follows:
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(key));
// Set a 10 MB write buffer; once it fills up, the data is flushed to HBase automatically
params.writeBufferSize(10 * 1024 * 1024L);
// Flush the write buffer periodically, every 5 seconds
params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);
BufferedMutator mutator = connection.getBufferedMutator(params);
mutator.mutate(recordList);
mutator.flush();
mutator.close();
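A slightly more defensive variant of the snippet above, under the same assumptions (`connection`, `key`, and `recordList` come from the surrounding code); try-with-resources guarantees the mutator is flushed and closed even when `mutate` throws:

```java
// Sketch only: same settings as above, with automatic cleanup.
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(key))
        .writeBufferSize(10 * 1024 * 1024L);         // 10 MB client-side buffer
params.setWriteBufferPeriodicFlushTimeoutMs(5_000L); // periodic flush every 5 s

// close() flushes any remaining buffered mutations before releasing resources
try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
    mutator.mutate(recordList);
}
```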

FLINK real-time HBase write performance issue

2022-03-21 Thread
HI,

The current environment is 4 HBase RegionServer nodes with memory in the 128~256 range.
The current HBase write rate is around 7,000 to 10,000+ records per second. Is this normal, and is there room for tuning? Thanks


The code is as follows:

Re:Re: io.network.netty.exception

2022-03-07 Thread
HI,
  Thanks. Is there a good solution to this problem?

On 2022-03-08 02:20:57, "Zhilong Hong" wrote:
>Hi 明文,
>
>This error actually means the TaskManager was lost, usually because the TM was killed. Check the TM's Flink logs and GC logs, the cluster-level NM logs (on YARN), or the K8s logs to find out why the TM was killed. Common causes include the TM heartbeat timing out because of long GC pauses, or the container/pod being killed because the TM exceeded its memory limit.
>
>Best.
>Zhilong
>
>On Mon, Mar 7, 2022 at 10:18 AM 潘明文  wrote:
>
>> HI, the job reads from Kafka and writes to HBase and Kafka.
>> The Flink job frequently fails with
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'.
>> This might indicate that the remote task manager was lost.


Re:Re:Re:Re: FlinkKafkaProducer issue

2022-03-06 Thread
HI,

Flink still reports the following error:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker


The code is as follows:
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", KAFKA_ADDR);
prop.setProperty("acks", "all");
// Enable producer idempotence so that data written to the broker is not duplicated
prop.setProperty("enable.idempotence", "true");
// Transaction timeout for the FlinkKafkaProducer; the broker's default maximum transaction timeout is 15 minutes
prop.setProperty("transaction.timeout.ms", transaction + "");
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);


FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(
WRITE_TOPIC,
serializationSchema,
prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); 
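One thing worth checking, offered as a guess rather than a confirmed diagnosis: in EXACTLY_ONCE mode the FlinkKafkaProducer manages its own pool of transactional ids (one per parallel subtask), so a hard-coded `transactional.id` such as `my_tx_id`, shared by every subtask and every restart, can fence the producers against each other and surface exactly this "old epoch" error. A sketch of the same configuration with the manual transactional id removed:

```java
// Sketch: let the exactly-once producer generate its own transactional ids.
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", KAFKA_ADDR);
// Must stay at or below the broker's transaction.max.timeout.ms (15 min by default)
prop.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
        WRITE_TOPIC,
        serializationSchema,
        prop,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
```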

At 2022-03-07 10:06:45, "潘明文"  wrote:

Currently getting the following error:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker

On 2022-01-21 15:15:51, "潘明文" wrote:
>
>HI,
>   OK, I'll give it a try. The current Flink version is FLINK-1.12.4, and the kafka-connector is
> flink-connector-kafka_2.11
>
>On 2022-01-21 14:36:06, "selves_nan" wrote:
>>Hi, you can just pick the transactional id yourself. If it still fails after setting one, could you share the flink and kafka-connector versions you are using? I don't see the relevant API in the version I'm on.
>>
>>
>>| |
>>selves_nan
>>|
>>|
>>selves_...@163.com
>>|
>>Signature customized by NetEase Mail Master
>>
>>
>>On Jan 21, 2022 at 13:00, 潘明文 wrote:
>>HI,
>>How do I obtain "the producer's transactional id"?
>>
>>On 2022-01-21 10:41:37, "selves_nan" wrote:
>>Hi, I think prop is missing some of the configuration a transactional Kafka producer needs; try adding the settings below.
>>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "<the producer's transactional id>");
>>// Enable idempotence
>>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>>
>>
>>
>>
>>On Jan 20, 2022 at 14:39, 潘明文 wrote:
>>hi,
>>When I create the FlinkKafkaProducer, the following error sometimes occurs at runtime; I don't know the cause.
>>
>>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
>>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>
>>
>>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
>>The producer attempted to use a producer id which is not currently assigned 
>>to its transactional id.
>>at 
>>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>>at 
>>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>at 
>>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>>at 
>>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>>at java.lang.Thread.run(Thread.java:748)
>>Suppressed: java.lang.NullPointerException
>>


Re:Re: Flink job frequently fails

2022-03-06 Thread



From the logs:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'cdh04/192.168.0.12:45843'. This might indicate that the remote task manager 
was lost.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at 
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)

On 2022-03-07 11:01:22, "yue ma" wrote:
>This error means a TM disconnected. I would first check the logs of the lost TM 'cdh02/xxx:42892' for any
>exceptions; if there are none, also check the machine-level monitoring for that host.
>
>潘明文 wrote on Mon, Mar 7, 2022 at 10:21:
>
>> HI, the job reads from Kafka and writes to HBase and Kafka.
>> The Flink job frequently fails with
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'.
>> This might indicate that the remote task manager was lost.


Flink job frequently fails

2022-03-06 Thread
HI, the job reads from Kafka and writes to HBase and Kafka.
The Flink job frequently fails with

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. This 
might indicate that the remote task manager was lost.

io.network.netty.exception

2022-03-06 Thread
HI, the job reads from Kafka and writes to HBase and Kafka.
The Flink job frequently fails with

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. This 
might indicate that the remote task manager was lost.

Re:Re:Re: FlinkKafkaProducer issue

2022-03-06 Thread
Currently getting the following error:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker

On 2022-01-21 15:15:51, "潘明文" wrote:
>
>HI,
>   OK, I'll give it a try. The current Flink version is FLINK-1.12.4, and the kafka-connector is
> flink-connector-kafka_2.11
>
>On 2022-01-21 14:36:06, "selves_nan" wrote:
>>Hi, you can just pick the transactional id yourself. If it still fails after setting one, could you share the flink and kafka-connector versions you are using? I don't see the relevant API in the version I'm on.
>>
>>
>>
>>
>>On Jan 21, 2022 at 13:00, 潘明文 wrote:
>>HI,
>>How do I obtain "the producer's transactional id"?
>>
>>On 2022-01-21 10:41:37, "selves_nan" wrote:
>>Hi, I think prop is missing some of the configuration a transactional Kafka producer needs; try adding the settings below.
>>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "<the producer's transactional id>");
>>// Enable idempotence
>>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>>
>>
>>
>>
>>On Jan 20, 2022 at 14:39, 潘明文 wrote:
>>hi,
>>When I create the FlinkKafkaProducer, the following error sometimes occurs at runtime; I don't know the cause.
>>
>>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
>>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>
>>
>>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
>>The producer attempted to use a producer id which is not currently assigned 
>>to its transactional id.
>>at 
>>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>>at 
>>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>at 
>>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>>at 
>>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>>at java.lang.Thread.run(Thread.java:748)
>>Suppressed: java.lang.NullPointerException
>>


elasticsearch+hbase

2022-03-01 Thread
HI,
   The environment is a 6-node CDH cluster with Elasticsearch serving as a secondary index for HBase. How can the code be optimized so that a lookup through the Elasticsearch secondary index followed by the HBase read takes less than 0.1 seconds? Thanks.
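A hedged sketch of the usual pattern for this kind of lookup, assuming the Elasticsearch 7 high-level REST client and the HBase client API; the index name, field name, and variable names are hypothetical. The main lever is batching: fetch all row keys from ES in one query, then issue a single batched multi-get to HBase instead of one `Get` per key:

```java
// Hypothetical sketch: ES secondary-index query followed by one batched HBase read.
SearchRequest request = new SearchRequest("my_index");          // assumed index name
request.source(new SearchSourceBuilder()
        .query(QueryBuilders.termQuery("my_field", value))      // assumed field
        .size(1000));
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);

// Collect the HBase row keys, assumed here to be stored as the ES document ids
List<Get> gets = new ArrayList<>();
for (SearchHit hit : response.getHits().getHits()) {
    gets.add(new Get(Bytes.toBytes(hit.getId())));
}

// One batched RPC per region server instead of one RPC per row
Result[] rows = table.get(gets);
```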


elasticsearch+hbase

2022-03-01 Thread
HI,
   The environment is a 6-node CDH cluster with Elasticsearch serving as a secondary index for HBase. How can the code be optimized so queries take less than 0.1 seconds? Thanks.

hbase NoSuchMethodError: org.apache.hadoop.hbase.client.HTable.getTableName()[B

2022-01-26 Thread
HI,

The hbase-client package is 2.1.0, Flink is 1.12.4.
The HBase code (a class extending TableInputFormat) is as follows:
try {
    connection = ConnectionFactory.createConnection(hbaseConf);
    // Table table = connection.getTable(TableName.valueOf(tableName));
    table = (HTable) connection.getTable(TableName.valueOf(tableName));
} catch (IOException e) {
    logger.error("HBase connection error", e.getCause());
    System.out.println("--");
}
System.out.println("--aaa");
scan = new Scan().addFamily(Bytes.toBytes(family));
scan.withStartRow(startRow.getBytes());
scan.withStopRow(endRow.getBytes());
System.out.println("--");
The error is as follows:
 Exception in thread "main" org.apache.flink.util.FlinkException: Failed to 
execute job 'Flink Streaming Job'.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1796)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765)
at com.example.app.hbasesource.main(hbasesource.java:25)
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not 
instantiate JobManager.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not instantiate JobManager.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: org.apache.hadoop.hbase.client.HTable.getTableName()[B
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:260)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:866)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:257)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:342)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:327)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)
at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
... 4 more
Caused by: java.lang.NoSuchMethodError: 
org.apache.hadoop.hbase.client.HTable.getTableName()[B
at 
org.apache.flink.addons.hbase.AbstractTableInputFormat.createInputSplits(AbstractTableInputFormat.java:232)
at 
org.apache.flink.addons.hbase.AbstractTableInputFormat.createInputSplits(AbstractTableInputFormat.java:44)
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:247)
... 18 more

Re:Re: FlinkKafkaProducer issue

2022-01-20 Thread
HI,
   OK, I'll give it a try. The current Flink version is FLINK-1.12.4, and the kafka-connector is flink-connector-kafka_2.11

On 2022-01-21 14:36:06, "selves_nan" wrote:
>Hi, you can just pick the transactional id yourself. If it still fails after setting one, could you share the flink and kafka-connector versions you are using? I don't see the relevant API in the version I'm on.
>
>
>
>
>On Jan 21, 2022 at 13:00, 潘明文 wrote:
>HI,
>How do I obtain "the producer's transactional id"?
>
>On 2022-01-21 10:41:37, "selves_nan" wrote:
>Hi, I think prop is missing some of the configuration a transactional Kafka producer needs; try adding the settings below.
>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "<the producer's transactional id>");
>// Enable idempotence
>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>
>
>
>
>On Jan 20, 2022 at 14:39, 潘明文 wrote:
>hi,
>When I create the FlinkKafkaProducer, the following error sometimes occurs at runtime; I don't know the cause.
>
>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>
>
>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
>producer attempted to use a producer id which is not currently assigned to its 
>transactional id.
>at 
>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>at 
>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>at 
>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>at 
>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>at java.lang.Thread.run(Thread.java:748)
>Suppressed: java.lang.NullPointerException
>


Re:Re: FlinkKafkaProducer issue

2022-01-20 Thread
HI,
How do I obtain "the producer's transactional id"?

On 2022-01-21 10:41:37, "selves_nan" wrote:
>Hi, I think prop is missing some of the configuration a transactional Kafka producer needs; try adding the settings below.
>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "<the producer's transactional id>");
>// Enable idempotence
>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>
>
>
>
>On Jan 20, 2022 at 14:39, 潘明文 wrote:
>hi,
>When I create the FlinkKafkaProducer, the following error sometimes occurs at runtime; I don't know the cause.
>
>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>
>
>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
>producer attempted to use a producer id which is not currently assigned to its 
>transactional id.
>at 
>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>at 
>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>at 
>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>at 
>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>at java.lang.Thread.run(Thread.java:748)
>Suppressed: java.lang.NullPointerException
>


FlinkKafkaProducer issue

2022-01-19 Thread
hi,
When I create the FlinkKafkaProducer, the following error sometimes occurs at runtime; I don't know the cause.

FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);


org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
 at 
org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
 at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
 at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
 at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
 at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
 at java.lang.Thread.run(Thread.java:748)
 Suppressed: java.lang.NullPointerException



How to set a TTL in code for checkpointed state

2020-03-12 Thread
Hello,
   For checkpoints based on the RocksDBStateBackend, how do I set a TTL?
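In Flink, state TTL is configured per state descriptor rather than on the checkpoint itself; a minimal sketch with the DataStream API follows (the 1-day TTL and the descriptor name are arbitrary examples). RocksDB additionally supports cleaning up expired entries during compaction:

```java
// Sketch: TTL is attached to the state descriptor, not to the checkpoint.
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(1))                      // arbitrary example TTL
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupInRocksdbCompactFilter(1000)           // RocksDB-specific cleanup
        .build();

ValueStateDescriptor<String> descriptor =
        new ValueStateDescriptor<>("my-state", String.class); // example name
descriptor.enableTimeToLive(ttlConfig);
```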

Re:Re: image

2020-02-14 Thread


select * from mykafka1

Error message:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.<init>(Lorg/apache/kafka/common/Metric;)V

On 2020-02-14 17:43:16, "Benchao Li" wrote:
>明文,
>
>Your image did not show up in the email.
>You can upload it to a third-party image host, or send the exception as plain text.
>
>
>潘明文 wrote on Fri, Feb 14, 2020 at 17:31:
>
>>
>>
>> CREATE TABLE mykafka1(name String) WITH (
>>'connector.type' = 'kafka',
>>'connector.version' = 'universal',
>>'connector.topic' = 'mysql_binlog',
>>'connector.properties.zookeeper.connect' = 'masternode1:2181',
>>'connector.properties.bootstrap.servers' = 'masternode1:9092',
>>'format.type' = 'csv',
>>'update-mode' = 'append'
>> );
>>
>>
>> The error below occurs; the method is missing. Is it a version mismatch? I am using flink-sql-connector-kafka_2.11-1.10.0.jar and
>> flink-connector-kafka_2.11-1.10.0.jar
>>
>>
>>
>>
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn


image

2020-02-14 Thread
CREATE TABLE mykafka1(name String) WITH (
   'connector.type' = 'kafka',
   'connector.version' = 'universal',
   'connector.topic' = 'mysql_binlog',
   'connector.properties.zookeeper.connect' = 'masternode1:2181',
   'connector.properties.bootstrap.servers' = 'masternode1:9092',
   'format.type' = 'csv',
   'update-mode' = 'append'
);



The error below occurs; the method is missing. Is it a version mismatch? I am using flink-sql-connector-kafka_2.11-1.10.0.jar and
flink-connector-kafka_2.11-1.10.0.jar