回复:关于statement输出结果疑问

2021-03-17 Thread Shuai Xia
Hi,大佬,想问下如果使用Lazy调度模式,情况会是什么样子


--
发件人:Dian Fu 
发送时间:2021年3月15日(星期一) 15:49
收件人:刘杰鸿 
抄 送:user-zh 
主 题:Re: 关于statement输出结果疑问

奥,那你理解错了。这里面其实细分成2种情况:
- sink1和sink2,通过operator 
chain之后,和source节点位于同一个物理节点:由于执行的时候,在一个JVM里,每处理一条数据,先发送给其中一个sink,再发送给另外一个sink。然后处理下一条数据
- sink1 
和sink2,和source节点处于不同一个物理节点:这个时候是纯异步的,每处理一条数据,会分别通过网络发送给sink1和sink2,同时网络有buffer,所以sink1和sink2收到数据的顺序是完全不确定的。

但是不管是上面哪种情况,都不是先把数据全部发送给其中一个sink;再全部发送给第二个sink。

> 2021年3月12日 下午10:52,刘杰鸿  写道:
> 
> 我的理解是代码会先把source里面的数据全部插入到sink1,这时候就是1,2。然后执行到add_insert_sql代码的时候,将source数据全部插入到sink2里面,这时候也是1,2。
> 所以输出应该是1,2,1,2
> 
> -- 原始邮件 --
> 发件人: "Dian Fu" ;
> 发送时间: 2021年3月12日(星期五) 晚上10:24
> 收件人: "user-zh";"刘杰鸿";
> 主题: Re: 关于statement输出结果疑问
> 
> 可以说一下为什么你觉得输出结果应该是1,2,1,2吗?
> 
> 个人觉得现在的输出结果应该没有问题。比如第1条数据,先发送到sink1,再发送到sink2,所以打印了2个1;然后处理第二条数据,打印了2个2
> 
> On Mon, Mar 8, 2021 at 9:42 AM 刘杰鸿  > wrote:
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> 
> env_settings = 
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(environment_settings=env_settings)
> 
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
> table_env.create_temporary_view("simple_source", table)
> table_env.execute_sql("""
> CREATE TABLE first_sink_table (
> id BIGINT, 
> data VARCHAR 
> ) WITH (
> 'connector' = 'print'
> )
> """)
> table_env.execute_sql("""
> CREATE TABLE second_sink_table (
> id BIGINT, 
> data VARCHAR
> ) WITH (
> 'connector' = 'print'
> )
> """)
> # 创建一个statement对象
> statement_set = table_env.create_statement_set()
> # 使用TABLE API 将table表插进first_sink_table表里面
> statement_set.add_insert("first_sink_table", table)
> # 使用SQL将table表插进second_sink_table表里面
> statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM 
> simple_source")
> # 执行查询
> statement_set.execute().wait()我执行上面脚本之后,输出以下的结果。在我理解输出内容的应该是1,2,1,2;而不是1,1,2,2。4>
>  +I(1,Hi)
> 4> +I(1,Hi)
> 4> +I(2,Hello)
> 4> +I(2,Hello)



Re: flinksql读取format为avro的kafka topic 报错 ArrayIndexOutOfBoundsException

2021-03-17 Thread wxpcc
声明可以尝试加上 not null,避免变为 union类型
如: 
headers map not null,
`body` String not null



--
Sent from: http://apache-flink.147419.n8.nabble.com/


【flink sql group by 时间窗口】

2021-03-17 Thread guoyb
版本1.12.1

请问,支持事件时间吗?应该设置为哪种时间类型。


as_of_time time
group by xx
, tumble( as_of_time, interval "5" second)


sql client报错,
window aggregate can only be defined over a time attribute column, but time(0) 
encountered

Re: Flink sql 实现全局row_number()分组排序

2021-03-17 Thread Kurt Young
直接 SQL Top-N 即可:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n

Best,
Kurt


On Tue, Mar 16, 2021 at 3:40 PM Tian Hengyu  wrote:

> 咋么有人啊~~~
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


interval join 如何用 process time

