Re: 如何获取Flink table api/sql code gen 代码

2019-08-08 文章 Zhenghua Gao
Currently Flink DO NOT provides a direct way to get code gen code. But
there are indirect ways to try.
1) debug in IDE
Flink use Janino to compile all code gen code, and there is a single entry
point [1]

for
Blink planner, [2]

for
old planner, you can set breakpoint there and get the code.

2) enable debug logging
Blink planner logging code in CompileUtils, and old planner logging code in
subclass of Compiler

3) use Janino options
Janino caches code in tmp directory, and you can enable these options[3]
.
Note: org.codehaus.janino.source_debugging.keep is not supported in current
Janino version, which means this method can only be used to debug in
IDE(need breakpoint to keep source code)

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/CompileUtils.java
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/Compiler.scala
[3]
https://github.com/janino-compiler/janino/blob/master/janino/src/main/java/org/codehaus/janino/Scanner.java#L71

*Best Regards,*
*Zhenghua Gao*


On Wed, Aug 7, 2019 at 12:02 AM Vincent Cai  wrote:

> Hi all,
> 在Spark中,可以通过调用Dataset的queryExecution.debug.codegen() 方法获得 Catalyst 产生的代码。
> 在Flink是否有类似的方法可以获得code gen的代码?
>
>
> 参考链接:
> https://medium.com/virtuslab/spark-sql-under-the-hood-part-i-26077f85ebf0
>
>
> Regards
> Vincent  Cai


Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 wangl...@geekplus.com.cn

抱歉,是我搞错了。
实际上是写入数据的。我在 windows 下做测试,刷新下文件的大小始终是 0 , 只有编辑看下那个文件显示的文件大小才会变更。



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-09 10:17
Receiver: user-zh
Subject: Re: Re: CsvTableSink 目录没有写入具体的数据
没数据是因为没有trigger执行,  参考sample code from doc(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html
)
 
// get a StreamTableEnvironment, works for BatchTableEnvironment
equivalentlyStreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
// create a TableSinkTableSink sink = new
CsvTableSink("/path/to/file", fieldDelim = "|");
// register the TableSink with a specific schemaString[] fieldNames =
{"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT,
Types.STRING, Types.LONG};tableEnv.registerTableSink("CsvSinkTable",
fieldNames, fieldTypes, sink);
// compute a result Table using Table API operators and/or SQL
queriesTable result = ...// emit the result Table to the registered
TableSinkresult.insertInto("CsvSinkTable");
// execute the program
 
加上 tableEnv.execute();
 
 
wangl...@geekplus.com.cn  于2019年8月9日周五 上午9:42写道:
 
>
> 我接入了一个 RocketMQ 的流作为输入。
>
>
>  DataStream> ds = env.addSource(new
> RocketMQSource(
> 
> System.out.println(res);
> return res;
> }
> });
>
>
> tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
> pick_list_no, sku_code");
>
> TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
> String[] fieldNames = {"num"};
> TypeInformation[] fieldTypes = {Types.INT};
> tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
> csvSink);
> tableEnv.sqlUpdate(
> "INSERT INTO RubberOrders SELECT pick_task_id FROM
> t_pick_task");
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: CsvTableSink 目录没有写入具体的数据
> 完整代码发一下
>
> wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
>
> >
> > 我按官网上的
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > 例子写的代码
> > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
>


Re: Flink sql join问题

2019-08-08 文章 Zhenghua Gao
可以试下最新flink 1.9 blink
planner的firstRow/lastRow优化[1]能否满足你的需求,目前的限制是只能基于procTime来去重。

* e.g.
* 1. {{{
* SELECT a, b, c FROM (
*   SELECT a, b, c, proctime,
*  ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as row_num
*   FROM MyTable
* ) WHERE row_num <= 1
* }}} will be converted to StreamExecDeduplicate which keeps first row.
* 2. {{{
* SELECT a, b, c FROM (
*   SELECT a, b, c, proctime,
*  ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) as row_num
*   FROM MyTable
* ) WHERE row_num <= 1
* }}} will be converted to StreamExecDeduplicate which keeps last row.


