flink 1.10.1: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.TimeoutException

2020-12-24 Post by bigdata

The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.TimeoutException
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:678)
    at com.autoai.cns.core.CNSDashboardDML$.main(CNSDashboardDML.scala:80)
    at com.autoai.cns.core.CNSDashboardDML.main(CNSDashboardDML.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 11 more
Caused by: java.util.concurrent.TimeoutException
    at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:999)
    at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$14(FutureUtils.java:427)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Re: Re: pyflink 1.12 error when using the connector read.query option

2020-12-24 Post by 肖越
Thanks for the guidance. Following 嘉伟's suggestion, I confirmed that pyflink 1.12 indeed does not support this option.

I still hope the project will open up this option: in our current work scenario, fetching data requires defining the entire table, and if the database changes, the code on our side is hard to maintain.
Judging from experiments on my local machine, running the query inside pyflink is not very efficient; I'm about to try it on the cluster.

At 2020-12-25 09:45:28, "Leonard Xu" wrote:
>Hi, 嘉伟  
>
>1.12 should not support read.query; the community is still discussing whether to expose it, and there are some concerns. Put simply, with a query written like yours, the JDBC table being created
>should really be a View rather than mapping to an actual JDBC table, and such a table could only be used as a source, not as a sink.
>
>Best regards,
>Leonard
>
>> On Dec 24, 2020, at 19:16, 冯嘉伟 <1425385...@qq.com> wrote:
>> 
>> hi! Try this:
>> 
>> CREATE TABLE source_table(
>>yldrate DECIMAL,
>>pf_id VARCHAR,
>>symbol_id VARCHAR) WITH(
>>'connector' = 'jdbc',
>>'url' = 'jdbc:mysql://ip/db',
>>'driver' = 'com.mysql.cj.jdbc.Driver',
>>'username' = 'xxx',
>>'password' = 'xxx',
>>'table-name' = 'TS_PF_SEC_YLDRATE',
>>'read.query' = 'SELECT YLDRATE, PF_ID, SYMBOL_ID FROM
>> TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE =
>> "AC" AND PF_ID = "1030100122" AND SYMBOL_ID = "2030004042" AND BIZ_DATE
>> between "20160701" AND "20170307"'
>>)
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: Flink-1.11.1 streaming write to filesystem: partition commit issue

2020-12-24 Post by amen...@163.com
A follow-up question: since writing to the filesystem relies on checkpoints for the commit, is the number of files that can be committed per checkpoint determined by the parallelism? I've noticed that my files are committed three at a time after each checkpoint finishes.



 
From: amen...@163.com
Sent: 2020-12-24 18:47
To: user-zh
Subject: Re: Re: Flink-1.11.1 streaming write to filesystem: partition commit issue
One remark woke me right up. Thanks for the reply, @冯嘉伟

Since I had first been testing the submission in sql-client, I overlooked this point. Thanks.

best,
amenhub
 
 
 
From: 冯嘉伟
Sent: 2020-12-24 18:39
To: user-zh
Subject: Re: Flink-1.11.1 streaming write to filesystem: partition commit issue
Did you enable checkpointing?
Part files can be in one of three states:
In-progress : The part file that is currently being written to is
in-progress
Pending : Closed (due to the specified rolling policy) in-progress files
that are waiting to be committed
Finished : On successful checkpoints (STREAMING) or at the end of input
(BATCH) pending files transition to “Finished”
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html

  
--
Sent from: http://apache-flink.147419.n8.nabble.com/
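
Since the resolution above was that checkpointing had not been enabled for the sql-client test, here is a minimal sketch of how it could be switched on for jobs submitted that way, assuming configuration through flink-conf.yaml (the 60s interval is an assumed value, not something stated in the thread):

# flink-conf.yaml, also picked up by jobs submitted from sql-client
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE

With an interval configured, pending part files can transition to Finished on each successful checkpoint, which is when the configured partition commit policy (e.g. success-file) runs.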


Keywords for locating Flink TaskManager failures in the logs

2020-12-24 Post by 赵一旦
As the subject says: does anyone know which keywords to search for? There are far too many log lines for every failure.
They show all kinds of task cancellations and the like.
And then it suddenly just fails...

Right now it often seems to be caused by the cancel timeout (180s), leading to "Task did not exit gracefully within 180 + seconds".


Also, does anyone change the log format and log files in production? Since I adjusted them, the log view in the WEB-UI has never worked for me. There is now a log list, but clicking entries has no effect.

I changed the log file names.


Flink reads from Kafka with no errors but also no data output; the Kafka consumer side does have data. Thanks.

2020-12-24 Post by Appleyuchi
Hi all,
My environment is:


| Component | Version |
| Flink     | 1.12    |
| Kafka     | 2.5.0   |
| Zookeeper | 3.6.0   |




The complete code is:
https://paste.ubuntu.com/p/pRWpvJw4b8/
The Kafka consumer side (consuming from the command line) is confirmed to output data.
But the code above produces no output, and the DDL has been checked and is correct.


Since I heard that executeSql already submits the job, I commented out the final execute().
How should I modify the code so that it produces output?
Thanks
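
A hedged sketch of how a pure SQL pipeline is typically wired in Flink 1.12 (the actual code is only at the paste link above, so the table names, columns, and connector options below are placeholders, not the poster's code): an INSERT submitted via executeSql() starts its own job, so it should not be commented out, and no separate env.execute() is needed for it.

// Hypothetical minimal example; source/sink names and options are placeholders.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaToPrint {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // placeholder Kafka source; adjust topic, servers, and schema to the real job
        tEnv.executeSql("CREATE TABLE kafka_source (msg STRING) WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'test'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'properties.group.id' = 'test-group'," +
                " 'scan.startup.mode' = 'latest-offset'," +
                " 'format' = 'json')");

        // print sink so output shows up in the TaskManager stdout
        tEnv.executeSql("CREATE TABLE print_sink (msg STRING) WITH ('connector' = 'print')");

        // executeSql on an INSERT submits and runs the job by itself;
        // do not expect an additional env.execute() to be required for it
        tEnv.executeSql("INSERT INTO print_sink SELECT msg FROM kafka_source");
    }
}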

Re: yarn.provided.lib.dirs not taking effect when submitting to yarn with flink 1.11

2020-12-24 Post by Yang Wang
I would strongly advise against putting jars that are not part of the Flink binary into yarn.provided.lib.dirs, because the jars under that path are shipped via the Yarn public
distributed cache, cached on the NodeManagers, and shared by all applications.

The root cause of your error is that the UDF is still only on HDFS when main() runs locally, so the failure happens on the client side.

There are two ways to fix it:
1. Do not put the UDF into the provided lib dirs on HDFS, unless you really want to share it across many applications.
2.
Use application mode [1]; in that case the user's main() runs on the JobManager side, where the jars under the provided dirs have already been downloaded and added to the classpath (a submit-command sketch follows below).

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#application-mode

Best,
Yang
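
For reference, a hedged sketch of the application-mode submission from option 2, reusing the paths and class name from the original report (any cluster-specific options beyond these are omitted, and the values are the reporter's, not verified here):

./bin/flink run-application -t yarn-application \
  -Dyarn.provided.lib.dirs="hdfs://ip:8020/flink-yarn/jars" \
  -c com.ly.common.udf.demo.FlinkUDFDemo \
  /data/bigdata/jars/udf-test.jar

In application mode the user jar's main() is executed on the JobManager, so classes looked up via reflection can be loaded from the provided lib dirs there.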

zhisheng wrote on Fri, Dec 25, 2020 at 11:26 AM:

> hi
>
> Try using -Dyarn.provided.lib.dirs
>
> Best
> zhisheng
>
> datayangl wrote on Tue, Dec 22, 2020 at 4:56 PM:
>
> >
> >
> > flink 1.11 on yarn mode: I uploaded the dependencies under flink lib plus my UDF jar to HDFS in advance, and at submit time used yarn.provided.lib.dirs
> > to point at the dependency path on HDFS. The plan was for the program to locate and instantiate the UDF classes via reflection, but submission fails and the program does not find the UDF classes.
> >
> > Submit command: /usr/hdp/flink1.11/bin/flink run -m yarn-cluster -d -ynm udf-test
> -yD
> > yarn.provided.lib.dirs=hdfs://ip:8020/flink-yarn/jars -c
> > com.ly.common.udf.demo.FlinkUDFDemo  /data/bigdata/jars/udf-test.jar
> >
> > The relevant output follows:
> > 2020-12-22 08:41:11,157 INFO
> > org.apache.flink.yarn.cli.FlinkYarnSessionCli
> > [] - Dynamic Property set:
> > yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> > 2020-12-22 08:41:11,157 INFO
> > org.apache.flink.yarn.cli.FlinkYarnSessionCli
> > [] - Dynamic Property set:
> > yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> > -- class path:
> /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
> >
> > 
> >  The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method
> > caused an error: object com.ly.third.udf.flink.SortKey not found.
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > at
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > at
> > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> > at
> >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at
> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > Caused by: scala.ScalaReflectionException: object
> > com.ly.third.udf.flink.SortKey not found.
> > at
> > scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
> > at
> > scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
> > at
> >
> >
> com.ly.common.udf.reflect.RegisterFlinkFunction$.loadFlinkFunction(RegisterFlinkFunction.scala:14)
> > at
> com.ly.common.udf.demo.FlinkUDFDemo$.main(FlinkUDFDemo.scala:27)
> > at com.ly.common.udf.demo.FlinkUDFDemo.main(FlinkUDFDemo.scala)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > ... 11 more
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: yarn.provided.lib.dirs not taking effect when submitting to yarn with flink 1.11

2020-12-24 Post by datayangl
With -D it still fails to load. Is yarn.provided.lib.dirs really only supported in application mode???
I see Alibaba Cloud has a yarn-cluster example:
https://developer.aliyun.com/article/762501?spm=a2c6h.12873639.0.0.14ac3a9eM6GNSi



Right now I can load the local UDF jar with -C, but that requires every node to have the specified jar, which is not the effect I want.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: yarn.provided.lib.dirs not taking effect when submitting to yarn with flink 1.11

2020-12-24 Post by zhisheng
hi

Try using -Dyarn.provided.lib.dirs

Best
zhisheng
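
Putting the suggestion together with the command from the original report gives a sketch like the following (everything except the -D flag is copied from that report; whether the yarn-cluster per-job path then resolves the UDF from the provided lib dirs on the client side is exactly what this thread is questioning):

/usr/hdp/flink1.11/bin/flink run -m yarn-cluster -d -ynm udf-test \
  -Dyarn.provided.lib.dirs="hdfs://ip:8020/flink-yarn/jars" \
  -c com.ly.common.udf.demo.FlinkUDFDemo \
  /data/bigdata/jars/udf-test.jar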

datayangl wrote on Tue, Dec 22, 2020 at 4:56 PM:

>
>
> flink 1.11 on yarn mode: I uploaded the dependencies under flink lib plus my UDF jar to HDFS in advance, and at submit time used yarn.provided.lib.dirs
> to point at the dependency path on HDFS. The plan was for the program to locate and instantiate the UDF classes via reflection, but submission fails and the program does not find the UDF classes.
>
> Submit command: /usr/hdp/flink1.11/bin/flink run -m yarn-cluster -d -ynm udf-test -yD
> yarn.provided.lib.dirs=hdfs://ip:8020/flink-yarn/jars -c
> com.ly.common.udf.demo.FlinkUDFDemo  /data/bigdata/jars/udf-test.jar
>
> The relevant output follows:
> 2020-12-22 08:41:11,157 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
> [] - Dynamic Property set:
> yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> 2020-12-22 08:41:11,157 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
> [] - Dynamic Property set:
> yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> -- class path: /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: object com.ly.third.udf.flink.SortKey not found.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: scala.ScalaReflectionException: object
> com.ly.third.udf.flink.SortKey not found.
> at
> scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
> at
> scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
> at
>
> com.ly.common.udf.reflect.RegisterFlinkFunction$.loadFlinkFunction(RegisterFlinkFunction.scala:14)
> at com.ly.common.udf.demo.FlinkUDFDemo$.main(FlinkUDFDemo.scala:27)
> at com.ly.common.udf.demo.FlinkUDFDemo.main(FlinkUDFDemo.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 11 more
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: pyflink 1.12 error when using the connector read.query option

2020-12-24 Post by Leonard Xu
Hi, 嘉伟  

1.12 should not support read.query; the community is still discussing whether to expose it, and there are some concerns. Put simply, with a query written like yours, the JDBC table being created
should really be a View rather than mapping to an actual JDBC table, and such a table could only be used as a source, not as a sink.

Best regards,
Leonard
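
Since read.query is not available in 1.12, a hedged sketch of the usual workaround is to map only the physical tables with table-name and express the join and filters in Flink SQL instead. The table, column, and predicate names below are taken from the original query; the full column lists and the connection options are placeholders, because the thread does not show the real schemas:

CREATE TABLE ts_pf_sec_yldrate (
    YLDRATE DECIMAL,
    PF_ID VARCHAR,
    SYMBOL_ID VARCHAR,
    BIZ_DATE VARCHAR,
    CCY_TYPE VARCHAR  -- assumed to live in this table; adjust to the real schema
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip/db',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxx',
    'password' = 'xxx',
    'table-name' = 'TS_PF_SEC_YLDRATE'
);

CREATE TABLE tp_gl_day (
    DAY_ID VARCHAR  -- only the join key is visible in the thread
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip/db',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxx',
    'password' = 'xxx',
    'table-name' = 'TP_GL_DAY'
);

SELECT YLDRATE, PF_ID, SYMBOL_ID
FROM ts_pf_sec_yldrate LEFT JOIN tp_gl_day ON DAY_ID = BIZ_DATE
WHERE CCY_TYPE = 'AC' AND PF_ID = '1030100122' AND SYMBOL_ID = '2030004042'
  AND BIZ_DATE BETWEEN '20160701' AND '20170307';

Note that the join and filters then run inside Flink rather than being pushed down to MySQL, which may relate to the efficiency concern raised elsewhere in this thread.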

> On Dec 24, 2020, at 19:16, 冯嘉伟 <1425385...@qq.com> wrote:
> 
> hi! Try this:
> 
> CREATE TABLE source_table(
>yldrate DECIMAL,
>pf_id VARCHAR,
>symbol_id VARCHAR) WITH(
>'connector' = 'jdbc',
>'url' = 'jdbc:mysql://ip/db',
>'driver' = 'com.mysql.cj.jdbc.Driver',
>'username' = 'xxx',
>'password' = 'xxx',
>'table-name' = 'TS_PF_SEC_YLDRATE',
>'read.query' = 'SELECT YLDRATE, PF_ID, SYMBOL_ID FROM
> TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE =
> "AC" AND PF_ID = "1030100122" AND SYMBOL_ID = "2030004042" AND BIZ_DATE
> between "20160701" AND "20170307"'
>)
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Re: flink-1.11.1 setMinPauseBetweenCheckpoints not taking effect

2020-12-24 Post by nicygan
Dian Fu:
  Thanks for clearing that up; I'll try a different version.


thank you
by nicygan

At 2020-12-24 22:44:04, "Dian Fu" wrote:
>It should be a known issue that has already been fixed in 1.11.2: https://issues.apache.org/jira/browse/FLINK-18856
>
>> On Dec 24, 2020, at 9:34 PM, 赵一旦 wrote:
>> 
>> I don't believe what you say...
>> 
>> nicygan wrote on Thu, Dec 24, 2020 at 7:25 PM:
>> 
>>> dear  all:
>>> In my checkpoint settings I configured
>>> 
>>> checkpointConfig.setMinPauseBetweenCheckpoints(180_000L)
>>> but it does not seem to take effect.
>>> For example, checkpoint id=238 finishes at 17:13:30,
>>> yet id=239 also starts at 17:13:30.
>>> 
>>> 
>>> My understanding is that id=239 should start no earlier than 17:16:30.
>>> 
>>> Am I misunderstanding this parameter?
>>> 
>>> 
>>> thanks
>>> by nicygan
>>> 


Re: flink-1.11.1 setMinPauseBetweenCheckpoints not taking effect

2020-12-24 Post by Dian Fu
It should be a known issue that has already been fixed in 1.11.2: https://issues.apache.org/jira/browse/FLINK-18856

> On Dec 24, 2020, at 9:34 PM, 赵一旦 wrote:
> 
> I don't believe what you say...
> 
> nicygan wrote on Thu, Dec 24, 2020 at 7:25 PM:
> 
>> dear  all:
>> In my checkpoint settings I configured
>> 
>> checkpointConfig.setMinPauseBetweenCheckpoints(180_000L)
>> but it does not seem to take effect.
>> For example, checkpoint id=238 finishes at 17:13:30,
>> yet id=239 also starts at 17:13:30.
>> 
>> 
>> My understanding is that id=239 should start no earlier than 17:16:30.
>> 
>> Am I misunderstanding this parameter?
>> 
>> 
>> thanks
>> by nicygan
>> 



Re: After the flink job starts, it fails with flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.

2020-12-24 Post by 赵一旦
Look at the error message: Caused by:
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.NumberFormatException: Not a version: 9.



bigdata <1194803...@qq.com> wrote on Thu, Dec 24, 2020 at 9:49 PM:

> A flink 1.10.1 cluster DML job fails with the following error
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain. at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:745)
> Caused by:
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.NumberFormatException: Not a version: 9
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000


After the flink job starts, it fails with flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.

2020-12-24 Post by bigdata
A flink 1.10.1 cluster DML job fails with the following error
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.

Re: flink-1.11.1 setMinPauseBetweenCheckpoints not taking effect

2020-12-24 Post by 赵一旦
I don't believe what you say...

nicygan wrote on Thu, Dec 24, 2020 at 7:25 PM:

> dear  all:
> In my checkpoint settings I configured
>
>  checkpointConfig.setMinPauseBetweenCheckpoints(180_000L)
>  but it does not seem to take effect.
>  For example, checkpoint id=238 finishes at 17:13:30,
>  yet id=239 also starts at 17:13:30.
>
>
> My understanding is that id=239 should start no earlier than 17:16:30.
>
> Am I misunderstanding this parameter?
>
>
> thanks
> by nicygan
>


Re: flink.client.program.ProgramInvocationException: The main method caused an error: Unable to instantiate java compiler

2020-12-24 Post by 冯嘉伟
hi!

You could try changing the configuration file:
classloader.resolve-order: parent-first

Or you could try the utility class
org.apache.flink.table.runtime.generated.CompileUtils



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: pyflink 1.12 error when using the connector read.query option

2020-12-24 Post by 冯嘉伟
hi! Try this:

CREATE TABLE source_table(
yldrate DECIMAL,
pf_id VARCHAR,
symbol_id VARCHAR) WITH(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://ip/db',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'xxx',
'password' = 'xxx',
'table-name' = 'TS_PF_SEC_YLDRATE',
'read.query' = 'SELECT YLDRATE, PF_ID, SYMBOL_ID FROM
TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE =
"AC" AND PF_ID = "1030100122" AND SYMBOL_ID = "2030004042" AND BIZ_DATE
between "20160701" AND "20170307"'
)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink-1.11.1 setMinPauseBetweenCheckpoints not taking effect

2020-12-24 Post by nicygan
dear all:
In my checkpoint settings I configured

 checkpointConfig.setMinPauseBetweenCheckpoints(180_000L)
 but it does not seem to take effect.
 For example, checkpoint id=238 finishes at 17:13:30,
 yet id=239 also starts at 17:13:30.


My understanding is that id=239 should start no earlier than 17:16:30.

Am I misunderstanding this parameter?


thanks
by nicygan
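
For context, a minimal sketch of how this setting is normally combined with a checkpoint interval (the 60s interval and the placeholder pipeline are assumed values; the post only shows the min-pause call). As noted in the replies, FLINK-18856 means 1.11.0/1.11.1 did not honor the pause, while 1.11.2 does:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinPauseExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // trigger a checkpoint at most every 60s (assumed interval, not from the post)
        env.enableCheckpointing(60_000L);
        // require at least 180s between the end of one checkpoint and the start of the next:
        // a checkpoint finishing at 17:13:30 should then not be followed before 17:16:30
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(180_000L);

        env.fromElements(1, 2, 3).print();  // placeholder pipeline just to make the job runnable
        env.execute("min-pause-example");
    }
}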


Re: pyflink 1.12 error when using the connector read.query option

2020-12-24 Post by Dian Fu
The line 'table-name' = 'TS_PF_SEC_YLDRATE' is missing a trailing comma.
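
In other words, a restatement of that fix showing only the two affected lines of the quoted DDL ('...' stands for the unchanged query string; whether read.query is actually honored in 1.12 is a separate question discussed elsewhere in this digest):

    'table-name' = 'TS_PF_SEC_YLDRATE',
    'read.query' = '...'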

> On Dec 24, 2020, at 2:02 PM, 肖越 <18242988...@163.com> wrote:
> 
> I define a connector to a MySQL database with DDL and want to fetch the data directly by sending SQL:
> source_ddl = """
> CREATE TABLE source_table(
>yldrate DECIMAL,
>pf_id VARCHAR,
>symbol_id VARCHAR) WITH(
>'connector' = 'jdbc',
>'url' = 'jdbc:mysql://ip/db',
>'driver' = 'com.mysql.cj.jdbc.Driver',
>'username' = 'xxx',
>'password' = 'xxx',
>'table-name' = 'TS_PF_SEC_YLDRATE'
>'read.query' = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM 
> TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE = 
> 'AC' AND PF_ID = '1030100122' AND SYMBOL_ID = '2030004042' AND BIZ_DATE 
> between '20160701' AND '20170307'"
>)
> """
> Error message:
> File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
>  line 766, in execute_sql
>return TableResult(self._j_tenv.executeSql(stmt))
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
>  line 1286, in __call__
>answer, self.gateway_client, self.target_id, self.name)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>  line 147, in deco
>return f(*a, **kw)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
>  line 328, in get_return_value
>format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o6.executeSql.
> : org.apache.flink.table.api.SqlParserException: SQL parse failed. 
> Encountered "=" at line 12, column 30.
> Was expecting one of:
>"UESCAPE" ...
> ...
>")" ...
>"," ...
> 
> I don't understand the expected-syntax hint: why can't '=' appear there? Hoping someone passing by can offer some guidance. Thanks!



Re: Re: Flink-1.11.1 streaming write to filesystem: partition commit issue

2020-12-24 Post by amen...@163.com
One remark woke me right up. Thanks for the reply, @冯嘉伟

Since I had first been testing the submission in sql-client, I overlooked this point. Thanks.

best,
amenhub



 
From: 冯嘉伟
Sent: 2020-12-24 18:39
To: user-zh
Subject: Re: Flink-1.11.1 streaming write to filesystem: partition commit issue
Did you enable checkpointing?
 
Part files can be in one of three states:
 
In-progress : The part file that is currently being written to is
in-progress
Pending : Closed (due to the specified rolling policy) in-progress files
that are waiting to be committed
Finished : On successful checkpoints (STREAMING) or at the end of input
(BATCH) pending files transition to “Finished”
 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html

  
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink-1.11.1 streaming write to filesystem: partition commit issue

2020-12-24 Post by 冯嘉伟
Did you enable checkpointing?

Part files can be in one of three states:

In-progress : The part file that is currently being written to is
in-progress
Pending : Closed (due to the specified rolling policy) in-progress files
that are waiting to be committed
Finished : On successful checkpoints (STREAMING) or at the end of input
(BATCH) pending files transition to “Finished”

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/file_sink.html

  



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink-1.11.1 streaming write to filesystem: partition commit issue

2020-12-24 Post by amen...@163.com
Uh oh. Now the problem is that none of the partitions seem to get committed at all; they just never commit. Why is that?



 
From: amen...@163.com
Sent: 2020-12-24 17:04
To: user-zh
Subject: Flink-1.11.1 streaming write to filesystem: partition commit issue
hi everyone,

I've recently been validating a requirement: streaming Kafka data into the HDFS filesystem. After successfully submitting to yarn using the example demo from the official Flink 1.11 docs, the partition directories and files are generated as expected, but I have some questions about partition commit.

Problem description:
At 15:37 I checked HDFS and saw many partition directories of the same format generated as expected, e.g. [/user/flink/order/dt=2020-03-13/hour=14] and [/user/flink/order/dt=2020-03-14/hour=21]. Going into the hour=14 directory, the part file was in-progress; the docs say the partition is committed once the current system time exceeds the partition creation time plus the delay. But when I checked again at 16:37 and 16:38, the part file under hour=14 was still in-progress. It turned out this was because I had written more data to Kafka at 16:07: at that point the creation time of the part files under all partition directories changed to 16:07, so the partitions whose part files had already been created at 15:37 now have to wait until 17:07 to be committed. (In theory that's what it means, right?)

So here is the question: as the DDL below shows, my partitioning is based on day+hour, and my understanding is that the partition commit time is computed from the creation time of the hour partition directory, right? If so, why does writing data at 16:07 affect the commit of the partitions created at 15:37, so that all of them have to wait until 17:07 to be committed?

Also, I checked that when I wrote data at 16:07, apart from the part file in the partition directory that should actually receive the data, the creation time of the part files under all other partition directories was also changed to 16:07, while the hour directories themselves did not change.

This description is a bit long and possibly a bit messy; maybe I'm just not familiar enough with streaming file writes yet to understand what is really going on, so I hope someone can help explain. Thanks!
 
source ddl:
CREATE TABLE kafka_source (
order_id STRING,
order_sales DOUBLE,
update_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'flink-kafka',
'properties.bootstrap.servers' = '10.3.15.128:9092',
'properties.group.id' = 'kafka_hdfs',
'format' = 'json',
'scan.startup.mode' = 'group-offsets'
)
sink ddl:
CREATE TABLE hdfs_sink (
order_id STRING,
order_sales DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///user/flink/order',
'format' = 'json',
'sink.partition-commit.delay' = '1h',
'sink.partition-commit.policy.kind' = 'success-file'
)
transform dml:
INSERT INTO hdfs_sink 
SELECT 
order_id,
order_sales,
DATE_FORMAT(update_time, '-MM-dd'),
DATE_FORMAT(update_time, 'HH')
FROM kafka_source
 
best,
amenhub
 
 


Filter push down in DynamicTableSource

2020-12-24 Post by jy l
Hi:
A question for the experts, please.
On flink-1.12.0 I implemented a custom DynamicTableSource that supports SupportsFilterPushDown, SupportsProjectionPushDown, and other abilities.
My ScanRuntimeProvider is an InputFormatProvider.
At runtime, the pushed-down filters only arrive after the InputFormat has been created and copy() has been called, so how can I still use the filters inside the InputFormat to filter the data source?

My understanding is that the methods of SupportsFilterPushDown and SupportsProjectionPushDown should be called after DynamicTableSource's copy() method, so that I first obtain the projection and filters and only then create the InputFormat; that way I can use the filters to drop the unneeded data right at the source, so that only the data I need reaches flink. But at the moment the calling order of these methods seems to be before the InputFormat is created.

If anyone knows, please let me know. Thanks!
Best regards!
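
A hedged sketch of the pattern that is commonly used here: store the pushed-down filters in a field, carry that field across copy(), and only build the InputFormat lazily inside getScanRuntimeProvider(). This is not a confirmed answer to the ordering question, and MyTableSource/createInputFormat are placeholders, not names from the thread; projection push-down would be handled the same way with its own field.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;

public class MyTableSource implements ScanTableSource, SupportsFilterPushDown {

    // filters accepted during planning; must survive copy()
    private List<ResolvedExpression> filters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        this.filters = filters;
        // declare everything as "remaining" so Flink still re-applies the predicates;
        // the source then only uses them as a best-effort pre-filter
        return Result.of(new ArrayList<>(), filters);
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // build the InputFormat here, after planning, so the pushed-down filters are already known
        return InputFormatProvider.of(createInputFormat(filters));
    }

    // placeholder: a real connector would build its own InputFormat here and translate
    // the ResolvedExpressions into the source system's predicate language
    private InputFormat<RowData, ?> createInputFormat(List<ResolvedExpression> filters) {
        throw new UnsupportedOperationException(
                "build a connector-specific InputFormat using " + filters.size() + " pushed filters");
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public DynamicTableSource copy() {
        MyTableSource copy = new MyTableSource();
        copy.filters = new ArrayList<>(this.filters);  // keep the filters across planner copies
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "MyTableSource";
    }
}

The point of the sketch is that the InputFormat is only constructed in getScanRuntimeProvider(), which runs after optimization, so the exact order of applyFilters() versus copy() stops mattering for the source implementation.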


Filter push down in DynamicTableSource

2020-12-24 Post by automths
Hi:
A question for the experts, please.
On flink-1.12.0 I implemented a custom DynamicTableSource that supports SupportsFilterPushDown, SupportsProjectionPushDown, and other abilities.
My ScanRuntimeProvider is an InputFormatProvider.
At runtime, the pushed-down filters only arrive after the InputFormat has been created and copy() has been called, so how can I still use the filters inside the InputFormat to filter the data source?


If anyone knows, please let me know. Thanks!
Best regards!

Flink-1.11.1 streaming write to filesystem: partition commit issue

2020-12-24 Post by amen...@163.com
hi everyone,

I've recently been validating a requirement: streaming Kafka data into the HDFS filesystem. After successfully submitting to yarn using the example demo from the official Flink 1.11 docs, the partition directories and files are generated as expected, but I have some questions about partition commit.

Problem description:
At 15:37 I checked HDFS and saw many partition directories of the same format generated as expected, e.g. [/user/flink/order/dt=2020-03-13/hour=14] and [/user/flink/order/dt=2020-03-14/hour=21]. Going into the hour=14 directory, the part file was in-progress; the docs say the partition is committed once the current system time exceeds the partition creation time plus the delay. But when I checked again at 16:37 and 16:38, the part file under hour=14 was still in-progress. It turned out this was because I had written more data to Kafka at 16:07: at that point the creation time of the part files under all partition directories changed to 16:07, so the partitions whose part files had already been created at 15:37 now have to wait until 17:07 to be committed. (In theory that's what it means, right?)

So here is the question: as the DDL below shows, my partitioning is based on day+hour, and my understanding is that the partition commit time is computed from the creation time of the hour partition directory, right? If so, why does writing data at 16:07 affect the commit of the partitions created at 15:37, so that all of them have to wait until 17:07 to be committed?

Also, I checked that when I wrote data at 16:07, apart from the part file in the partition directory that should actually receive the data, the creation time of the part files under all other partition directories was also changed to 16:07, while the hour directories themselves did not change.

This description is a bit long and possibly a bit messy; maybe I'm just not familiar enough with streaming file writes yet to understand what is really going on, so I hope someone can help explain. Thanks!

source ddl:
CREATE TABLE kafka_source (
order_id STRING,
order_sales DOUBLE,
update_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'flink-kafka',
'properties.bootstrap.servers' = '10.3.15.128:9092',
'properties.group.id' = 'kafka_hdfs',
'format' = 'json',
'scan.startup.mode' = 'group-offsets'
)
sink ddl:
CREATE TABLE hdfs_sink (
order_id STRING,
order_sales DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///user/flink/order',
'format' = 'json',
'sink.partition-commit.delay' = '1h',
'sink.partition-commit.policy.kind' = 'success-file'
)
transform dml:
INSERT INTO hdfs_sink 
SELECT 
order_id,
order_sales,
DATE_FORMAT(update_time, '-MM-dd'),
DATE_FORMAT(update_time, 'HH')
FROM kafka_source

best,
amenhub
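
For reference, a hedged variant of the sink DDL above that uses the watermark/partition-time based commit trigger instead of the default processing-time trigger. This is only a sketch based on the 1.11 filesystem connector options, and it additionally assumes a watermark is defined on update_time in the source table, which the original DDL does not do:

CREATE TABLE hdfs_sink (
order_id STRING,
order_sales DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///user/flink/order',
'format' = 'json',
'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
'sink.partition-commit.trigger' = 'partition-time',
'sink.partition-commit.delay' = '1h',
'sink.partition-commit.policy.kind' = 'success-file'
)

With the partition-time trigger, commits are driven by the watermark rather than by processing time relative to partition creation, which may avoid the effect described above where newly (re)created part files push the commit out further.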




Re: Some questions about using Hive from Flink

2020-12-24 Post by Jacob
Hi,

Thanks for the reply.

Right, it can also be understood that way: overall there are two parts. First the stream messages are processed and written into the Hive table every 15 minutes; then a MapReduce job processes the previous 15 minutes of data from that step.
  
The current situation:
Step one is handled with flink; step two is a scheduled job that processes the data produced by the previous step.

The improvement plan:

I want to merge these two steps and handle both with flink; newer flink versions support Hive, so MapReduce would no longer be needed. What I don't know yet is how to run them smoothly within a single job.




-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/