2021-03-17 Thread guomuhua
在 flink sql 中,可以使用 proc time 来进行 interval join,但是在 stream api 中,只能用 event
time 进行 interval join,如何能使用 process time 呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:【flink sql group by 时间窗口】

2021-03-17 Thread guoyb
这个问题换种写法解决了,从MySQL数据库表里取时间戳字段再转timestamp,可以实现滚动窗口,没报错。


从MySQL表里直接取datetime类型,jdbc表flink设置timestamp类型,会报错,直接取source的时间类型字段是不是转换有点问题。



---原始邮件---
发件人: "guoyb"<861277...@qq.com>
发送时间: 2021年3月17日(周三) 下午5:43
收件人: "user-zh"

Re: interval join 如何用 process time

2021-03-17 Thread HunterXHunter
你不定义watermark好像用的就是pro time



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: interval join 如何用 process time

2021-03-17 Thread JasonLee
hi

proctime as PROCTIME() 可以这样设置



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


重复class应如何选择

2021-03-17 Thread nicygan
dear all:
 我在flink中遇到一些重复的类名(flink1.12)
 比如:
 org.apache.flink.streaming.api.windowing.triggers.Trigger
 org.apache.flink.table.runtime.operators.window.triggers.Trigger


 org.apache.flink.streaming.api.watermark.Watermark
 org.apache.flink.api.common.eventtime.Watermark


 而且他们的内容都非常似,只有极少量的差别。
 这些不同包里的类,在使用上有什么差别吗?是否有必要统一下,让使用更方便。
 我在使用时,比如要实现自定义trigger,应如何选择?
 


| |
nicygan
|
|
read3...@163.com
|



Flink 1.12.0 隔几个小时Checkpoint就会失败

2021-03-17 Thread Frost Wong
Hi 大家好

我用的Flink on yarn模式运行的一个任务,每隔几个小时就会出现一次错误

2021-03-18 08:52:37,019 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
checkpoint 661818 for job 4fa72fc414f53e5ee062f9fbd5a2f4d5 (562357 bytes in 
4699 ms).
2021-03-18 08:52:37,637 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 661819 (type=CHECKPOINT) @ 1616028757520 for job 
4fa72fc414f53e5ee062f9fbd5a2f4d5.
2021-03-18 08:52:42,956 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
checkpoint 661819 for job 4fa72fc414f53e5ee062f9fbd5a2f4d5 (2233389 bytes in 
4939 ms).
2021-03-18 08:52:43,528 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
checkpoint 661820 (type=CHECKPOINT) @ 1616028763457 for job 
4fa72fc414f53e5ee062f9fbd5a2f4d5.
2021-03-18 09:12:43,528 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Checkpoint 
661820 of job 4fa72fc414f53e5ee062f9fbd5a2f4d5 expired before completing.
2021-03-18 09:12:43,615 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable 
failure threshold.
at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_231]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_231]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 ~[?:1.8.0_231]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 ~[?:1.8.0_231]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_231]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_231]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
2021-03-18 09:12:43,618 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
csmonitor_comment_strategy (4fa72fc414f53e5ee062f9fbd5a2f4d5) switched from 
state RUNNING to RESTARTING.
2021-03-18 09:12:43,619 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map 
(43/256) (18dec1f23b95f741f5266594621971d5) switched from RUNNING to CANCELING.
2021-03-18 09:12:43,622 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map 
(44/256) (3f2ec60b2f3042ceea6e1d660c78d3d7) switched from RUNNING to CANCELING.
2021-03-18 09:12:43,622 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map 
(45/256) (66d411c2266ab025b69196dfec30d888) switched from RUNNING to CANCELING.
然后就自己恢复了。用的是Unaligned 
Checkpoint,rocksdb存储后端,在这个错误前后也没有什么其他报错信息。从Checkpoint的metrics看,总是剩最后一个无法完成,调整过parallelism也无法解决问题。

谢谢大家!


在FlinkKafkaProducer获取sink表的建表key

2021-03-17 Thread Jimmy Zhang
Hi!大家好。
目前有一个需求,需要获取Kafka 
sink表的所有建表字段,而且需要在FlinkKafkaProducer中进行操作,看了源码,没有找到获取这个信息的接口,大家有知道的吗?非常感谢!
例如:CREATE TABLE kafkaTable (

 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 'scan.startup.mode' = 'earliest-offset'
) 
想获取到   user_id, item_id ,category_id ,behavior这四个字段。


