Re:Re: 来自潘明文的邮件
HI 您好, 我的就是一个源同时写入HBASE SINK,和HBASE SINK.。 在 2022-06-27 23:54:33,"Weihua Hu" 写道: >Hi, >图片看不到了,正常来说做个 Sink 算子之间是没有执行先后顺序保证的,是可以并行的。 但是如果多个 sink 被 operator chain >优化在一起,单个 operator chain 内部数据是并行的 >Best, >Weihua > > >On Fri, Jun 24, 2022 at 9:29 PM Lincoln Lee wrote: > >> Hi, >>邮件中直接贴图片无法正常看到,可以发下文本 >> >> Best, >> Lincoln Lee >> >> >> 潘明文 于2022年6月24日周五 16:36写道: >> >> > 你好,下面2个SINK 能够并发同时处理吗?还是要窜行,等第一个SINK 好了,才能第二个SINK. >> > >> > >>
来自潘明文的邮件
你好,下面2个SINK 能够并发同时处理吗?还是要窜行,等第一个SINK 好了,才能第二个SINK.
hbase 写性能
HI, 目前环境是hbase4个Region server节点,内存为128`256, 目前写入hbase7000——1多每秒,这正常,是否有调优的空间。谢谢! 代码如下: BufferedMutatorParams params= new BufferedMutatorParams(TableName.valueOf(key)); //设置缓存10m,当达到10m时数据会自动刷到hbase params.writeBufferSize(10*1024*1024L);//设置缓存的大小 params.setWriteBufferPeriodicFlushTimeoutMs(5*1000L);//写缓存刷写时间为5s BufferedMutator mutator = connection.getBufferedMutator(params); mutator.mutate(recordList); mutator.flush(); mutator.close();
FLINK 实时写入HBASE性能问题
HI, 目前环境是hbase4个Region server节点,内存为128`256, 目前写入hbase7000——1多每秒,这正常,是否有调优的空间。谢谢 代码如下:
Re:Re: io.network.netty.exception
HI , 谢谢,有没有好的解决方案解决该问题呀? 在 2022-03-08 02:20:57,"Zhilong Hong" 写道: >Hi, 明文: > >这个报错实际上是TM失联,一般是TM被kill导致的,可以根据TM的Flink日志和GC日志、集群层面的NM日志(YARN环境)或者是K8S日志查看TM被kill的原因。一般情况下可能是:gc时间过长导致TM心跳超时被kill、TM内存超用导致container/pod被kill等等。 > >Best. >Zhilong > >On Mon, Mar 7, 2022 at 10:18 AM 潘明文 wrote: > >> HI 读kafka,入hbase和kafka >> flink任务经常性报错 >> >> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: >> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. >> This might indicate that the remote task manager was lost.
Re:Re:Re:回复:FlinkKafkaProducer 问题
HI, flink 还是报以下错误: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker 代码如下: Properties prop = new Properties(); prop.setProperty("bootstrap.servers", KAFKA_ADDR); prop.setProperty("acks", "all"); //设置producer 幂等性 保证producer 数据写入到broker 不重复 prop.setProperty("enable.idempotence", "true"); // 设置FlinkKafkaProducer里面的事务超时时间,默认broker的最大事务超时时间为15分钟 prop.setProperty("transaction.timeout.ms", transaction + ""); prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my_tx_id"); prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>( WRITE_TOPIC, serializationSchema, prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); At 2022-03-07 10:06:45, "潘明文" wrote: 目前出现下错误: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker 在 2022-01-21 15:15:51,"潘明文" 写道: > > > > > > > > > > > > > > > >HI , > 好的,我准备测试以下,目前flink 版本是FLINK-1.12.4,kafka-connector > flink-connector-kafka_2.11 > >在 2022-01-21 14:36:06,"selves_nan" 写道: >>Hi,这个事务id自己指定即可,如果指定了之后还是报错,方便给下用到的flink和kafka-connector版本吗,目前在使用的版本没有看到相关的api >> >> >>| | >>selves_nan >>| >>| >>selves_...@163.com >>| >>签名由网易邮箱大师定制 >> >> >>在2022年01月21日 13:00,潘明文 写道: >>HI, >>"生产者的事务id" 怎么获取呀? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>在 2022-01-21 10:41:37,"selves_nan" 写道: >>Hi,我觉得应该是prop缺失了kafka事务型生产者的一些配置项导致的,可以尝试一下加入下面的配置项。 >>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"生产者的事务id"); >>//开启幂等性 >>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); >> >> >>| | >>selves_nan >>| >>| >>selves_...@163.com >>| >>签名由网易邮箱大师定制 >> >> >>在2022年01月20日 14:39,潘明文 写道: >>hi, >>我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。 >> >>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, >>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, >>FlinkKafkaProducer.Semantic.EXACTLY_ONCE); >> >> >>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: >>The producer attempted to use a producer id which is not currently assigned >>to its transactional id. >>at >>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362) >>at >>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074) >>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) >>at >>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) >>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) >>at >>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) >>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311) >>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) >>at java.lang.Thread.run(Thread.java:748) >>Suppressed: java.lang.NullPointerException >>
Re:Re: flink任务经常性报错
看日志: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'cdh04/192.168.0.12:45843'. This might indicate that the remote task manager was lost. at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748) 在 2022-03-07 11:01:22,"yue ma" 写道: >这个报错的意思是有 TM 断开了连接,我觉得可以首先看看你们 'cdh02/xxx:42892' 这个丢失的 TM >的日志上有没有什么异常信息,如果没有的话也可以看看对应的机器监控有没有异常。 > >潘明文 于2022年3月7日周一 10:21写道: > >> HI 读kafka,入hbase和kafka >> flink任务经常性报错 >> >> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: >> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. >> This might indicate that the remote task manager was lost.
flink任务经常性报错
HI 读kafka,入hbase和kafka flink任务经常性报错 org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. This might indicate that the remote task manager was lost.
io.network.netty.exception
HI 读kafka,入hbase和kafka flink任务经常性报错 org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. This might indicate that the remote task manager was lost.
Re:Re:回复:FlinkKafkaProducer 问题
目前出现下错误: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker 在 2022-01-21 15:15:51,"潘明文" 写道: > > > > > > > > > > > > > > > >HI , > 好的,我准备测试以下,目前flink 版本是FLINK-1.12.4,kafka-connector > flink-connector-kafka_2.11 > >在 2022-01-21 14:36:06,"selves_nan" 写道: >>Hi,这个事务id自己指定即可,如果指定了之后还是报错,方便给下用到的flink和kafka-connector版本吗,目前在使用的版本没有看到相关的api >> >> >>| | >>selves_nan >>| >>| >>selves_...@163.com >>| >>签名由网易邮箱大师定制 >> >> >>在2022年01月21日 13:00,潘明文 写道: >>HI, >>"生产者的事务id" 怎么获取呀? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>在 2022-01-21 10:41:37,"selves_nan" 写道: >>Hi,我觉得应该是prop缺失了kafka事务型生产者的一些配置项导致的,可以尝试一下加入下面的配置项。 >>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"生产者的事务id"); >>//开启幂等性 >>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); >> >> >>| | >>selves_nan >>| >>| >>selves_...@163.com >>| >>签名由网易邮箱大师定制 >> >> >>在2022年01月20日 14:39,潘明文 写道: >>hi, >>我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。 >> >>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, >>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, >>FlinkKafkaProducer.Semantic.EXACTLY_ONCE); >> >> >>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: >>The producer attempted to use a producer id which is not currently assigned >>to its transactional id. >>at >>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362) >>at >>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074) >>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) >>at >>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) >>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) >>at >>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) >>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311) >>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) >>at java.lang.Thread.run(Thread.java:748) >>Suppressed: java.lang.NullPointerException >>
elasticsearch+hbase
HI, 现在环境是CDH 集群6台下,elasticsearch作为hbase二级索引,如何优化代码使得通过elasticsearch二级索引再查询hbase数据速度优化到0.1秒一下。谢谢。
elasticsearch+hbase
HI, 现在环境是CDH 集群6台下,elasticsearch作为hbase二级索引,如何优化代码使得查询速度优化到0.1秒一下。谢谢。
hbase NoSuchMethodError: org.apache.hadoop.hbase.client.HTable.getTableName()[B
HI 您好, hbase-client 包是2.1.0 flink 1.12.4 hbase 代码如下: hbase代码extends TableInputFormat> try { connection = ConnectionFactory.createConnection(hbaseConf); // Table table=connection.getTable(TableName.valueOf(tableName)); table = (HTable) connection.getTable(TableName.valueOf(tableName)); } catch (IOException e) { logger.error("HBase连接异常", e.getCause()); System.out.println("--"); } System.out.println("--aaa"); scan = new Scan().addFamily(Bytes.toBytes(family)); scan.withStartRow(startRow.getBytes()); scan.withStopRow(endRow.getBytes()); System.out.println("--"); 错误如下: Exception in thread "main" org.apache.flink.util.FlinkException: Failed to execute job 'Flink Streaming Job'. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1796) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1765) at com.example.app.hbasesource.main(hbasesource.java:25) Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager. at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager. at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: org.apache.hadoop.hbase.client.HTable.getTableName()[B at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:260) at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:866) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:257) at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276) at org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:249) at org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:133) at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:342) at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:327) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39) at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:162) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478) ... 4 more Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.HTable.getTableName()[B at org.apache.flink.addons.hbase.AbstractTableInputFormat.createInputSplits(AbstractTableInputFormat.java:232) at org.apache.flink.addons.hbase.AbstractTableInputFormat.createInputSplits(AbstractTableInputFormat.java:44) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:247) ... 18 more
Re:回复:FlinkKafkaProducer 问题
HI , 好的,我准备测试以下,目前flink 版本是FLINK-1.12.4,kafka-connector flink-connector-kafka_2.11 在 2022-01-21 14:36:06,"selves_nan" 写道: >Hi,这个事务id自己指定即可,如果指定了之后还是报错,方便给下用到的flink和kafka-connector版本吗,目前在使用的版本没有看到相关的api > > >| | >selves_nan >| >| >selves_...@163.com >| >签名由网易邮箱大师定制 > > >在2022年01月21日 13:00,潘明文 写道: >HI, >"生产者的事务id" 怎么获取呀? > > > > > > > > > > > > > > > > > >在 2022-01-21 10:41:37,"selves_nan" 写道: >Hi,我觉得应该是prop缺失了kafka事务型生产者的一些配置项导致的,可以尝试一下加入下面的配置项。 >prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"生产者的事务id"); >//开启幂等性 >prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); > > >| | >selves_nan >| >| >selves_...@163.com >| >签名由网易邮箱大师定制 > > >在2022年01月20日 14:39,潘明文 写道: >hi, >我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。 > >FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, >new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, >FlinkKafkaProducer.Semantic.EXACTLY_ONCE); > > >org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The >producer attempted to use a producer id which is not currently assigned to its >transactional id. >at >org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362) >at >org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074) >at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) >at >org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) >at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) >at >org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) >at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311) >at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) >at java.lang.Thread.run(Thread.java:748) >Suppressed: java.lang.NullPointerException >
Re:回复:FlinkKafkaProducer 问题
HI, "生产者的事务id" 怎么获取呀? 在 2022-01-21 10:41:37,"selves_nan" 写道: >Hi,我觉得应该是prop缺失了kafka事务型生产者的一些配置项导致的,可以尝试一下加入下面的配置项。 >prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"生产者的事务id"); >//开启幂等性 >prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); > > >| | >selves_nan >| >| >selves_...@163.com >| >签名由网易邮箱大师定制 > > >在2022年01月20日 14:39,潘明文 写道: >hi, >我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。 > >FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, >new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, >FlinkKafkaProducer.Semantic.EXACTLY_ONCE); > > >org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The >producer attempted to use a producer id which is not currently assigned to its >transactional id. >at >org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362) >at >org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074) >at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) >at >org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) >at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) >at >org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) >at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311) >at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) >at java.lang.Thread.run(Thread.java:748) >Suppressed: java.lang.NullPointerException >
FlinkKafkaProducer 问题
hi, 我创建FlinkKafkaProducer 是,运行时有时出现以下错误,不知道啥原因。 FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id. at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362) at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at java.lang.Thread.run(Thread.java:748) Suppressed: java.lang.NullPointerException
checkpoint 代码中如何设置TTL
您好, checkpoint 基于RocksDBStateBackend的如何设置TTL?
Re:Re: image
select * from mykafka1 错误信息: [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.(Lorg/apache/kafka/common/Metric;)V 在 2020-02-14 17:43:16,"Benchao Li" 写道: >明文, > >你的图片没有在邮件中显示出来。 >你可以用第三方的图床来上传图片,或者直接以text的方式发送你的异常信息。 > > >潘明文 于2020年2月14日周五 下午5:31写道: > >> >> >> CREATE TABLE mykafka1(name String) WITH ( >>'connector.type' = 'kafka', >>'connector.version' = 'universal', >>'connector.topic' = 'mysql_binlog', >>'connector.properties.zookeeper.connect' = 'masternode1:2181', >>'connector.properties.bootstrap.servers' = 'masternode1:9092', >>'format.type' = 'csv', >>'update-mode' = 'append' >> ); >> >> >> 出现下面错误,没有方法,是版本不对吗?我用的是flink-sql-connector-kafka_2.11-1.10.0.jar >> flink-connector-kafka_2.11-1.10.0.jar >> >> >> >> >> > > >-- > >Benchao Li >School of Electronics Engineering and Computer Science, Peking University >Tel:+86-15650713730 >Email: libenc...@gmail.com; libenc...@pku.edu.cn
image
CREATE TABLE mykafka1(name String) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'mysql_binlog', 'connector.properties.zookeeper.connect' = 'masternode1:2181', 'connector.properties.bootstrap.servers' = 'masternode1:9092', 'format.type' = 'csv', 'update-mode' = 'append' ); 出现下面错误,没有方法,是版本不对吗?我用的是flink-sql-connector-kafka_2.11-1.10.0.jar flink-connector-kafka_2.11-1.10.0.jar