flink 1.11.1 task restart occasionally throws org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-11-18 Post by m13162790856
The detailed log is as follows:


   org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:688)
... 15 more
2020-11-19 15:17:32,0


Has anyone run into this?

Re: checkpoint fails after using a window function, with error Operation category WRITE is not supported in state standby

2020-11-18 Post by kingdomad
The problem seems to be solved.
With FlinkKafkaConsumer010 from flink-connector-kafka-0.10_2.12 the job cannot checkpoint and reports this error;
after switching to FlinkKafkaConsumer from flink-connector-kafka-2.12, checkpointing works normally with no error.
The behavior is the same whether CheckpointingMode is EXACTLY_ONCE or AT_LEAST_ONCE.
The root cause is still unknown.
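
For reference, a minimal sketch of the working setup described above, assuming Flink 1.11, the universal flink-connector-kafka artifact, and hypothetical topic, broker, and group id values:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UniversalKafkaConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint once a minute

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // hypothetical broker
        props.setProperty("group.id", "demo-group");            // hypothetical group id

        // Universal connector class, replacing FlinkKafkaConsumer010 from the 0.10 connector.
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

        env.addSource(consumer).print();
        env.execute("universal-kafka-consumer-sketch");
    }
}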















--

kingdomad







At 2020-11-18 17:19:29, "kingdomad" wrote:
>flink 1.11.1 consumes kafka 0.10.1.1 and then does windowed deduplication and counting; time is eventtime and the window is 1 minute.
>The program structure is roughly:
>kafkaStream.keyBy().window().aggregate(new AverageAggregate());
>
>
>flink on yarn,
>The job runs but cannot checkpoint. The taskmanager log shows the error below.
>I checked and those nodes are all running normally. If I remove the windowed aggregation and just print
>kafkaStream, the job checkpoints normally. The logs show nothing else unusual and I am stumped. Any help from the experts would be appreciated.
>
>
>
>
>
>
>2020-11-18 13:30:52,475 INFO  org.apache.kafka.common.utils.AppInfoParser  
>[] - Kafka version : 0.10.2.2
>
>2020-11-18 13:30:52,475 INFO  org.apache.kafka.common.utils.AppInfoParser  
>[] - Kafka commitId : cd80bc412b9b9701
>
>2020-11-18 13:31:09,668 INFO  
>org.apache.hadoop.io.retry.RetryInvocationHandler[] - 
>org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): 
>Operation category WRITE is not supported in state standby. Visit 
>https://s.apache.org/sbnn-error
>
>at 
>org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:88)
>
>at 
>org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1952)
>
>at 
>org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1423)
>
>at 
>org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:776)
>
>at 
>org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:475)
>
>at 
>org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>
>at 
>org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
>
>at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
>
>at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)
>
>at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)
>
>at java.security.AccessController.doPrivileged(Native Method)
>
>at javax.security.auth.Subject.doAs(Subject.java:422)
>
>at 
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1685)
>
>at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)
>
>, while invoking ClientNamenodeProtocolTranslatorPB.create over xxx:8020 after 
>1 failover attempts. Trying to failover after sleeping for 864ms.
>
>
>
>
>
>
>
>
>
>
>
>
>
>--
>
>kingdomad
>


Re: flink 1.11.2 cdc: sink ordering problem for CDC SQL result tables; restoring the job from a savepoint with a different parallelism leads to incorrect sink results!!

2020-11-18 Post by Jark Wu
If the data itself is not skewed and the parallelism can be raised, there is not much else to do on the SQL side; you will have to tune from the RocksDB angle, for example:
1. Are you using SSDs?
2. Adjust the write buffer and block cache (see the sketch after the links below).
3. For more, see these state tuning articles [1][2].

Best,
Jark

[1]: https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
[2]: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
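
A minimal sketch of item 2, assuming the job sets an explicit RocksDB state backend and using a hypothetical checkpoint path; the predefined option bundle below enlarges write buffers and block cache without hand-tuning individual RocksDB settings:

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical checkpoint URI; 'true' enables incremental checkpoints.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("file:///data/flink/checkpoints", true);

        // Option bundle with larger write buffers and block cache, aimed at spinning disks;
        // with SSDs, FLASH_SSD_OPTIMIZED would be the usual choice instead.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        env.setStateBackend(backend);

        env.fromElements(1, 2, 3).print(); // placeholder pipeline; the real job goes here
        env.execute("rocksdb-tuning-sketch");
    }
}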

On Thu, 19 Nov 2020 at 12:19, jindy_liu <286729...@qq.com> wrote:

> Many thanks, Jark!
> 1. Yesterday I turned the status table into a temporal table and tried a continuous join. It does have the problem you mentioned: updates to the status table do not trigger the computation, and all real-time changes have to be driven by the test stream.
>
> There is also the temporal-table TTL setting: too small causes I/O problems, too large makes results stale, which does not fit our requirements. In our scenario the status table is not really a dimension table; it is large and changes frequently.
>
> 2. With parallelism 1 the main problem is predictable: for large tables the join causes heavy backpressure.
>
> 3. In the tests with higher parallelism (10, 20, 40, 80), the data of the two joined tables (test, status) is fully symmetric with no skew (2 million rows each, and the join keys are symmetric: test 1 -- status1, test 2 -- status2, ..., test 200 -- status200), yet the source is still backpressured; with higher parallelism the backpressure just appears a bit later.
> The Flink state backend here is RocksDB plus local files. Disk I/O is only around 20k blocks/s; could the default RocksDB configuration simply not be fast enough?
>
> Jark, do you have any suggestions on this backpressure?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Help: the apply method after the Flink DataStream window operator never executes

2020-11-18 Post by huang botao
hi, zhisheng, hailongwang:

Thanks for the answers. The problem was indeed that the window could not be triggered; the cause was that no watermark was defined after connect(). Once the watermark is assigned after connect(), the window fires (a rough sketch of that arrangement follows below).
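
A minimal, self-contained sketch of that arrangement, using made-up Tuple2 events and a dummy rule stream in place of the real MetricData and rule types; the essential point is that assignTimestampsAndWatermarks is called after connect()/process(), so the event-time window downstream receives watermarks:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WatermarkAfterConnectSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the real metric and rule-config streams.
        DataStream<Tuple2<String, Long>> metrics = env.fromElements(
                Tuple2.of("m1", 1_000L), Tuple2.of("m1", 2_500L), Tuple2.of("m2", 4_000L));
        DataStream<String> rules = env.fromElements("rule-a");

        DataStream<Tuple2<String, Long>> filtered = metrics
                .connect(rules)
                .process(new CoProcessFunction<Tuple2<String, Long>, String, Tuple2<String, Long>>() {
                    @Override
                    public void processElement1(Tuple2<String, Long> metric, Context ctx,
                                                Collector<Tuple2<String, Long>> out) {
                        out.collect(metric); // real filtering logic omitted
                    }

                    @Override
                    public void processElement2(String rule, Context ctx,
                                                Collector<Tuple2<String, Long>> out) {
                        // rule updates would be cached here in the real job
                    }
                })
                // Key point: assign timestamps/watermarks AFTER connect()/process(),
                // so the event-time window downstream actually receives watermarks.
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((event, ts) -> event.f1));

        filtered
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window,
                                      Iterable<Tuple2<String, Long>> input, Collector<String> out) {
                        input.forEach(x -> out.collect("--->>>" + x));
                    }
                })
                .print();

        env.execute("watermark-after-connect-sketch");
    }
}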



On Wed, Nov 18, 2020 at 10:46 PM zhisheng  wrote:

> You can check whether every Kafka partition the job consumes has data; if some partitions have no data, the watermark may not advance and the window will never fire.
>
> Best
> zhisheng
>
huang botao wrote on Wednesday, November 18, 2020 at 10:34 PM:
>
> > Thanks for your reply. In my setup the environment uses eventTime:
> >
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> > The window is slid(2,1), but even after waiting long enough the apply method is still not triggered.
> >
> >
> > On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18868816...@163.com> wrote:
> >
> > > The window probably has not reached its trigger condition yet; check whether the watermark is advancing.
> > >
> > > At 2020-11-18 15:29:54, "huang botao" wrote:
> > > >Hi, I have a strange question:
> > > >
> > > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> > > >
> > > >.assignTimestampsAndWatermarks(new
> > > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> > > >
> > > >.connect(ruleConfigSource)
> > > >.process(new MetricDataFilterProcessFunction())
> > > >.keyBy((KeySelector) metric -> {
> > > >MetricDataKey metricDataKey = new MetricDataKey();
> > > >metricDataKey.setDomain(metric.getDomain());
> > > >metricDataKey.setStationAliasCode(metric.getStaId());
> > > >metricDataKey.setEquipMK(metric.getEquipMK());
> > > >metricDataKey.setEquipID(metric.getEquipID());
> > > >metricDataKey.setMetric(metric.getMetric());
> > > >return metricDataKey;
> > > >})
> > > >
> > > >.window(SlidingEventTimeWindows.of(Time.seconds(2),
> > Time.seconds(1)))
> > > >.apply(new RichWindowFunction > > >MetricDataKey, TimeWindow>() {
> > > >@Override
> > > >public void apply(MetricDataKey tuple, TimeWindow window,
> > > >Iterable input, Collector out) throws
> > > >Exception {
> > > >input.forEach(x->{
> > > >System.out.println("--->>>"+x);
> > > >});
> > > >}
> > > >})
> > > >
> > > >In this topology keyBy executes normally, but the System.out.println("--->>>"+x); inside apply never runs.
> > > >
> > > >
> > > >Data keeps being consumed and there is no error message at all.
> > >
> >
>


Re: flink 1.11.2 cdc: sink ordering problem for CDC SQL result tables; restoring the job from a savepoint with a different parallelism leads to incorrect sink results!!

2020-11-18 Post by jindy_liu
Many thanks, Jark!
1. Yesterday I turned the status table into a temporal table and tried a continuous join. It does have the problem you mentioned: updates to the status table do not trigger the computation, and all real-time changes have to be driven by the test stream.
There is also the temporal-table TTL setting: too small causes I/O problems, too large makes results stale, which does not fit our requirements. In our scenario the status table is not really a dimension table; it is large and changes frequently.

2. With parallelism 1 the main problem is predictable: for large tables the join causes heavy backpressure.

3. In the tests with higher parallelism (10, 20, 40, 80), the data of the two joined tables (test, status) is fully symmetric with no skew (2 million rows each, and the join keys are symmetric: test 1 -- status1, test 2 -- status2, ..., test 200 -- status200), yet the source is still backpressured; with higher parallelism the backpressure just appears a bit later.
The Flink state backend here is RocksDB plus local files. Disk I/O is only around 20k blocks/s; could the default RocksDB configuration simply not be fast enough?

Jark, do you have any suggestions on this backpressure?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11.2 cdc: sink ordering problem for CDC SQL result tables; restoring the job from a savepoint with a different parallelism leads to incorrect sink results!!

2020-11-18 Post by Jark Wu
I looked at your problem again more carefully. Your join key is the status id, so currently records are shuffled by status id to the different parallel instances of the join.
If the status id of a test row changes, the data for one test id will be handled by different join instances, i.e. the test
data is already out of order;
at that point, adding a keyBy on the sink key downstream does not help anymore.

So, when stream-joining two CDC streams, be careful that the join key must not change; otherwise the join can only run with a single parallelism.
For your scenario, you can consider a dimension-table join against the status table (a rough sketch follows below), because a dimension-table join does not shuffle by the join key, so the
test data stays in order.
The downside is that updates to the status table will then no longer trigger computation and updates to the sink table; only updates to the test table will.

Best,
Jark
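
A rough sketch of the dimension-table join Jark suggests, reusing the test CDC table from the quoted message below and swapping the status side for a hypothetical JDBC lookup table status_dim with placeholder connection settings; this is one possible reading of the suggestion, not a verified fix:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DimensionJoinSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // CDC-backed test table with a proctime attribute, as in the quoted message
        // (connection settings are placeholders).
        tEnv.executeSql(
                "CREATE TABLE test (" +
                "  id INT," +
                "  name VARCHAR(255)," +
                "  `time` TIMESTAMP(3)," +
                "  status INT," +
                "  proctime AS PROCTIME()," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '3306'," +
                "  'username' = 'user'," +
                "  'password' = 'password'," +
                "  'database-name' = 'ai_audio_lyric_task'," +
                "  'table-name' = 'test'" +
                ")");

        // status as a JDBC lookup (dimension) table rather than a second CDC stream.
        tEnv.executeSql(
                "CREATE TABLE status_dim (" +
                "  id INT," +
                "  name VARCHAR(255)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/ai_audio_lyric_task'," +
                "  'table-name' = 'status'," +
                "  'username' = 'user'," +
                "  'password' = 'password'," +
                "  'lookup.cache.max-rows' = '10000'," +
                "  'lookup.cache.ttl' = '1min'" +
                ")");

        // Lookup join: each test change probes status_dim at its processing time,
        // so no shuffle by the mutable status id is introduced.
        tEnv.executeSql(
                "SELECT t.id, t.name, t.`time`, t.status, s.name AS status_name " +
                "FROM test AS t " +
                "LEFT JOIN status_dim FOR SYSTEM_TIME AS OF t.proctime AS s " +
                "ON t.status = s.id").print();
    }
}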



On Mon, 16 Nov 2020 at 16:03, jindy_liu <286729...@qq.com> wrote:

> 1. Tried it:
>
> Added a proctime column to the test table:
>
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `proctime` AS PROCTIME(),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'no_lock',
>   'password' = 'no_lock',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test',
>   'debezium.snapshot.locking.mode' = 'none'
> );
>
> Then wrote the deduplication statement:
>
> INSERT into test_status_print
> SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
> FROM (
> SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) as
> rowNum
> FROM (
> SELECT t.* , s.name as status_name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id
> )
> )r WHERE rowNum = 1;
>
> But it reports an error saying this is not supported:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Deduplicate doesn't support
> consuming update and delete changes which is produced by node
> Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
> time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey])
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: How to load MySQL data into Flink task memory

2020-11-18 Post by hailongwang
Hi,
You need to extend RichFunction and implement the open method.
Inside open you need to (a minimal sketch follows the list):
1. Open the MySQL connection
2. Obtain a PreparedStatement object
3. Get a ResultSet from the SQL
4. Iterate over the ResultSet and load it into memory
5. Release the connection resources
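
A minimal sketch of those steps, assuming a hypothetical MySQL table t_config(k, v), placeholder connection settings, and the MySQL JDBC driver on the classpath; the RichMapFunction here only stands in for whichever RichFunction the job actually uses:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class MySqlLoadingMapper extends RichMapFunction<String, String> {

    private transient Map<String, String> cache;

    @Override
    public void open(Configuration parameters) throws Exception {
        cache = new HashMap<>();
        // 1. open the MySQL connection (placeholder URL/credentials)
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://localhost:3306/demo", "user", "password");
             // 2. prepare the statement
             PreparedStatement stmt = conn.prepareStatement("SELECT k, v FROM t_config");
             // 3. run the query
             ResultSet rs = stmt.executeQuery()) {
            // 4. load the result set into memory
            while (rs.next()) {
                cache.put(rs.getString("k"), rs.getString("v"));
            }
        } // 5. try-with-resources releases the connection resources
    }

    @Override
    public String map(String value) {
        return cache.getOrDefault(value, "unknown");
    }
}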




At 2020-11-18 22:58:52, "ゞ野蠻遊戲χ" wrote:
>Hi everyone!
>How can I use DataStream to load MySQL data into the Flink task's memory when the job initializes? Please give me a demo.
>Thanks,
>jiazhi


How to load MySQL data into Flink task memory

2020-11-18 Post by ゞ野蠻遊戲χ
Hi everyone!
How can I use DataStream to load MySQL data into the Flink task's memory when the job initializes? Please give me a demo.
Thanks,
jiazhi

Re: Flink job occasionally reports PartitionNotFoundException at startup and recovers automatically.

2020-11-18 Post by zhisheng
Did one of the Kafka machines go down?

Best
zhisheng

hailongwang <18868816...@163.com> wrote on Wednesday, November 18, 2020 at 5:56 PM:

> It feels like there is some other root cause; can you check whether there are other logs?
>
>
> Best,
> Hailong
>
> At 2020-11-18 15:52:57, "赵一旦"  wrote:
> >2020-11-18 16:51:37
> >org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> >Partition
> b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
> >not found.
> >at org.apache.flink.runtime.io.network.partition.consumer.
> >RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
> >at org.apache.flink.runtime.io.network.partition.consumer.
>
> >RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
> >at org.apache.flink.runtime.io.network.partition.consumer.
> >SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
> >at org.apache.flink.runtime.io.network.partition.consumer.
>
> >SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765
> >)
> >at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> >.java:670)
> >at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> >CompletableFuture.java:646)
> >at java.util.concurrent.CompletableFuture$Completion.run(
> >CompletableFuture.java:456)
> >at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> >ForkJoinExecutorConfigurator.scala:44)
> >at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
> >.java:1339)
> >at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
> >.java:107)
> >
> >
>What could this problem be?
>


Re: Help: the apply method after the Flink DataStream window operator never executes

2020-11-18 Post by zhisheng
You can check whether every Kafka partition the job consumes has data; if some partitions have no data, the watermark may not advance and the window will never fire (a small sketch follows below).

Best
zhisheng
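
A minimal sketch of guarding against idle partitions, assuming Flink 1.11's WatermarkStrategy and a made-up Long-valued event stream standing in for the Kafka source; withIdleness marks a source idle after a period without records so an empty Kafka partition cannot hold the overall watermark back:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class IdlePartitionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka stream; each Long value is also its event timestamp.
        DataStream<Long> events = env.fromElements(1_000L, 2_000L, 65_000L);

        events.assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Long>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((value, ts) -> value)
                        // Declare the source idle after 1 minute without records, so an
                        // empty Kafka partition cannot stall the watermark.
                        .withIdleness(Duration.ofMinutes(1)))
                .keyBy(value -> value % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce((a, b) -> a + b)
                .print();

        env.execute("idle-partition-sketch");
    }
}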

huang botao wrote on Wednesday, November 18, 2020 at 10:34 PM:

> Thanks for your reply. In my setup the environment uses eventTime:
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> The window is slid(2,1), but even after waiting long enough the apply method is still not triggered.
>
>
> On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18868816...@163.com> wrote:
>
> > The window probably has not reached its trigger condition yet; check whether the watermark is advancing.
> >
> > At 2020-11-18 15:29:54, "huang botao" wrote:
> > >Hi, I have a strange question:
> > >
> > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> > >
> > >.assignTimestampsAndWatermarks(new
> > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> > >
> > >.connect(ruleConfigSource)
> > >.process(new MetricDataFilterProcessFunction())
> > >.keyBy((KeySelector) metric -> {
> > >MetricDataKey metricDataKey = new MetricDataKey();
> > >metricDataKey.setDomain(metric.getDomain());
> > >metricDataKey.setStationAliasCode(metric.getStaId());
> > >metricDataKey.setEquipMK(metric.getEquipMK());
> > >metricDataKey.setEquipID(metric.getEquipID());
> > >metricDataKey.setMetric(metric.getMetric());
> > >return metricDataKey;
> > >})
> > >
> > >.window(SlidingEventTimeWindows.of(Time.seconds(2),
> Time.seconds(1)))
> > >.apply(new RichWindowFunction > >MetricDataKey, TimeWindow>() {
> > >@Override
> > >public void apply(MetricDataKey tuple, TimeWindow window,
> > >Iterable input, Collector out) throws
> > >Exception {
> > >input.forEach(x->{
> > >System.out.println("--->>>"+x);
> > >});
> > >}
> > >})
> > >
> > >In this topology keyBy executes normally, but the System.out.println("--->>>"+x); inside apply never runs.
> > >
> > >
> > >Data keeps being consumed and there is no error message at all.
> >
>


Re: Help: the apply method after the Flink DataStream window operator never executes

2020-11-18 Post by huang botao
Thanks for your reply. In my setup the environment uses eventTime:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

The window is slid(2,1), but even after waiting long enough the apply method is still not triggered.


On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18868816...@163.com> wrote:

> The window probably has not reached its trigger condition yet; check whether the watermark is advancing.
>
> At 2020-11-18 15:29:54, "huang botao" wrote:
> >Hi, I have a strange question:
> >
> >streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
> >
> >.assignTimestampsAndWatermarks(new
> >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
> >
> >.connect(ruleConfigSource)
> >.process(new MetricDataFilterProcessFunction())
> >.keyBy((KeySelector) metric -> {
> >MetricDataKey metricDataKey = new MetricDataKey();
> >metricDataKey.setDomain(metric.getDomain());
> >metricDataKey.setStationAliasCode(metric.getStaId());
> >metricDataKey.setEquipMK(metric.getEquipMK());
> >metricDataKey.setEquipID(metric.getEquipID());
> >metricDataKey.setMetric(metric.getMetric());
> >return metricDataKey;
> >})
> >
> >.window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
> >.apply(new RichWindowFunction >MetricDataKey, TimeWindow>() {
> >@Override
> >public void apply(MetricDataKey tuple, TimeWindow window,
> >Iterable input, Collector out) throws
> >Exception {
> >input.forEach(x->{
> >System.out.println("--->>>"+x);
> >});
> >}
> >})
> >
> >In this topology keyBy executes normally, but the System.out.println("--->>>"+x); inside apply never runs.
> >
> >
> >Data keeps being consumed and there is no error message at all.
>


ES thread deadlock problem

2020-11-18 Post by 359502...@qq.com
I have run into an awkward problem: when writing data from Kafka to ES, the job deadlocks after running for a while. The ES cluster is version 5 while the Flink writer uses 6; could that matter? Please help.

Sent from my iPhone

Re: Flink CDC scan fails with Query execution was interrupted, maximum statement execution time exceeded

2020-11-18 Post by hailongwang
Hi,
   See whether the following articles help [1][2].
   If you do not need an initial snapshot of the existing data at startup, you can set the snapshot.mode parameter [3] (a rough DDL sketch follows below).


[1] 
https://dba.stackexchange.com/questions/134923/query-execution-was-interrupted-max-statement-time-exceeded
[2] https://mysqlserverteam.com/server-side-select-statement-timeouts/
[3] 
https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-property-snapshot-mode


Best,
Hailong Wang
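
One way to express the snapshot.mode suggestion is through the connector's debezium.* pass-through options, assuming the flink-connector-mysql-cdc from the stack trace accepts them; a rough sketch with placeholder connection settings and an assumed schema, where 'schema_only' skips the initial data snapshot and starts from the current binlog position:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CdcNoSnapshotSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Placeholder connection settings and schema; the key line is the debezium.*
        // pass-through that sets snapshot.mode so no full table scan runs at startup.
        tEnv.executeSql(
                "CREATE TABLE right_info (" +
                "  id BIGINT," +                              // assumed column
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '3306'," +
                "  'username' = 'user'," +
                "  'password' = 'password'," +
                "  'database-name' = 'test'," +
                "  'table-name' = 'right_info'," +
                "  'debezium.snapshot.mode' = 'schema_only'" + // hypothetical choice; see [3]
                ")");
    }
}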

At 2020-11-18 17:26:40, "丁浩浩" <18579099...@163.com> wrote:
>I have a large table with over a hundred million rows, but an error occurs during the scan saying the maximum execution time was exceeded. How should I solve this?
>
>2020-11-18 17:07:43,658 INFO  io.debezium.connector.mysql.SnapshotReader   
>[] - Step 7: - 10403 of 131508640 rows scanned from table 
>'test.right_info' after 01:00:00.952
>2020-11-18 17:07:44,131 INFO  io.debezium.connector.mysql.SnapshotReader   
>[] - Step 7: - 10404 of 131508640 rows scanned from table 
>'test.right_info' after 01:00:01.425
>2020-11-18 17:07:44,601 INFO  io.debezium.connector.mysql.SnapshotReader   
>[] - Step 7: committing transaction
>2020-11-18 17:07:44,601 ERROR io.debezium.connector.mysql.SnapshotReader   
>[] - Failed due to error: Aborting snapshot due to error when last 
>running 'SELECT * FROM `test`.`right_info`': Query execution was interrupted, 
>maximum statement execution time exceeded
>org.apache.kafka.connect.errors.ConnectException: Query execution was 
>interrupted, maximum statement execution time exceeded Error code: 3024; 
>SQLSTATE: HY000.
>at 
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at 
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at 
> io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [?:1.8.0_91]
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [?:1.8.0_91]
>at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
>Caused by: java.sql.SQLException: Query execution was interrupted, maximum 
>statement execution time exceeded
>at 
> com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at 
> com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at 
> com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
>  ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at com.mysql.cj.jdbc.result.ResultSetImpl.next(ResultSetImpl.java:1739) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at 
> io.debezium.connector.mysql.SnapshotReader.lambda$execute$14(SnapshotReader.java:648)
>  ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:473) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at 
> io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:641) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>... 3 more
>2020-11-18 17:07:44,606 INFO  io.debezium.connector.common.BaseSourceTask  
>[] - Stopping down connector
>2020-11-18 17:07:44,606 INFO  io.debezium.connector.mysql.MySqlConnectorTask   
>[] - Stopping MySQL connector task
>2020-11-18 17:07:44,606 INFO  io.debezium.connector.mysql.ChainedReader
>[] - ChainedReader: Stopping the snapshot reader
>2020-11-18 17:07:44,607 INFO  io.debezium.connector.mysql.SnapshotReader   
>[] - Discarding 4363 unsent record(s) due to the connector 
>shutting down
>2020-11-18 17:07:44,607 INFO  io.debezium.connector.mysql.SnapshotReader   
>[] - Discarding 0 unsent record(s) due to the connector shutting 
>down
>2020-11-18 17:07:44,608 INFO  io.debezium.connector.mysql.MySqlConnectorTask   
>[] - Connector task finished all work and is now shutdown
>2020-11-18 17:07:44,609 ERROR 
>com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction[] - Reporting 
>error:
>org.apache.kafka.connect.errors.ConnectException: Query execution was 
>interrupted, maximum statement execution time exceeded Error code: 3024; 
>SQLSTATE: HY000.
>at 
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at 
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at 
> io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) 
> ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [?:1.8.0_91]
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [?:1.8.0_91]

Flink CDC scan fails with Query execution was interrupted, maximum statement execution time exceeded

2020-11-18 Post by 丁浩浩
I have a large table with over a hundred million rows, but an error occurs during the scan saying the maximum execution time was exceeded. How should I solve this?

2020-11-18 17:07:43,658 INFO  io.debezium.connector.mysql.SnapshotReader
   [] - Step 7: - 10403 of 131508640 rows scanned from table 
'test.right_info' after 01:00:00.952
2020-11-18 17:07:44,131 INFO  io.debezium.connector.mysql.SnapshotReader
   [] - Step 7: - 10404 of 131508640 rows scanned from table 
'test.right_info' after 01:00:01.425
2020-11-18 17:07:44,601 INFO  io.debezium.connector.mysql.SnapshotReader
   [] - Step 7: committing transaction
2020-11-18 17:07:44,601 ERROR io.debezium.connector.mysql.SnapshotReader
   [] - Failed due to error: Aborting snapshot due to error when last 
running 'SELECT * FROM `test`.`right_info`': Query execution was interrupted, 
maximum statement execution time exceeded
org.apache.kafka.connect.errors.ConnectException: Query execution was 
interrupted, maximum statement execution time exceeded Error code: 3024; 
SQLSTATE: HY000.
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[?:1.8.0_91]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[?:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.sql.SQLException: Query execution was interrupted, maximum 
statement execution time exceeded
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
 ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at com.mysql.cj.jdbc.result.ResultSetImpl.next(ResultSetImpl.java:1739) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.SnapshotReader.lambda$execute$14(SnapshotReader.java:648)
 ~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:473) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:641) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
... 3 more
2020-11-18 17:07:44,606 INFO  io.debezium.connector.common.BaseSourceTask   
   [] - Stopping down connector
2020-11-18 17:07:44,606 INFO  io.debezium.connector.mysql.MySqlConnectorTask
   [] - Stopping MySQL connector task
2020-11-18 17:07:44,606 INFO  io.debezium.connector.mysql.ChainedReader 
   [] - ChainedReader: Stopping the snapshot reader
2020-11-18 17:07:44,607 INFO  io.debezium.connector.mysql.SnapshotReader
   [] - Discarding 4363 unsent record(s) due to the connector shutting 
down
2020-11-18 17:07:44,607 INFO  io.debezium.connector.mysql.SnapshotReader
   [] - Discarding 0 unsent record(s) due to the connector shutting down
2020-11-18 17:07:44,608 INFO  io.debezium.connector.mysql.MySqlConnectorTask
   [] - Connector task finished all work and is now shutdown
2020-11-18 17:07:44,609 ERROR 
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction[] - Reporting 
error:
org.apache.kafka.connect.errors.ConnectException: Query execution was 
interrupted, maximum statement execution time exceeded Error code: 3024; 
SQLSTATE: HY000.
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[?:1.8.0_91]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[?:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.sql.SQLException: Query execution was interrupted, maximum 
statement execution time exceeded
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) 
~[flink-connector-mysql-cdc-1.1.0.jar:1.1.0]
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
 

Re: Flink job occasionally reports PartitionNotFoundException at startup and recovers automatically.

2020-11-18 Post by hailongwang
It feels like there is some other root cause; can you check whether there are other logs?


Best,
Hailong

At 2020-11-18 15:52:57, "赵一旦"  wrote:
>2020-11-18 16:51:37
>org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
>Partition b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
>not found.
>at org.apache.flink.runtime.io.network.partition.consumer.
>RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
>at org.apache.flink.runtime.io.network.partition.consumer.
>RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
>at org.apache.flink.runtime.io.network.partition.consumer.
>SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
>at org.apache.flink.runtime.io.network.partition.consumer.
>SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765
>)
>at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
>.java:670)
>at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
>CompletableFuture.java:646)
>at java.util.concurrent.CompletableFuture$Completion.run(
>CompletableFuture.java:456)
>at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
>ForkJoinExecutorConfigurator.scala:44)
>at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
>.java:1339)
>at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
>.java:107)
>
>
>What could this problem be?


Re: Re: Re: Reply: Reply: Reply: Specifying multiple Kafka topics in flinksql

2020-11-18 Post by hailongwang
Yes, or you can cherry-pick that MR yourself and build it (a rough sketch of the 1.12 syntax follows below).
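
For reference, a rough sketch of what the multi-topic source can look like once FLINK-18449 is in (master/1.12 at the time of this thread); the semicolon-separated 'topic' list is what that issue adds, 'topic-pattern' is its regex alternative, and the schema, broker, and group id values here are assumptions:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MultiTopicKafkaSourceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // One source table consuming several topics (semicolon-separated list).
        tEnv.executeSql(
                "CREATE TABLE cloud_behavior_source (" +
                "  payload STRING" +                     // placeholder schema
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'cloud_behavior;cloud_behavior_other;cloud_behavior_qxb'," +
                "  'properties.bootstrap.servers' = 'broker1:9092'," +
                "  'properties.group.id' = 'demo-group'," +
                "  'format' = 'json'," +
                "  'scan.startup.mode' = 'group-offsets'" +
                ")");
    }
}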


At 2020-11-18 16:47:59, "赵一旦" wrote:
>So this problem still has no solution, right? Looking at the docs, topic-pattern does not exist yet either; 1.11 does not support it, correct?
>
>hailongwang <18868816...@163.com> 于2020年10月26日周一 下午8:37写道:
>
>> Hi s_hongliang,
>>  目前的 Master 分支(1.12) 版本支持一个 source 指定消费多个 topics。
>>  相关issue见:https://issues.apache.org/jira/browse/FLINK-18449
>>
>> 目前你可以拆成多个 Source 消费。
>>
>>
>>
>>
>> Best,
>> Hailong Wang
>>
>>
>>
>>
>> At 2020-10-26 13:54:40, "奔跑的小飞袁" wrote:
>> >with (
>> >'connector'='kafka',
>> >
>>
>> >'topic'='cloud_behavior;cloud_behavior_other;cloud_behavior_qxb;cloud_behavior_cc;cloud_behavior_cs',
>> >'properties.bootstrap.servers'='',
>> >'properties.group.id'='flink_2_hive_and_imei_ncrypy_test',
>> >'format'='avro',
>> >'scan.startup.mode'='group-offsets'
>> >);
>> >
>> >This is my configuration, but it fails at execution time. I want to know which flinksql version supports this.
>> >
>> >
>> >
>> >--
>> >Sent from: http://apache-flink.147419.n8.nabble.com/
>>


Re: Help: the apply method after the Flink DataStream window operator never executes

2020-11-18 Post by hailongwang
The window probably has not reached its trigger condition yet; check whether the watermark is advancing.

At 2020-11-18 15:29:54, "huang botao" wrote:
>Hi, I have a strange question:
>
>streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
>
>.assignTimestampsAndWatermarks(new
>CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
>
>.connect(ruleConfigSource)
>.process(new MetricDataFilterProcessFunction())
>.keyBy((KeySelector) metric -> {
>MetricDataKey metricDataKey = new MetricDataKey();
>metricDataKey.setDomain(metric.getDomain());
>metricDataKey.setStationAliasCode(metric.getStaId());
>metricDataKey.setEquipMK(metric.getEquipMK());
>metricDataKey.setEquipID(metric.getEquipID());
>metricDataKey.setMetric(metric.getMetric());
>return metricDataKey;
>})
>
>.window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
>.apply(new RichWindowFunctionMetricDataKey, TimeWindow>() {
>@Override
>public void apply(MetricDataKey tuple, TimeWindow window,
>Iterable input, Collector out) throws
>Exception {
>input.forEach(x->{
>System.out.println("--->>>"+x);
>});
>}
>})
>
>In this topology keyBy executes normally, but the System.out.println("--->>>"+x); inside apply never runs.
>
>
>Data keeps being consumed and there is no error message at all.


Re: Re: Reply: Reply: Reply: Specifying multiple Kafka topics in flinksql

2020-11-18 Post by 赵一旦
So this problem still has no solution, right? Looking at the docs, topic-pattern does not exist yet either; 1.11 does not support it, correct?

hailongwang <18868816...@163.com> wrote on Monday, October 26, 2020 at 8:37 PM:

> Hi s_hongliang,
>  The current master branch (1.12) supports one source consuming multiple specified topics.
>  See the related issue: https://issues.apache.org/jira/browse/FLINK-18449
>
> For now you can split it into multiple sources.
>
>
>
>
> Best,
> Hailong Wang
>
>
>
>
> At 2020-10-26 13:54:40, "奔跑的小飞袁" wrote:
> >with (
> >'connector'='kafka',
> >
>
> >'topic'='cloud_behavior;cloud_behavior_other;cloud_behavior_qxb;cloud_behavior_cc;cloud_behavior_cs',
> >'properties.bootstrap.servers'='',
> >'properties.group.id'='flink_2_hive_and_imei_ncrypy_test',
> >'format'='avro',
> >'scan.startup.mode'='group-offsets'
> >);
> >
> >This is my configuration, but it fails at execution time. I want to know which flinksql version supports this.
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>


Reply: flink web ui page button control

2020-11-18 Post by 祁洁






checkpoint fails after using a window function, with error Operation category WRITE is not supported in state standby

2020-11-18 Post by kingdomad
flink 1.11.1 consumes kafka 0.10.1.1 and then does windowed deduplication and counting; time is eventtime and the window is 1 minute.
The program structure is roughly:
kafkaStream.keyBy().window().aggregate(new AverageAggregate());


flink on yarn,
The job runs but cannot checkpoint. The taskmanager log shows the error below.
I checked and those nodes are all running normally. If I remove the windowed aggregation and just print
kafkaStream, the job checkpoints normally. The logs show nothing else unusual and I am stumped. Any help from the experts would be appreciated.






2020-11-18 13:30:52,475 INFO  org.apache.kafka.common.utils.AppInfoParser   
   [] - Kafka version : 0.10.2.2

2020-11-18 13:30:52,475 INFO  org.apache.kafka.common.utils.AppInfoParser   
   [] - Kafka commitId : cd80bc412b9b9701

2020-11-18 13:31:09,668 INFO  org.apache.hadoop.io.retry.RetryInvocationHandler 
   [] - 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): 
Operation category WRITE is not supported in state standby. Visit 
https://s.apache.org/sbnn-error

at 
org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:88)

at 
org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1952)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1423)

at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:776)

at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:475)

at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)

at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)

at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869)

at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1685)

at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)

, while invoking ClientNamenodeProtocolTranslatorPB.create over xxx:8020 after 
1 failover attempts. Trying to failover after sleeping for 864ms.













--

kingdomad



Re: flink web ui page button control

2020-11-18 Post by zilong xiao
Hi 祁洁,


*May I ask how you solved it? Thanks~*

祁洁 <1241502...@qq.com> wrote on Wednesday, November 18, 2020 at 5:13 PM:

> Solved.
>
>
>
>
> ------ Original message ------
> From: "祁洁" <1241502...@qq.com>
> Sent: Wednesday, November 18, 2020, 11:01 AM
> To: "user-zh"
> Subject: flink web ui page button control
>
>
>
> Could someone advise how to hide the cancel job button on the flink dashboard?


Reply: flink web ui page button control

2020-11-18 Post by 祁洁





------ Original message ------
From: "祁洁" <1241502...@qq.com>
Sent: Wednesday, November 18, 2020, 11:01 AM
To: "user-zh"

Re: Flink CDC multi-table join has very high processing latency

2020-11-18 Post by jindy_liu
Borrowing this thread, since our scenario is very similar: joining one CDC stream with another CDC stream into a wide table.

I have hit this kind of problem too with CDC-to-CDC joins;
once the data volume is large it breaks easily (my checkpoint interval is set to 2 hours, the timeout was only set to 10 minutes with a very large allowed failure count; if the timeout is stretched too long, say to 2 hours, the data basically stops flowing).

1. During the snapshot there are always I/O problems. The log on the Flink side looks like this:
./flink--taskexecutor-0-flink-taskmanager-v1-11-2-fcf8f675f-gn8q8.log.5:146619:2020-11-14
00:19:53,578 ERROR io.debezium.connector.mysql.SnapshotReader  
[] - Failed due to error: Aborting snapshot due to error when last running
'SELECT * FROM `low_db`.`t_low_media`': Streaming result set
com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@3d208504 is still
active. No statements may be issued when any streaming result sets are open
and in use on a given connection. Ensure that you have called .close() on
any active streaming result sets before attempting more queries.

On the MySQL side, show processlist no longer shows the 'SELECT * FROM `low_db`.`t_low_media`' statement.

My guess is that the backpressure is severe, no data flows on the link, it stays idle too long, and then it dies.

2. When sinking the view after the join, because of the join the output can be out of order in some cases (retract stream output may go to different subtasks), so the sink result is wrong. Unless parallelism is set to 1, disorder is very likely!






--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink job occasionally reports PartitionNotFoundException at startup and recovers automatically.

2020-11-18 Post by 赵一旦
2020-11-18 16:51:37
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
Partition b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
not found.
at org.apache.flink.runtime.io.network.partition.consumer.
RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
at org.apache.flink.runtime.io.network.partition.consumer.
RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
at org.apache.flink.runtime.io.network.partition.consumer.
SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
at org.apache.flink.runtime.io.network.partition.consumer.
SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765
)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
.java:670)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
CompletableFuture.java:646)
at java.util.concurrent.CompletableFuture$Completion.run(
CompletableFuture.java:456)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
.java:107)


What could this problem be?


Help: the apply method after the Flink DataStream window operator never executes

2020-11-18 Post by huang botao
Hi, I have a strange question:

streamSource.flatMap(new ComeIntoMaxFlatMapFunction())

.assignTimestampsAndWatermarks(new
CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))

.connect(ruleConfigSource)
.process(new MetricDataFilterProcessFunction())
.keyBy((KeySelector) metric -> {
MetricDataKey metricDataKey = new MetricDataKey();
metricDataKey.setDomain(metric.getDomain());
metricDataKey.setStationAliasCode(metric.getStaId());
metricDataKey.setEquipMK(metric.getEquipMK());
metricDataKey.setEquipID(metric.getEquipID());
metricDataKey.setMetric(metric.getMetric());
return metricDataKey;
})

.window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
.apply(new RichWindowFunction() {
@Override
public void apply(MetricDataKey tuple, TimeWindow window,
Iterable input, Collector out) throws
Exception {
input.forEach(x->{
System.out.println("--->>>"+x);
});
}
})

In this topology keyBy executes normally, but the System.out.println("--->>>"+x); inside apply never runs.


Data keeps being consumed and there is no error message at all.


Re: pyflink connecting to hbase-1.4.x via SQL DDL fails with Configuring the input format (null) failed: Cannot create connection to HBase

2020-11-18 Post by Wei Zhong
Hi,

The root cause is that the class io.netty.channel.EventLoopGroup cannot be found.
Can you check whether the classpath contains the netty jar, or whether netty is shaded inside the related jars?

> On November 16, 2020, at 17:02, ghostviper wrote:
> 
> *Environment:*
> hbase-1.4.13
> flink-1.11.1
> python-3.6.1
> pyflink-1.0
> 
> *Configuration already done:*
> 1. The hbase path has been added to the hadoop classpath (:/opt/hbase/hbase-1.4.13/lib/*)
> 2. The program's DDL configuration is as follows:
> 
> source_ddl = """CREATE TABLE MySourceTable (
> hbase_rowkey_name varchar, cf1 ROW) WITH (
>'connector.type' = 'hbase',
>'connector.version' = '1.4.3',  
>'connector.table-name' = 'flink-test',
>'connector.zookeeper.quorum' =
> 'es-zk-hadoop1:2181,es-zk-hadoop2:2181,es-zk-hadoop3:2181',
>'connector.zookeeper.znode.parent' = '/hbase')
> """
> 
> sink_ddl = """CREATE TABLE MySinkTable (
> hbase_rowkey_name varchar, cf1 ROW) WITH (
>'connector.type' = 'hbase',
>'connector.version' = '1.4.3',  
>'connector.table-name' = 'flink-test-result',
>'connector.zookeeper.quorum' =
> 'es-zk-hadoop1:2181,es-zk-hadoop2:2181,es-zk-hadoop3:2181',
>'connector.zookeeper.znode.parent' = '/hbase')
> """
> 3. zookeeper has no authentication
> 4. hive can access hbase
> 5. hbase shell commands run correctly
> 6. the hbase cluster status is normal
> 7. the jars under the hbase lib directory are:
> ./hbase-common-1.4.3.jar
> ./flink-connector-hbase_2.11-1.11.1.jar
> ./hbase-client-1.4.3.jar
> ./hbase-protocol-1.4.3.jar
> 
> 
> *The error is as follows:*
> Traceback (most recent call last):
>  File "read_hbase.py", line 46, in 
>st_env.execute("7-read_and_hbase")
>  File
> "/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyflink/table/table_environment.py",
> line 1057, in execute
>return JobExecutionResult(self._j_tenv.execute(job_name))
>  File
> "/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/py4j/java_gateway.py",
> line 1286, in __call__
>answer, self.gateway_client, self.target_id, self.name)
>  File
> "/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/pyflink/util/exceptions.py",
> line 147, in deco
>return f(*a, **kw)
>  File
> "/home/chenxiaoyun/.pyenv/versions/3.6.1/lib/python3.6/site-packages/py4j/protocol.py",
> line 328, in get_return_value
>format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
> : org.apache.flink.util.FlinkException: Failed to execute job
> '7-read_and_hbase'.
>   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1823)
>   at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
>   at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>   at
> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to
> submit job.
>   at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
>   at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>   at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>   at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 

Re: pyflink 1.11 window groupby error

2020-11-18 Post by Xingbo Huang
Hi,

I suspect it is still caused by an unclean environment. Run pip list | grep
apache-flink and check whether the version really is that one, because users often have several Python environments installed on one machine.

Best,
Xingbo

anfeng wrote on Wednesday, November 18, 2020 at 9:40 AM:

> I am running it in the playgrounds environment, but the apache flink I checked is 1.11.2;
> could that be related?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: pyflink 1.11 error when running a pyflink job

2020-11-18 Post by Wei Zhong
Hi,

The current error alone does not reveal the problem; could you paste the job source code for the failing part?

> On November 17, 2020, at 16:58, whh_960101 wrote:
> 
> Hi everyone, with pyflink 1.11 I submit a pyflink job to the yarn cluster; when the job inserts the processed main_table 
> into the Kafka sink it fails with: File "/home/cdh272705/poc/T24_parse.py", line 179, in 
> from_kafka_to_oracle_demo
>
> main_table.execute_insert("sink")#.get_job_client().get_job_execution_result().result()
>  File 
> "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/table/table.py", 
> line 783, in execute_insert
>return TableResult(self._j_table.executeInsert(table_path, overwrite))
>  File 
> "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", 
> line 1286, in __call__
>answer, self.gateway_client, self.target_id, self.name)
>  File 
> "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py",
>  line 154, in deco
>raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: 'Failed to execute sql'
> org.apache.flink.client.program.ProgramAbortException
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at 
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992). Is a jar missing? I put flink-sql-client_2.11-1.11.1.jar under both the opt and lib directories. What causes 'Failed
> to execute sql'?
> 
> 
> 
> 
> 
> 
> 
> 
>