| |
Jimmy Zhang
|
|
13669299...@163.com
|
签名由网易邮箱大师定制

Re: interval join 如何用 process time

2021-03-17 Thread guomuhua
你说的这个语法是flink sql 吧,我想问的是stream api 里面怎么用



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.12.0 隔几个小时Checkpoint就会失败

2021-03-17 Thread yidan zhao
设置下检查点失败不影响任务呀,你这貌似还导致任务重启了?

Frost Wong  于2021年3月18日周四 上午10:38写道:

> Hi 大家好
>
> 我用的Flink on yarn模式运行的一个任务,每隔几个小时就会出现一次错误
>
> 2021-03-18 08:52:37,019 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 661818 for job 4fa72fc414f53e5ee062f9fbd5a2f4d5 (562357 bytes in
> 4699 ms).
> 2021-03-18 08:52:37,637 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 661819 (type=CHECKPOINT) @ 1616028757520 for job
> 4fa72fc414f53e5ee062f9fbd5a2f4d5.
> 2021-03-18 08:52:42,956 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 661819 for job 4fa72fc414f53e5ee062f9fbd5a2f4d5 (2233389 bytes
> in 4939 ms).
> 2021-03-18 08:52:43,528 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 661820 (type=CHECKPOINT) @ 1616028763457 for job
> 4fa72fc414f53e5ee062f9fbd5a2f4d5.
> 2021-03-18 09:12:43,528 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Checkpoint 661820 of job 4fa72fc414f53e5ee062f9fbd5a2f4d5 expired before
> completing.
> 2021-03-18 09:12:43,615 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to
> recover from a global failure.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_231]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_231]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> ~[?:1.8.0_231]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> ~[?:1.8.0_231]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_231]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_231]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
> 2021-03-18 09:12:43,618 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
> csmonitor_comment_strategy (4fa72fc414f53e5ee062f9fbd5a2f4d5) switched from
> state RUNNING to RESTARTING.
> 2021-03-18 09:12:43,619 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map
> (43/256) (18dec1f23b95f741f5266594621971d5) switched from RUNNING to
> CANCELING.
> 2021-03-18 09:12:43,622 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map
> (44/256) (3f2ec60b2f3042ceea6e1d660c78d3d7) switched from RUNNING to
> CANCELING.
> 2021-03-18 09:12:43,622 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map
> (45/256) (66d411c2266ab025b69196dfec30d888) switched from RUNNING to
> CANCELING.
> 然后就自己恢复了。用的是Unaligned
> Checkpoint,rocksdb存储后端,在这个错误前后也没有什么其他报错信息。从Checkpoint的metrics看,总是剩最后一个无法完成,调整过parallelism也无法解决问题。
>
> 谢谢大家!
>


Flink1.11如何实现Tumble Window后基于event time倒序取第一条作统计

2021-03-17 Thread Hush
Hi 大家好


现在想对5分钟的kafka数据开窗,因为是DTS同步消息数据,会有update 和 
delete,所以需要对相同user_id的数据根据事件时间倒序第一条,统计最后一次status(状态字段)共有多少人。


marketingMapDS: DataStream[(String, String, Long)]
|
tEnv.createTemporaryView("test", marketingMapDS,$"status", $"upd_user_id", 
$"upd_time".rowtime)
val resultSQL =
  """
|SELECT t.status,
| COUNT(t.upd_user_id) as num
|FROM (
|SELECT  *,
|  ROW_NUMBER() OVER (PARTITION BY upd_user_id ORDER BY 
upd_time DESC) as row_num
|FROM test
|) t
|WHERE t.row_num = 1
|GROUP BY t.status, TUMBLE(t.upd_time, INTERVAL '5' MINUTE)
|""".stripMargin
val table2 = tEnv.sqlQuery(resultSQL)
val resultDS = tEnv.toRetractStream[Row](table2)
|