[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecDeduplicateRule.scala

*Best Regards,*
*Zhenghua Gao*


On Tue, Aug 6, 2019 at 2:28 PM huang  wrote:

> Hi all,
>
>
> 请问用Flink
> sql做双流join。如果希望两个流都只保存每个key的最新的数据,这样相当于每次join都只输出最新的一条记录。请问这种场景sql支持吗
>
>
> thanks


如何讓兩個 SQL 使用相同的 KafkaTableSource

2019-08-08 文章 Tony Wei
Hi

我在我的 flink job 中透過 `flinkTableEnv.connect(new
Kafka()...).registerTableSource(...)` 註冊了
一張 kafka table。但從文件上我才知道 SQL 只會在特定的條件下才會真正的轉為 DataStream,比
如說呼叫了Table#toRetractStream`。

因為如此,我發現當我嘗試在同一個 flink job 中使用了不同的 SQL 時,他們會同時產生各自的
kafka source operator。從 flink 的角度來說可能不是什麼大問題,各自獨立的 operator 會各自管理
好自己的 offset state,也不會互相影響。但是從 kafka 方面來看,因為兩邊都是使用相同的
group_id,當 offset 被 commit 回 kafka 時,就會在 kafka 端有衝突。

我想要確保每個 group_id 只會被一個 operator 負責執行 commit 的動作。最簡單的做法可能是故意
為相同的 kafka topic 註冊兩個名稱不同的 table, group_id,分別給兩個 SQL 使用。但我想知道是
不是有更好的做法,可以讓兩個 SQL 是真正的從同一個 kafka operator 讀取資料?這樣也不需要同
時存在兩個做一樣事情的 kafka operator 。先謝謝各位的幫助。

Best,
Tony Wei


Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 Alec Chen
没数据是因为没有trigger执行,  参考sample code from doc(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html
)

// get a StreamTableEnvironment, works for BatchTableEnvironment
equivalentlyStreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
// create a TableSinkTableSink sink = new
CsvTableSink("/path/to/file", fieldDelim = "|");
// register the TableSink with a specific schemaString[] fieldNames =
{"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT,
Types.STRING, Types.LONG};tableEnv.registerTableSink("CsvSinkTable",
fieldNames, fieldTypes, sink);
// compute a result Table using Table API operators and/or SQL
queriesTable result = ...// emit the result Table to the registered
TableSinkresult.insertInto("CsvSinkTable");
// execute the program

加上 tableEnv.execute();


wangl...@geekplus.com.cn  于2019年8月9日周五 上午9:42写道:

>
> 我接入了一个 RocketMQ 的流作为输入。
>
>
>  DataStream> ds = env.addSource(new
> RocketMQSource(
> 
> System.out.println(res);
> return res;
> }
> });
>
>
> tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
> pick_list_no, sku_code");
>
> TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
> String[] fieldNames = {"num"};
> TypeInformation[] fieldTypes = {Types.INT};
> tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
> csvSink);
> tableEnv.sqlUpdate(
> "INSERT INTO RubberOrders SELECT pick_task_id FROM
> t_pick_task");
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: CsvTableSink 目录没有写入具体的数据
> 完整代码发一下
>
> wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
>
> >
> > 我按官网上的
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > 例子写的代码
> > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
>


Re: flink-1.8.1 yarn per job模式使用

2019-08-08 文章 Zili Chen
刚发现 user-zh 是有 archive[1] 的,上面提到过的跟你类似的问题是这个 thread[2]。

Best,
tison.

[1] https://lists.apache.org/list.html?user-zh@flink.apache.org
[2]
https://lists.apache.org/thread.html/061d8e48b091b27e797975880c193838f2c37894c2a90aa6a6e83d36@%3Cuser-zh.flink.apache.org%3E

Yuhuan Li  于2019年8月7日周三 下午7:57写道:

> 非常感谢tison,完美的解决了我的问题,以后会多留意社区问题。
>
> 具体到自己的hadoop版本,就是在flink工程编译
> flink-1.8.1/flink-shaded-hadoop/flink-shaded-hadoop2-uber/target
> 的jar放在lib下即可
>
> Zili Chen  于2019年8月7日周三 下午7:33写道:
>
> > 这个问题以前邮件列表有人提过...不过现在 user-zh 没有 archive 不好引用。
> >
> > 你看下是不是 lib 下面没有 flink-shaded-hadoop-2-uber--7.0.jar
> 这样一个文件。
> >
> > 1.8.1 之后 FLINK 把 hadoop(YARN) 的 lib 分开 release 了,你要指定自己的 HADOOP_CLASSPATH
> > 或者下载 FLINK 官网 pre-bundle 的 hadoop。
> >
> > 具体可以看这个页面(https://flink.apache.org/downloads.html)第一段的内容。
> >
> > Best,
> > tison.
> >
> >
> > 李玉环  于2019年8月7日周三 下午7:15写道:
> >
> > > Hi 大家好:
> > >
> > > 在使用flink过程中,运行官网给的命令
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > > 报错如下:
> > >
> > > ➜  flink-1.8.1 ./bin/flink run -m yarn-cluster
> > > ./examples/batch/WordCount.jar
> > > 
> > >  The program finished with the following exception:
> > >
> > > java.lang.RuntimeException: Could not identify hostname and port in
> > > 'yarn-cluster'.
> > > at
> > >
> > >
> >
> org.apache.flink.client.ClientUtils.parseHostPortAddress(ClientUtils.java:47)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.AbstractCustomCommandLine.applyCommandLineOptionsToConfiguration(AbstractCustomCommandLine.java:83)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:60)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.DefaultCLI.createClusterDescriptor(DefaultCLI.java:35)
> > > at
> > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224)
> > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> > > at
> > >
> > >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> > >
> > >
> > > 疑问:
> > > 1.为什么会将 yarn-clustet解析为host?
> > > 2.要运行single flink job on yarn的正确姿势是啥?
> > >
> > > Best,
> > > Yuhuan
> > >
> >
>


Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 wangl...@geekplus.com.cn
   