这样写后会报以下错:
| Exception in thread "main" org.apache.flink.table.api.TableException: 
GroupWindowAggregate doesn't support consuming update and delete changes which 
is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[upd_user_id], 
orderBy=[upd_time DESC], select=[status, upd_user_id, upd_time]) |


所以想实现该需求,请问还可以怎么实现。。。


TABLE API 可以实现 类似 ROW_NUMBER() OVER 这样功能吗?
|
 val table = tEnv.fromDataStream(marketingMapDS, $"status", $"upd_user_id", 
$"upd_time".rowtime)
  .window(Tumble over 5.millis on $"upd_time" as "w")
  .groupBy($"w")
???
|


Flink新手一个。。。请大佬指点~



回复: Flink 1.12.0 隔几个小时Checkpoint就会失败

2021-03-17 Thread Frost Wong
哦哦,我看到了有个

setTolerableCheckpointFailureNumber

之前不知道有这个方法,倒是可以试一下,不过我就是不太理解为什么会失败,也没有任何报错

发件人: yidan zhao 
发送时间: 2021年3月18日 3:47
收件人: user-zh 
主题: Re: Flink 1.12.0 隔几个小时Checkpoint就会失败

设置下检查点失败不影响任务呀,你这貌似还导致任务重启了?

Frost Wong  于2021年3月18日周四 上午10:38写道:

> Hi 大家好
>
> 我用的Flink on yarn模式运行的一个任务,每隔几个小时就会出现一次错误
>
> 2021-03-18 08:52:37,019 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 661818 for job 4fa72fc414f53e5ee062f9fbd5a2f4d5 (562357 bytes in
> 4699 ms).
> 2021-03-18 08:52:37,637 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 661819 (type=CHECKPOINT) @ 1616028757520 for job
> 4fa72fc414f53e5ee062f9fbd5a2f4d5.
> 2021-03-18 08:52:42,956 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 661819 for job 4fa72fc414f53e5ee062f9fbd5a2f4d5 (2233389 bytes
> in 4939 ms).
> 2021-03-18 08:52:43,528 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 661820 (type=CHECKPOINT) @ 1616028763457 for job
> 4fa72fc414f53e5ee062f9fbd5a2f4d5.
> 2021-03-18 09:12:43,528 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Checkpoint 661820 of job 4fa72fc414f53e5ee062f9fbd5a2f4d5 expired before
> completing.
> 2021-03-18 09:12:43,615 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to
> recover from a global failure.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_231]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_231]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> ~[?:1.8.0_231]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> ~[?:1.8.0_231]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_231]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_231]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
> 2021-03-18 09:12:43,618 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
> csmonitor_comment_strategy (4fa72fc414f53e5ee062f9fbd5a2f4d5) switched from
> state RUNNING to RESTARTING.
> 2021-03-18 09:12:43,619 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map
> (43/256) (18dec1f23b95f741f5266594621971d5) switched from RUNNING to
> CANCELING.
> 2021-03-18 09:12:43,622 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map
> (44/256) (3f2ec60b2f3042ceea6e1d660c78d3d7) switched from RUNNING to
> CANCELING.
> 2021-03-18 09:12:43,622 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map
> (45/256) (66d411c2266ab025b69196dfec30d888) switched from RUNNING to
> CANCELING.
> 然后就自己恢复了。用的是Unaligned
> Checkpoint,rocksdb存储后端,在这个错误前后也没有什么其他报错信息。从Checkpoint的metrics看,总是剩最后一个无法完成,调整过parallelism也无法解决问题。
>
> 谢谢大家!
>


pyflink UDTF求助!

2021-03-17 Thread 陈康
定制UDTF想要拆分字符串、但是报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
毫无头绪、有大佬遇到过吗?谢谢!
好像是 udf 和 udtf 一起使用时出现的~下面有可复现的例子,谢谢

class myKerasMLP(ScalarFunction):

def eval(self, *args):
...
# 返回预测结果
return str(trueY[0][0]) + '|' + str(trueY[0][1])

注册UDF函数
myKerasMLP = udf(myKerasMLP(),
 input_types=[DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING(),
  DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING()],
 result_type=DataTypes.STRING())