我接入了一个 RocketMQ 的流作为输入。


 DataStream> ds = env.addSource(new 
RocketMQSource(

System.out.println(res);
return res;
}
});


tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, 
pick_list_no, sku_code");

TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
String[] fieldNames = {"num"};
TypeInformation[] fieldTypes = {Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, 
csvSink);
tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT pick_task_id FROM 
t_pick_task");



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-08 21:01
Receiver: user-zh
Subject: Re: CsvTableSink 目录没有写入具体的数据
完整代码发一下
 
wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
 
>
> 我按官网上的
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> 例子写的代码
> 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
>
>
>
> wangl...@geekplus.com.cn
>


Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 Alec Chen
完整代码发一下

wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:

>
> 我按官网上的
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> 例子写的代码
> 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
>
>
>
> wangl...@geekplus.com.cn
>


Re: need help

2019-08-08 文章 Biao Liu
你好,

异常里可以看出 AskTimeoutException, 可以调整两个参数 akka.ask.timeout 和 web.timeout
再试一下,默认值如下

akka.ask.timeout: 10 s
web.timeout: 1

PS: 搜 “AskTimeoutException Flink” 可以搜到很多相关答案

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 7:33 PM 陈某  wrote:

>
>
> -- Forwarded message -
> 发件人: 陈某 
> Date: 2019年8月8日周四 下午7:25
> Subject: need help
> To: 
>
>
> 你好,我是一个刚接触flink的新手,在搭建完flink on
> yarn集群后,依次启动zookeeper,hadoop,yarn,flkink集群,并提交认识到yarn上时运行遇到问题,网上搜索相关问题,暂未找到解决方式,希望能得到帮助,谢谢。
>
> 使用的运行指令为:
> [root@flink01 logs]# flink run -m  yarn-cluster
> ./examples/streaming/WordCount.jar
> 查看log后错误信息如下:(附件中为完整的log文件)
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 91e82fd8626bde4c901bf0b1639e12e7)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:208)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> [Internal server error.,  akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#2035575525]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at
> 

CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 wangl...@geekplus.com.cn

我按官网上的 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
  例子写的代码
但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?



wangl...@geekplus.com.cn


Fwd: need help

2019-08-08 文章 陈某
-- Forwarded message -
发件人: 陈某 
Date: 2019年8月8日周四 下午7:25
Subject: need help
To: 


你好,我是一个刚接触flink的新手,在搭建完flink on
yarn集群后,依次启动zookeeper,hadoop,yarn,flkink集群,并提交认识到yarn上时运行遇到问题,网上搜索相关问题,暂未找到解决方式,希望能得到帮助,谢谢。

使用的运行指令为:
[root@flink01 logs]# flink run -m  yarn-cluster
./examples/streaming/WordCount.jar
查看log后错误信息如下:(附件中为完整的log文件)
org.apache.flink.client.program.ProgramInvocationException: Could not
retrieve the execution result. (JobID: 91e82fd8626bde4c901bf0b1639e12e7)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit JobGraph.
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:208)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException:
[Internal server error., 

Re: flink 结合canal统计订单gmv

2019-08-08 文章 Alec Chen
Hi,

截图无法显示, 不知道你是使用FlinkSQL还是DataStreamAPI实现, 前者可以参考UDTF, 后者可以参考FlatMap "Takes
one element and produces zero, one, or more elements. A flatmap function
that splits sentences to words"
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/

王飞  于2019年8月8日周四 下午4:53写道:

> hi 你好
> 需要用flink 解析mysql的binlog 统计订单表  产品维度的gmv,
> 但是一个insert的binlog 会出现同时购买多个订单 会出现一个集合的订单集合 但是我想统计订单里面产品维度的gmv,如下图
> 返回的是一个list的订单集合,但是我想取每个订单里面的产品id进行维度统计 ,请问flink 有什么算子 可以把一个list数据的流数据
> 变成多条流
> 谢谢
>
>
>
>


Re: 关于event-time的定义与产生时间戳位置的问题。

2019-08-08 文章 Alec Chen
Hi,
Q: event time这个时间戳是在什么时候打到数据上面去的,
A: event time按字面意思理解为event发生的时间, 如果产生数据的设备提供了记录时间的字段, 并且业务逻辑也需要使用这个时间,
则可以将该时间作为event time. 更多信息可以参考
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
关于event
time, processing time的描述

zhaoheng.zhaoh...@qq.com  于2019年8月8日周四 下午4:36写道:

>
> hi,all:
>   event time这个时间戳是在什么时候打到数据上面去的,看api是在flink
> source收到数据之后再标注的,并不是真正的数据源携带过来的(比如手机终端)?使用kafka
> source的话根据文档的定义kafka携带的时间戳也仅仅是kafka收到数据的时候打上的时间戳。
>
> 那么有个问题:以kafka为例,数据到队列的时候按「顺序」打上时间戳,那么如果数据是「乱序到达」的也被打上了「递增的时间戳」,后续基于event-time的处理都是基于这个时间戳来进行,那不就丧失了真实世界的定义吗?
>   不知道有哪里是我理解不对的地方望指教!
>   祝好~
>


flink 结合canal统计订单gmv

2019-08-08 文章 王飞
hi 你好
需要用flink 解析mysql的binlog 统计订单表  产品维度的gmv,
但是一个insert的binlog 会出现同时购买多个订单 会出现一个集合的订单集合 但是我想统计订单里面产品维度的gmv,如下图
返回的是一个list的订单集合,但是我想取每个订单里面的产品id进行维度统计 ,请问flink 有什么算子 可以把一个list数据的流数据 变成多条流
谢谢

回复:Re: Re: submit jobGraph error on server side

2019-08-08 文章 王智
感谢大神,

是我配置的资源太少导致响应慢,导致akka 超时。




现在我换了一个k8s 集群,调大了资源,已经不再配到邮件中的发生的异常。









原始邮件



发件人:"Zili Chen"< wander4...@gmail.com ;

发件时间:2019/8/7 15:32

收件人:"王智"< ben.wa...@foxmail.com ;

抄送人:"user-zh"< user-zh@flink.apache.org ;

主题:Re: Re: submit jobGraph error on server side




从错误堆栈上看你的请求应该是已经发到 jobmanager 上了,也就是不存在找不到端口的问题。
但是 jobmanager 在处理 submit job 的时候某个动作超时了。你这个问题是一旦把
gateway 分开就稳定复现吗?也有可能是 akka 偶然的超时。


Best,
tison.










王智