class SplitStr(TableFunction):
def eval(self, str_value):
str_arr = str_value.split('|')
yield str_arr[0], str_arr[1]
yield str_arr[0], str_arr[1]

注册UDTF函数
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(),
DataTypes.STRING()])

t_env.register_function('train_and_predict', myKerasMLP)
t_env.register_function("splitStr", splitStr)

==

t_env.sql_query("""
select A.hotime ,
A.before_ta ,
A.before_rssi ,
A.after_ta ,
A.after_rssil ,
A.nb_tath ,
A.nb_rssith ,
nbr_rssi ,
nbr_ta
from (SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil,
nb_tath, nb_rssith) predict
FROM
source) as  A, LATERAL TABLE(splitStr(predict)) as T(nbr_rssi, nbr_ta)
""").insert_into("predict_sink")


报错 java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
Traceback (most recent call last):
  File
"C:/Users/Administrator.XTZ-02012091146/PycharmProjects/pythonProject/kerasTest/UdtfNtPredictPyFlink.py",
line 280, in 
t_env.execute('NT重连预测参数')
  File
"D:\tools\Python3.6.5\lib\site-packages\pyflink\table\table_environment.py",
line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
  File "D:\tools\Python3.6.5\lib\site-packages\py4j\java_gateway.py", line
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "D:\tools\Python3.6.5\lib\site-packages\pyflink\util\exceptions.py",
line 147, in deco
return f(*a, **kw)
  File "D:\tools\Python3.6.5\lib\site-packages\py4j\protocol.py", line 328,
in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o25.execute.
: java.lang.IndexOutOfBoundsException: Index: 7, Size: 7
at java.util.ArrayList.rangeCheck(ArrayList.java:657)


这段SQL可以执行
t_env.sql_query("""
SELECT
hotime ,
before_ta ,
before_rssi ,
after_ta ,
after_rssil ,
nb_tath ,
nb_rssith ,
train_and_predict(hotime, before_ta, before_rssi, after_ta, after_rssil,
nb_tath, nb_rssith) predict
FROM
source
""").insert_into("print_table")

--
+I(37,14,-66,92,-74,24,-65,22.621065|-64.12096)
+I(291,136,-76,136,-78,22,-65,19.479145|-65.958)
--


== 简单可复现的例子 

===SQL 源=
/*
Navicat MySQL Data Transfer

Source Server : localhost
Source Server Version : 50717
Source Host   : localhost:3306
Source Database   : nufront-nt

Target Server Type: MYSQL
Target Server Version : 50717
File Encoding : 65001

Date: 2021-03-13 14:23:41
*/

SET FOREIGN_KEY_CHECKS=0;

-- 
-- Table structure for test
-- 
DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
  `hotime` varchar(5) DEFAULT NULL,
  `before_ta` varchar(5) DEFAULT NULL,
  `before_rssi` varchar(10) DEFAULT NULL,
  `after_ta` varchar(5) DEFAULT NULL,
  `after_rssil` varchar(10) DEFAULT NULL,
  `nb_tath` varchar(5) DEFAULT NULL,
  `nb_rssith` varchar(10) DEFAULT NULL,
  `predict` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 
-- Records of test
-- 
INSERT INTO `test` VALUES ('35', '8', '-62', '136', '-65', '20', '-65',
'22.30014|-63.884907');
INSERT INTO `test` VALUES ('43', '8', '-71', '248', '-73', '20', '-65',
'20.598848|-65.127464');
INSERT INTO `test` VALUES ('82', '216', '-74', '208', '-74', '20', '-65',
'14.919615|-66.15158');

== 程序 ===

# -*- coding: utf-8 -*
import logging
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
DataTypes
from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env,
environment_settings=env_settings)
# 设置该参数以使用 UDF
t_env.get_config().get_configuration().set_bo

Re: Flink1.11如何实现Tumble Window后基于event time倒序取第一条作统计

2021-03-17 Thread HunterXHunter
GroupWindowAggregate不支持update或者delete的datasource。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: pyflink UDTF求助!

2021-03-17 Thread 陈康
apache-flink 1.11.1



--
Sent from: http://apache-flink.147419.n8.nabble.com/