[Subject garbled in the archive; recoverable keywords: Pyflink, kerberos, hdfs]

2021-02-03 Thread [sender name garbled]

Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
org.apache.hadoop.security.AccessControlException: SIMPLE authentication is
not enabled. Available:[TOKEN, KERBEROS]

[Rest of the message garbled in the archive; recoverable keywords: kerberos, flink]

Re: flinkSQL's ValueStateDescriptor does not set StateTtlConfig

2021-02-03 Thread [sender name garbled]
streamTableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));

---- Original message ----
From: "stgztsw" http://apache-flink.147419.n8.nabble.com/

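flinkSQL's ValueStateDescriptor does not set StateTtlConfig

2021-02-03 Thread stgztsw
We are currently on Flink 1.10. In Flink SQL jobs (with RocksDB), no ValueStateDescriptor other than the ones in streamingJoinOperator sets a StateTtlConfig. Doesn't that mean the state of aggregations such as GROUP BY is never cleaned up and keeps growing? There also seems to be no configuration option to adjust this. Isn't that unreasonable?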



--
Sent from: http://apache-flink.147419.n8.nabble.com/

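Stream API window throws a NullPointerException when using .evictor(TimeEvictor.of(Time.seconds(0))).sum().print

2021-02-03 Thread HunterXHunter
The code is below. The evictor is configured to remove all data before the window fires, so sum should receive no data. While debugging, however, I found that sum computes and outputs null, which then enters print and causes a NullPointerException. I don't know whether this is a bug or a mistake on my side.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;

class A {
    String word;
    Long time;

    public A(String word, Long time) {
        this.word = word;
        this.time = time;
    }
}

streamEnv.fromElements(new A("a", 1L))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<A>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((element, recordTimestamp) -> element.time))
    .keyBy(x -> x.time)
    .map(x -> new Tuple2<>(x.word, 1L))
    .returns(Types.TUPLE(Types.STRING, Types.LONG))
    .keyBy((KeySelector<Tuple2<String, Long>, String>) o -> o.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(20)))
    // Evict all elements before the window function runs.
    .evictor(TimeEvictor.of(Time.seconds(0)))
    .sum(1)
    .print();
streamEnv.execute();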





Re: Flink window evictor(TimeEvictor.of(Time.seconds(0))) causes a NullPointerException

2021-02-03 Thread HunterXHunter
The code is as follows:
stream
    .keyBy((KeySelector<Tuple2<String, Long>, String>) o -> o.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(100)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(2)))
    .evictor(TimeEvictor.of(Time.seconds(0)))
    .sum(1)
    .print();

When all of the data is removed before the window is evaluated, the sum result is null; it is passed to print and causes a NullPointerException.





Re: Flink cli to upload flink jar and run flink jobs in the Jenkins pipeline

2021-02-03 Thread sidhant gupta
Thanks Yang for your help.

On Thu, Feb 4, 2021, 8:28 AM Yang Wang  wrote:

> Yes, if you are using the CLI(e.g. flink run/list/cancel -t yarn-session
> ...) for the job management,
> it will eventually call the RestClusterClient, which could retrieve the
> leader JobManager address from ZK.
>
> Please ensure that you have specified the HA related config options in CLI
> via -D or set them in the flink-conf.yaml.
>
> Best,
> Yang
>
> On Wed, Feb 3, 2021 at 10:02 PM sidhant gupta wrote:
>
>> Is it possible to use flink CLI instead of flink client for connecting
>> zookeeper using network load balancer to retrieve the leader Jobmanager
>> address?
>>
>> On Wed, Feb 3, 2021, 12:42 PM Yang Wang  wrote:
>>
>>> I think the Flink client could make a connection with ZooKeeper via the
>>> network load balancer.
>>> Flink client is not aware of whether it is a network balancer or
>>> multiple ZooKeeper server address.
>>> After then Flink client will retrieve the active leader JobManager
>>> address via ZooKeeperHAService
>>> and submit the job successfully via rest client.
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> On Tue, Feb 2, 2021 at 11:14 PM sidhant gupta wrote:
>>>
>>>> Hi
>>>>
>>>> I have a flink ECS cluster setup with HA mode using zookeeper where I
>>>> have 2 jobmanagers out of which one of will be elected as leader using
>>>> zookeeper leader election. I have one application load balancer in front of
>>>> the jobmanagers and one network load balancer in front of zookeeper.
>>>>
>>>> As per [1],
>>>> we can provide zookeeper address in the flink cli arguments and it would
>>>> upload/ submit the jar to the leader jobmanager. But since I am using
>>>> network load balancer in front of zookeeper, I guess it is not able to make
>>>> connection with the zookeeper. Please provide suggestions or sample command
>>>> for uploading the flink job jar or run the job.
>>>>
>>>> Is there any way by which we can distinguish between leader and
>>>> standby jobmanagers in terms of request or response ?
>>>>
>>>> Can we use flink cli in jenkins to upload the jar to the flink cluster
>>>> and run the jobs?
>>>>
>>>>
>>>> [1]
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html
>>>>
>>>> Thanks
>>>> Sidhant Gupta
>>>>
>>>


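Re: Questions about Flink 1.11 session cluster

2021-02-03 Thread zilong xiao
Bumping my own thread.

On Tue, Feb 2, 2021 at 10:35 AM zilong xiao wrote:

> A question for the community: does session cluster mode in 1.11 no longer
> support specifying the number of TaskManagers at startup? It seems resources
> can only be requested dynamically now. In 1.4 this could be done with -n, but
> that parameter has since been removed. Why was this done? As I understand it,
> requesting the TaskManagers up front when starting a session cluster is also
> a common scenario.
>
> Any guidance from the community would be appreciated.
>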


Re: org.codehaus.janino.CompilerFactory cannot be cast ....

2021-02-03 Thread Marco Villalobos
Oh, I found the solution. I simply need to not use TRACE log level for
Flink.

On Wed, Feb 3, 2021 at 7:07 PM Marco Villalobos 
wrote:

>
> Please advise me. I don't know what I am doing wrong.
>
> After I added the blink table planner to my dependency management:
>
> dependency
> "org.apache.flink:flink-table-planner-blink_${scalaVersion}:${flinkVersion}"
>
> and added it as a dependency:
>
> implementation "org.apache.flink:flink-table-planner-blink_${scalaVersion}"
>
> and excluded it from shadowJar:
>
>
> exclude(dependency("org.apache.flink:flink-table-planner-blink_${scalaVersion}:"))
>
> I can run it just fine within my IDE. However, if I then run this on a local
> cluster, I get this error:
>
> 2021-02-03 18:42:49,662 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalTableScan#0
> 2021-02-03 18:42:49,687 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalTableScan#1
> 2021-02-03 18:42:49,694 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalTableScan#2
> 2021-02-03 18:42:49,742 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalTableScan#3
> 2021-02-03 18:42:49,758 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalJoin#4
> 2021-02-03 18:42:49,763 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalJoin#5
> 2021-02-03 18:42:49,768 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalTableScan#6
> 2021-02-03 18:42:49,770 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalTableScan#7
> 2021-02-03 18:42:49,771 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalJoin#8
> 2021-02-03 18:42:49,771 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalJoin#9
> 2021-02-03 18:42:49,777 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalTableScan#10
> 2021-02-03 18:42:49,780 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalTableScan#11
> 2021-02-03 18:42:49,805 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalSnapshot#12
> 2021-02-03 18:42:49,807 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalFilter#13
> 2021-02-03 18:42:49,809 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalCorrelate#14
> 2021-02-03 18:42:49,816 TRACE org.apache.calcite.plan.RelOptPlanner
>  [] - new LogicalProject#15
> 2021-02-03 18:42:49,822 ERROR MyApp [] - Failed execution.
> java.lang.IllegalStateException: Unable to instantiate java compiler
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:433)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:374)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:474)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:487)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:95)
> ~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
> at
> 

Re: How to determine in the program whether the job has restarted

2021-02-03 Thread op
[Message body missing in the archive]

---- Original message ----
From: "user-zh"



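Re: Re: How to determine in the program whether the job has restarted

2021-02-03 Thread zapjone
Make the downstream writes idempotent, and repeated operations are no longer a concern.

At 2021-02-04 11:26:56, "op" <520075...@qq.com> wrote:
>A restart may cause data to be re-sent, so I would like to add an alert.
>
>-- Original message --
>From: "user-zh"
>Sent: Thursday, February 4, 2021, 11:11 AM
>To: "user-zh"
>Subject: Re: How to determine in the program whether the job has restarted
>
>What is the business requirement?
>
>Best,
>tison.
>
>On Thu, Feb 4, 2021 at 11:04 AM op <520075...@qq.com> wrote:
>
> Hi all,
>
> I configured a restart strategy in my program via RestartStrategies. Now I would like to determine, inside an operator, whether a restart has been triggered. What are the ways to do this?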
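
The suggestion in the reply at the top of this message (make downstream writes idempotent) can be illustrated with a small plain-Python model. This is only a sketch under assumptions, not Flink code: `IdempotentSink`, `replay`, and the record ids are hypothetical, and the in-memory dict stands in for a real downstream store that supports upserts by primary key.

```python
class IdempotentSink:
    """Hypothetical stand-in for a downstream store with upsert semantics."""

    def __init__(self):
        self.rows = {}    # record_id -> latest value
        self.writes = 0   # number of write calls, including replays

    def write(self, record_id, value):
        # Writing by primary key overwrites instead of appending, so a
        # record that is re-sent after a restart does not create a duplicate.
        self.writes += 1
        self.rows[record_id] = value


def replay(sink, records):
    """Send every (record_id, value) pair to the sink."""
    for record_id, value in records:
        sink.write(record_id, value)


sink = IdempotentSink()
batch = [("order-1", 10), ("order-2", 20)]
replay(sink, batch)   # first attempt
replay(sink, batch)   # same records re-sent after a simulated restart
print(len(sink.rows), sink.writes)
```

The sink receives four writes but ends up with two rows, which is the property that makes replays after a restart harmless.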


Re: How to determine in the program whether the job has restarted

2021-02-03 Thread op
[Message body garbled in the archive]

---- Original message ----
From: "user-zh"



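pyflink1.12: after defining source tables, a query that joins them runs slowly?

2021-02-03 Thread 肖越
I wonder whether anyone else has run into this. I would appreciate help analyzing it.


I defined two source tables in Flink, each mapped to a table in a MySQL database.
Table a has 6934 rows; table b has 11415574 rows.
After joining them, I apply the usual SELECT ... WHERE operations and finally look up the 250 rows that match the conditions.
The last step is print() on the result, and every single-machine run takes 10 minutes!


This is much slower than the connector read.query() approach in pyflink1.11.
Which operation in pyflink1.12 adds the execution time? Is the query part now executed inside Flink?
Is there any other way to improve this?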



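Re: How to determine in the program whether the job has restarted

2021-02-03 Thread tison
What is the business requirement?

Best,
tison.


On Thu, Feb 4, 2021 at 11:04 AM op <520075...@qq.com> wrote:

> Hi all,
>
> I configured a restart strategy in my program via RestartStrategies. Now I would like to determine, inside an operator, whether a restart has been triggered. What are the ways to do this?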


Flink window evictor(TimeEvictor.of(Time.seconds(0))) causes a NullPointerException

2021-02-03 Thread HunterXHunter
When the program uses
evictor(TimeEvictor.of(Time.seconds(0)))
to remove the data before the window fires, and all of the data has been removed, print throws a NullPointerException:

Caused by: java.lang.NullPointerException
at
org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(PrintSinkOutputWriter.java:73)
at
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction.invoke(PrintSinkFunction.java:81)


Why is a null passed to the sink even when there is no data?
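
One plausible explanation, offered as an assumption rather than a confirmed reading of the Flink source: with an evictor in place, the window keeps its raw elements, and the reduce behind sum() folds over whatever is left when the window fires, starting from null. If the evictor has removed everything, the fold body never runs and null is emitted. The plain-Python model below (not Flink code; `fold_window`, `guarded_emit`, and `add_counts` are hypothetical names) shows that behaviour and a guard that drops empty results before they reach a sink.

```python
def fold_window(elements, reduce_fn):
    """Fold the elements left in a window; returns None if nothing is left."""
    current = None
    for value in elements:
        current = value if current is None else reduce_fn(current, value)
    return current


def guarded_emit(result, sink):
    """Forward a window result to the sink only when it is not None."""
    if result is not None:
        sink.append(result)


def add_counts(a, b):
    # Mirrors sum(1) on a (word, count) pair: keep the word, add the counts.
    return (a[0], a[1] + b[1])


sink = []
full = fold_window([("a", 1), ("a", 1)], add_counts)
empty = fold_window([], add_counts)   # every element was evicted
guarded_emit(full, sink)
guarded_emit(empty, sink)             # dropped instead of reaching the sink
```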





org.codehaus.janino.CompilerFactory cannot be cast ....

2021-02-03 Thread Marco Villalobos
Please advise me. I don't know what I am doing wrong.

After I added the blink table planner to my dependency management:

dependency
"org.apache.flink:flink-table-planner-blink_${scalaVersion}:${flinkVersion}"

and added it as a dependency:

implementation "org.apache.flink:flink-table-planner-blink_${scalaVersion}"

and excluded it from shadowJar:

exclude(dependency("org.apache.flink:flink-table-planner-blink_${scalaVersion}:"))

I can run it just fine within my IDE. However, if I then run this on a local
cluster, I get this error:

2021-02-03 18:42:49,662 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalTableScan#0
2021-02-03 18:42:49,687 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalTableScan#1
2021-02-03 18:42:49,694 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalTableScan#2
2021-02-03 18:42:49,742 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalTableScan#3
2021-02-03 18:42:49,758 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalJoin#4
2021-02-03 18:42:49,763 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalJoin#5
2021-02-03 18:42:49,768 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalTableScan#6
2021-02-03 18:42:49,770 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalTableScan#7
2021-02-03 18:42:49,771 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalJoin#8
2021-02-03 18:42:49,771 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalJoin#9
2021-02-03 18:42:49,777 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalTableScan#10
2021-02-03 18:42:49,780 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalTableScan#11
2021-02-03 18:42:49,805 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalSnapshot#12
2021-02-03 18:42:49,807 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalFilter#13
2021-02-03 18:42:49,809 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalCorrelate#14
2021-02-03 18:42:49,816 TRACE org.apache.calcite.plan.RelOptPlanner
   [] - new LogicalProject#15
2021-02-03 18:42:49,822 ERROR MyApp [] - Failed execution.
java.lang.IllegalStateException: Unable to instantiate java compiler
at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:433)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:374)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:474)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:487)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:95)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.calcite.rel.metadata.RelMetadataQuery.isVisibleInExplain(RelMetadataQuery.java:822)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.calcite.rel.externalize.RelWriterImpl.explain_(RelWriterImpl.java:66)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at
org.apache.calcite.rel.externalize.RelWriterImpl.done(RelWriterImpl.java:148)
~[flink-table-blink_2.12-1.11.2.jar:1.11.2]
at 

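How to determine in the program whether the job has restarted

2021-02-03 Thread op
Hi all,

I configured a restart strategy in my program via RestartStrategies. Now I would like to determine, inside an operator, whether a restart has been triggered. What are the ways to do this?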

Re: Flink cli to upload flink jar and run flink jobs in the Jenkins pipeline

2021-02-03 Thread Yang Wang
Yes, if you are using the CLI(e.g. flink run/list/cancel -t yarn-session
...) for the job management,
it will eventually call the RestClusterClient, which could retrieve the
leader JobManager address from ZK.

Please ensure that you have specified the HA related config options in CLI
via -D or set them in the flink-conf.yaml.

Best,
Yang

On Wed, Feb 3, 2021 at 10:02 PM sidhant gupta wrote:

> Is it possible to use flink CLI instead of flink client for connecting
> zookeeper using network load balancer to retrieve the leader Jobmanager
> address?
>
> On Wed, Feb 3, 2021, 12:42 PM Yang Wang  wrote:
>
>> I think the Flink client could make a connection with ZooKeeper via the
>> network load balancer.
>> Flink client is not aware of whether it is a network balancer or multiple
>> ZooKeeper server address.
>> After then Flink client will retrieve the active leader JobManager
>> address via ZooKeeperHAService
>> and submit the job successfully via rest client.
>>
>> Best,
>> Yang
>>
>>
>> On Tue, Feb 2, 2021 at 11:14 PM sidhant gupta wrote:
>>
>>> Hi
>>>
>>> I have a flink ECS cluster setup with HA mode using zookeeper where I
>>> have 2 jobmanagers out of which one of will be elected as leader using
>>> zookeeper leader election. I have one application load balancer in front of
>>> the jobmanagers and one network load balancer in front of zookeeper.
>>>
>>> As per [1],
>>> we can provide zookeeper address in the flink cli arguments and it would
>>> upload/ submit the jar to the leader jobmanager. But since I am using
>>> network load balancer in front of zookeeper, I guess it is not able to make
>>> connection with the zookeeper. Please provide suggestions or sample command
>>> for uploading the flink job jar or run the job.
>>>
>>> Is  there any way by which we can distinguish between leader and standby
>>> jobmanagers in terms of request or response ?
>>>
>>> Can we use flink cli in jenkins to upload the jar to the flink cluster
>>> and run the jobs?
>>>
>>>
>>> [1]
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html
>>>
>>> Thanks
>>> Sidhant Gupta
>>>
>>


Re: Get/Set Job Id in Flink 1.11

2021-02-03 Thread Yang Wang
Hi Wang Li,

The application mode is introduced in release 1.10 and has replaced the old
StandaloneJobClusterEntrypoint.
By default, if you enable the HA, then you will get a ZERO_JOB_ID.
Otherwise, it will be a random uuid.

For standalone application mode, you could use the "./bin/standalone-job.sh
--job-id " to start the application
cluster with fixed job id.

Best,
Yang

On Thu, Feb 4, 2021 at 10:12 AM Li Wang wrote:

> Hi team,
>
> We're running flink jobs in application mode. Pre Flink 1.7, the job id by
> default is ``. However, in Flink 1.11, we
> found the job id is random. Is there a way to set job id or we can only
> retrieve the job id by ourselves each time? Thanks.
>
> - Li
>


Re: How to pre upload jar file on Flink Session Cluster Instead of adding manually from the Web UI

2021-02-03 Thread Yang Wang
Hi Robert,

After checking the JarRunHandler implementation, I think your requirement
can be met with the following steps.

1. Use an init container to download the user jars, or bake the jars directly
into the image, under the path /path/of/flink-jars/flink-web-upload
2. Set the Flink configuration option "web.upload.dir: /path/of/flink-jars"
for your session cluster. Note that this value does not contain the
"flink-web-upload" subdirectory.
3. Submit the job via the curl command

curl -H "Content-Type: application/json" -X POST -d
'{"entryClass":"org.apache.flink.streaming.examples.statemachine.StateMachineExample","parallelism":null,"programArgs":null,"savepointPath":null,"allowNonRestoredState":null}'
http://localhost:8081/jars/StateMachineExample.jar/run
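
The same request as the curl command in step 3 can be sketched with the Python standard library, for example for use in a CI script. This is a sketch under assumptions: `build_run_request` is a hypothetical helper, the host, port, jar name, and entry class are copied from the example above, and the request is only built here, not sent.

```python
import json
import urllib.request


def build_run_request(base_url, jar_id, entry_class, parallelism=None):
    """Build (but do not send) a POST request to /jars/<jar_id>/run."""
    body = {
        "entryClass": entry_class,
        "parallelism": parallelism,
        "programArgs": None,
        "savepointPath": None,
        "allowNonRestoredState": None,
    }
    return urllib.request.Request(
        url=f"{base_url}/jars/{jar_id}/run",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_run_request(
    "http://localhost:8081",
    "StateMachineExample.jar",
    "org.apache.flink.streaming.examples.statemachine.StateMachineExample",
)
# To actually submit, pass the request to urllib.request.urlopen(request)
# against a running session cluster.
```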


Moreover, you could also have a try with the application mode. Both the
standalone[1] and native application[2] mode could meet your requirements
with better isolation.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/standalone/kubernetes.html#deploy-application-cluster
[2].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html#application-mode


Best,
Yang


On Thu, Feb 4, 2021 at 4:04 AM Robert Cullen wrote:

> I have a Kubernetes cluster with Flink running in Session Mode.
>
> Is there a way to drop the jar file into a folder and/or add it to the
> Docker image?
>
>
> --
> Robert Cullen
> 240-475-4490
>


Re: HELP!!!! Problem loading a Keras model in a PyFlink UDF and registering the function

2021-02-03 Thread 陈康
Thanks, I'll give it a try.





Get/Set Job Id in Flink 1.11

2021-02-03 Thread Li Wang
Hi team,

We're running flink jobs in application mode. Pre Flink 1.7, the job id by
default is ``. However, in Flink 1.11, we
found the job id is random. Is there a way to set job id or we can only
retrieve the job id by ourselves each time? Thanks.

- Li


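Re: HELP!!!! Problem loading a Keras model in a PyFlink UDF and registering the function

2021-02-03 Thread Xingbo Huang
Hi,

You can actually load the model in the open method, so that it is loaded only once. Loading it in the eval method causes it to be loaded multiple times.

Best,
Xingbo

On Thu, Feb 4, 2021 at 9:25 AM 陈康 <844256...@qq.com> wrote:

> Thanks for the reply. I was previously loading the Keras model in the __init__ method. Following advice from an expert in the DingTalk group, I now load it in eval at the point of use, and the problem is solved. Thanks!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/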
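
The difference between loading in open and loading in eval, as described in the reply above, can be sketched as follows. This is a minimal sketch under assumptions: `load_model` is a hypothetical placeholder for the real Keras loading call, `Predict` and the model path are illustrative, and a small stand-in base class is used when PyFlink is not installed so that the snippet runs on its own.

```python
try:
    from pyflink.table.udf import ScalarFunction
except ImportError:
    class ScalarFunction:
        """Stand-in used only when PyFlink is not installed."""

        def open(self, function_context):
            pass


LOAD_CALLS = []


def load_model(path):
    """Hypothetical placeholder for an expensive model load."""
    LOAD_CALLS.append(path)
    return lambda x: x * 2   # dummy "model"


class Predict(ScalarFunction):
    def __init__(self, model_path):
        # Keep only plain, picklable configuration here; the UDF object is
        # serialized before it is shipped to the workers.
        self.model_path = model_path
        self.model = None

    def open(self, function_context):
        # Called once per UDF instance before any record is processed.
        self.model = load_model(self.model_path)

    def eval(self, x):
        # Reuses the model loaded in open() instead of reloading per record.
        return self.model(x)


udf = Predict("/path/to/model.h5")
udf.open(None)
results = [udf.eval(v) for v in (1, 2, 3)]
```

Three records are evaluated but the placeholder loader runs once, which is the behaviour the reply recommends.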


Re: With a retract stream as input, a GROUP BY time window fails with "GroupWindowAggregate doesn't support consuming update and delete changes"

2021-02-03 Thread yang nick
Flink windows don't support update streams.

On Thu, Feb 4, 2021 at 9:24 AM HongHuangNeu wrote:

> If the input contains a retract stream, a GROUP BY time window fails with
> "GroupWindowAggregate doesn't support consuming update and delete changes".
> Is there an alternative? The input comes from streaming deduplication, that
> is, a statement like this:
>
> SELECT [column_list]
> FROM (
>    SELECT [column_list],
>      ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
>        ORDER BY time_attr [asc|desc]) AS rownum
>    FROM table_name)
> WHERE rownum = 1
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

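Re: HELP!!!! Problem loading a Keras model in a PyFlink UDF and registering the function

2021-02-03 Thread 陈康
Thanks for the reply. I was previously loading the Keras model in the __init__ method. Following advice from an expert in the DingTalk group, I now load it in eval at the point of use, and the problem is solved. Thanks!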




With a retract stream as input, a GROUP BY time window fails with "GroupWindowAggregate doesn't support consuming update and delete changes"

2021-02-03 Thread HongHuangNeu
If the input contains a retract stream, a GROUP BY time window fails with
"GroupWindowAggregate doesn't support consuming update and delete changes".
Is there an alternative? The input comes from streaming deduplication, that
is, a statement like this:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1
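
Why a deduplication query can feed updates into a downstream window can be illustrated with a plain-Python model of the changelog it emits. This is a sketch under assumptions, not Flink code: `dedup_changelog` is a hypothetical helper, `+I`, `-U` and `+U` stand for insert, update-before and update-after, and rows are assumed to arrive in time order. Keeping the first row per key (ORDER BY time_attr ASC) only ever inserts, while keeping the last row (DESC) has to retract the previous result.

```python
def dedup_changelog(rows, keep="first"):
    """Return the changelog of a per-key deduplication over (key, value) rows."""
    current = {}
    changelog = []
    for key, value in rows:
        if key not in current:
            current[key] = value
            changelog.append(("+I", key, value))
        elif keep == "last":
            # Retract the old row, then emit the new one.
            changelog.append(("-U", key, current[key]))
            current[key] = value
            changelog.append(("+U", key, value))
        # keep == "first": later rows for a known key are dropped silently.
    return changelog


rows = [("k1", "a"), ("k2", "b"), ("k1", "c")]
first_log = dedup_changelog(rows, keep="first")
last_log = dedup_changelog(rows, keep="last")
```

Whether the planner in a given Flink version treats the keep-first variant as insert-only, and therefore accepts it in front of a group window, should be checked against that version's documentation.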





Re: PyFlink How to set timeout for UDF

2021-02-03 Thread Dian Fu
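Let me first make sure I understand your requirement: the Python UDF may take a very long time to process a single record, and if it has not finished after a long time, you want the job to stop?

> On Feb 3, 2021, at 1:04 PM, 苗红宾 wrote: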
> 
> Hi:
> 
> Hope you are doing well!
> 
> My UDF always running in a long time, so I'm wondering, how to set timeout 
> for UDF in Pyflink, in order to auto-stop the execution when it running in a 
> long time.
> 
> Many Thanks!
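
One way to bound the time spent in a single UDF call can be sketched with the standard library only. This is an assumption-laden sketch, not a PyFlink feature: `call_with_timeout` and `slow_udf` are hypothetical names. The worker thread is not killed on timeout; it keeps running in the background, so a hard stop would need a separate process.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def call_with_timeout(func, timeout_s, *args, default=None):
    """Run func(*args); return `default` if no result arrives within timeout_s seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(func, *args)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        return default
    finally:
        # Do not block on the worker; a timed-out call keeps running.
        pool.shutdown(wait=False)


def slow_udf(x, delay_s):
    """Hypothetical UDF body that takes delay_s seconds per record."""
    time.sleep(delay_s)
    return x + 1


fast = call_with_timeout(slow_udf, 1.0, 41, 0.0)
timed_out = call_with_timeout(slow_udf, 0.05, 41, 0.5, default=-1)
```

Raising an exception in the timeout branch instead of returning `default` would fail the call rather than substitute a value, which may be closer to the "stop the job" reading of the question.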





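Re: HELP!!!! Problem loading a Keras model in a PyFlink UDF and registering the function

2021-02-03 Thread Dian Fu
Could you share your __init__ method? There is probably an object in __init__ that cannot be pickled.

> On Feb 3, 2021, at 6:01 PM, 陈康 <844256...@qq.com> wrote:
> 
>  
> https://blog.csdn.net/weixin_44904816/article/details/108744530
> I saw a blog post that says: "PyFlink will also be able to support Tensorflow and Keras in the future." Well, okay..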
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



How to pre upload jar file on Flink Session Cluster Instead of adding manually from the Web UI

2021-02-03 Thread Robert Cullen
I have a Kubernetes cluster with Flink running in Session Mode.

Is there a way to drop the jar file into a folder and/or add it to the
Docker image?


-- 
Robert Cullen
240-475-4490


Re: AbstractMethodError while writing to parquet

2021-02-03 Thread Jan Oelschlegel
Hi Till,

thanks for the hint. I checked it and found a version conflict with flink-parquet.

With this version it is running:



<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
</dependency>



But how can I avoid this in the future? I had to add parquet-avro because
there were errors without it. Do I have to look up such conflicts manually and
then choose the same version that the Flink dependencies use?


Best,
Jan

From: Till Rohrmann
Sent: Wednesday, February 3, 2021 11:41
To: Jan Oelschlegel
Cc: user
Subject: Re: AbstractMethodError while writing to parquet

Hi Jan,

it looks to me that you might have different parquet-avro dependencies on your 
class path. Could you make sure that you don't have different versions of the 
library on your classpath?

Cheers,
Till

On Tue, Feb 2, 2021 at 3:29 PM Jan Oelschlegel <oelschle...@integration-factory.de> wrote:
Hi all,

I'm using Flink 1.11 with the DataStream API. I would like to write my results
in Parquet format to HDFS.

To do that, I generated an Avro SpecificRecord with the avro-maven-plugin:



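<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.8.2</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>src/main/resources/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
                <stringType>String</stringType>
            </configuration>
        </execution>
    </executions>
</plugin>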






Then  I’m using the SpecificRecord in the StreamingFileSink:


val sink: StreamingFileSink[SpecificRecord] = StreamingFileSink
  .forBulkFormat(
    new Path("hdfs://example.com:8020/data/"),
    ParquetAvroWriters.forSpecificRecord(classOf[SpecificRecord])
  )
  .build()


The job cancels with the following error:


java.lang.AbstractMethodError: 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg/apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;)V
at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:53)
at 
org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:315)
at 
org.apache.parquet.column.impl.ColumnWriteStoreBase.sizeCheck(ColumnWriteStoreBase.java:200)
at 
org.apache.parquet.column.impl.ColumnWriteStoreBase.endRecord(ColumnWriteStoreBase.java:187)
at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.endRecord(ColumnWriteStoreV1.java:27)
at 
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:307)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:166)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
at 
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:202)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 

How to use the TableAPI to query the data in the Sql Training Rides table ?

2021-02-03 Thread cristi.cioriia
Hey guys,

I'm pretty new to Flink, I hope I could get some help on getting data out of
a Flink cluster. 

I've setup the cluster by following the steps in
https://github.com/ververica/sql-training and now I wanted to retrieve the
data from the Rides table in a Scala program, using the TableAPI. The code I
used is:



, but when I run it I get the following exception:



I have added on my classpath the following maven dependencies:



and exposed the port 6123 of the jobmanager in the docker-compose file and
checked that I can telnet to it.

Your help is greatly appreciated.

Cristi



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

2021-02-03 Thread Yuval Itzchakov
Hi Timo,

The problem with this is I would still have to determine the keys manually,
which is not really feasible in my case. Is there any internal API that
might be of use to extract this information?

On Wed, Feb 3, 2021 at 5:19 PM Timo Walther  wrote:

> Hi Yuval,
>
> we changed this behavior a bit to be more SQL compliant. Currently,
> sinks must be explicitly defined with a PRIMARY KEY constraint. We had
> discussions about implicit sinks, but nothing on the roadmap yet. The
> `CREATE TEMPORARY TABLE LIKE` clause should make it easy to extend the
> original table with just a primary key.
>
> Regards,
> Timo
>
>
> On 03.02.21 14:09, Yuval Itzchakov wrote:
> > Hi,
> > I'm reworking an existing UpsertStreamTableSink into the new
> > DynamicTableSink API. In the previous API, one would get the unique keys
> > for upsert queries via the `setKeyFields` method, which would calculate
> > them based on the grouping keys during the translation phase.
> >
> > Searching around, I saw that JDBC
> > (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#key-handling)
> > relies on explicit key passing via the PRIMARY KEY constraint. However,
> > this would require additional manual insertion which I am trying to
> > avoid.
> >
> > What would be the proper way to receive the unique keys for upsert
> > queries with the new DynamicTableSink API?
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
>
>

-- 
Best Regards,
Yuval Itzchakov.


Re: Job submission failure via flink cli

2021-02-03 Thread Chesnay Schepler

Please make sure the client and server version are in sync.

On 2/3/2021 4:12 PM, sidhant gupta wrote:
I am getting following error while running the below command with the 
attached conf/flink-conf.yaml:


bin/flink run -c firstflinkpackage.someJob ../somejob.jar arg1 arg2 arg3


2021-02-03 15:04:24,113 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
JobGraph submission 9cbf97d3f9b368bf2c27a52b39601500 (Flink FHIR Mapper).
2021-02-03 15:04:24,115 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - 
Submitting job 9cbf97d3f9b368bf2c27a52b39601500 (Flink FHIR Mapper).
2021-02-03 15:04:24,334 INFO 
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore [] - Added 
JobGraph(jobId: 9cbf97d3f9b368bf2c27a52b39601500) to ZooKeeper.
2021-02-03 15:04:24,335 INFO 
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC 
endpoint for org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_5 .
2021-02-03 15:04:24,336 INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 
Flink FHIR Mapper (9cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back 
off time strategy 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, 
backoffTimeMS=5000) for Flink FHIR Mapper 
(9cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Running 
initialization on master for job Flink FHIR Mapper 
(9cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO 
org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran 
initialization on master in 0 ms.
2021-02-03 15:04:24,461 INFO 
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore [] - 
Removed job graph 9cbf97d3f9b368bf2c27a52b39601500 from ZooKeeper.
2021-02-03 15:04:24,461 INFO 
org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore [] - 
Removed job graph 9cbf97d3f9b368bf2c27a52b39601500 from ZooKeeper.
2021-02-03 15:04:24,697 ERROR 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Failed 
to submit job 9cbf97d3f9b368bf2c27a52b39601500.
org.apache.flink.runtime.client.JobExecutionException: Could not 
instantiate JobManager.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
Source) ~[?:?]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.11.2.jar:1.11.2]

Caused by: java.lang.NullPointerException
at java.util.Collections$UnmodifiableCollection.<init>(Unknown Source) ~[?:?]
at java.util.Collections$UnmodifiableList.<init>(Unknown Source) ~[?:?]

at java.util.Collections.unmodifiableList(Unknown Source) ~[?:?]
at 
org.apache.flink.runtime.jobgraph.JobVertex.getOperatorCoordinators(JobVertex.java:352) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:232) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) 
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 

Re: DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

2021-02-03 Thread Timo Walther

Hi Yuval,

we changed this behavior a bit to be more SQL compliant. Currently, 
sinks must be explicitly defined with a PRIMARY KEY constraint. We had 
discussions about implicit sinks, but nothing on the roadmap yet. The 
`CREATE TEMPORARY TABLE LIKE` clause should make it easy to extend the 
original table with just a primary key.
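
A minimal sketch of the `CREATE TEMPORARY TABLE ... LIKE` approach, written as a small Java helper that only builds the DDL string. The table names `my_sink` and `my_sink_keyed` and the key columns are hypothetical. The sketch assumes the key columns are declared NOT NULL in the original table and that the resulting statement would be handed to `TableEnvironment.executeSql`, which is not done here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Builds a Flink SQL statement that derives a keyed table from an existing one. */
final class KeyedSinkDdl {

    // Quote an identifier with backticks; embedded backticks are doubled.
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // Returns DDL that copies baseTable via LIKE and adds only a primary key.
    static String createKeyedCopy(String newTable, String baseTable, List<String> keyColumns) {
        if (keyColumns.isEmpty()) {
            throw new IllegalArgumentException("at least one key column is required");
        }
        String keys = keyColumns.stream()
                .map(KeyedSinkDdl::quote)
                .collect(Collectors.joining(", "));
        return "CREATE TEMPORARY TABLE " + quote(newTable) + " (\n"
                + "  PRIMARY KEY (" + keys + ") NOT ENFORCED\n"
                + ") LIKE " + quote(baseTable);
    }

    public static void main(String[] args) {
        // Hypothetical table and column names, for illustration only.
        String ddl = createKeyedCopy("my_sink_keyed", "my_sink", Arrays.asList("user_id", "window_start"));
        System.out.println(ddl);
        // With a TableEnvironment in scope one would then call: tableEnv.executeSql(ddl);
    }
}
```

The key columns still have to be chosen by hand, so this only shortens the declaration; it does not derive the keys from the query.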


Regards,
Timo


On 03.02.21 14:09, Yuval Itzchakov wrote:

Hi,
I'm reworking an existing UpsertStreamTableSink into the new 
DynamicTableSink API. In the previous API, one would get the unique keys 
for upsert queries via the `setKeyFields` method, which would calculate 
them based on the grouping keys during the translation phase.


Searching around, I saw that JDBC 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#key-handling) 
relies on explicit key passing via the PRIMARY KEY constraint. However, 
this would require additional manual insertion which I am trying to avoid.


What would be the proper way to receive the unique keys for upsert 
queries with the new DynamicTableSink API?


--
Best Regards,
Yuval Itzchakov.




Re: Max with retract aggregate function does not support type: ''CHAR''.

2021-02-03 Thread Timo Walther

Hi Yuval,

yes this is rather a bug. If we support VARCHAR here we should also 
support CHAR. Feel free to open an issue.


Regards,
Timo

On 03.02.21 11:46, Yuval Itzchakov wrote:
I can understand that in some sense it's nonsensical to MAX on a CHAR, 
since Blink will only determine a CHAR when there's a constant in the 
SQL, but I was surprised that it didn't work with just an identity 
implementation.


On Wed, Feb 3, 2021 at 12:33 PM Till Rohrmann wrote:


Thanks for reaching out to the Flink community Yuval. I am pulling
in Timo and Jark who might be able to answer this question. From
what I can tell, it looks a bit like an oversight because VARCHAR is
also supported.

Cheers,
Till

On Tue, Feb 2, 2021 at 6:12 PM Yuval Itzchakov <yuva...@gmail.com> wrote:

Hi,
I'm trying to use MAX on a field that is statically known from
another table (let's ignore why for a moment). While running the
SQL query, I receive an error:

Max with retract aggregate function does not support type: ''CHAR''.

Looking at the code for creating the max function:

image.png

It does seem like all primitives are supported. Is there a
particular reason why a CHAR would not be supported? Is this an
oversight?
-- 
Best Regards,

Yuval Itzchakov.



--
Best Regards,
Yuval Itzchakov.




Job submission failure via flink cli

2021-02-03 Thread sidhant gupta
I am getting the following error while running the below command with the
attached conf/flink-conf.yaml:

bin/flink run -c firstflinkpackage.someJob ../somejob.jar arg1 arg2 arg3


2021-02-03 15:04:24,113 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 9cbf97d3f9b368bf2c27a52b39601500 (Flink FHIR Mapper).
2021-02-03 15:04:24,115 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 9cbf97d3f9b368bf2c27a52b39601500 (Flink FHIR Mapper).
2021-02-03 15:04:24,334 INFO org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore [] - Added JobGraph(jobId: 9cbf97d3f9b368bf2c27a52b39601500) to ZooKeeper.
2021-02-03 15:04:24,335 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_5 .
2021-02-03 15:04:24,336 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job Flink FHIR Mapper (9cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=5000) for Flink FHIR Mapper (9cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job Flink FHIR Mapper (9cbf97d3f9b368bf2c27a52b39601500).
2021-02-03 15:04:24,337 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2021-02-03 15:04:24,461 INFO org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore [] - Removed job graph 9cbf97d3f9b368bf2c27a52b39601500 from ZooKeeper.
2021-02-03 15:04:24,461 INFO org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore [] - Removed job graph 9cbf97d3f9b368bf2c27a52b39601500 from ZooKeeper.
2021-02-03 15:04:24,697 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Failed to submit job 9cbf97d3f9b368bf2c27a52b39601500.
org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.2.jar:1.11.2]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by: java.lang.NullPointerException
at java.util.Collections$UnmodifiableCollection.<init>(Unknown Source) ~[?:?]
at java.util.Collections$UnmodifiableList.<init>(Unknown Source) ~[?:?]
at java.util.Collections.unmodifiableList(Unknown Source) ~[?:?]
at org.apache.flink.runtime.jobgraph.JobVertex.getOperatorCoordinators(JobVertex.java:352) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:232) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) 

Re: Question regarding a possible use case for Iterative Streams.

2021-02-03 Thread Marco Villalobos
Hi Gordon,

Thank you very much for the detailed response.

I considered using the State Processor API; however, our enrichment 
requirements make the State Processor API a bit inconvenient.
1. if an element from the stream matches a record in the database then it can 
remain in the cache a very long time (potentially forever).
2. if an element from the stream does not match a record in the database then 
that miss cannot be cached a very long time because that record might be added 
to the database and we have to pick it up in a timely manner.
3. Our stream has many elements that lack enrichment information in the 
database.
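
The three requirements above can be sketched as a cache with separate time-to-live values for hits and misses. This is plain Java rather than Flink state, and the class `EnrichmentCache`, its constructor parameters, and the injected clock and loader are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Caches database lookups; hits and misses expire after different intervals. */
final class EnrichmentCache<K, V> {

    /** A cached lookup result; an empty value records a miss. */
    private static final class Entry<T> {
        final Optional<T> value;
        final long expiresAtMillis;

        Entry(Optional<T> value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long hitTtlMillis;
    private final long missTtlMillis;
    private final LongSupplier clock;
    private final Function<K, Optional<V>> loader;

    EnrichmentCache(long hitTtlMillis, long missTtlMillis,
                    LongSupplier clock, Function<K, Optional<V>> loader) {
        this.hitTtlMillis = hitTtlMillis;
        this.missTtlMillis = missTtlMillis;
        this.clock = clock;
        this.loader = loader;
    }

    Optional<V> lookup(K key) {
        long now = clock.getAsLong();
        Entry<V> cached = entries.get(key);
        if (cached != null && now < cached.expiresAtMillis) {
            return cached.value; // still valid, hit or remembered miss
        }
        // Expired or never seen: ask the database again.
        Optional<V> loaded = loader.apply(key);
        long ttl = loaded.isPresent() ? hitTtlMillis : missTtlMillis;
        // Saturate instead of overflowing when the TTL means "forever".
        long expiresAt = ttl >= Long.MAX_VALUE - now ? Long.MAX_VALUE : now + ttl;
        entries.put(key, new Entry<>(loaded, expiresAt));
        return loaded;
    }

    public static void main(String[] args) {
        Map<String, String> database = new HashMap<>(); // stand-in for the real database
        EnrichmentCache<String, String> cache = new EnrichmentCache<>(
                Long.MAX_VALUE, 5_000L, System::currentTimeMillis,
                key -> Optional.ofNullable(database.get(key)));
        System.out.println(cache.lookup("sensor-1").isPresent());
    }
}
```

A short miss TTL bounds how long a newly added database record can go unnoticed, at the cost of repeating the lookup for keys that never get a record.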

Thus, for that reason, the state processor api only really helps with records 
that already exist in the database, even though the stream has many records 
that do not exist.

That is why I was brainstorming over my idea of using an iterative stream that 
uses caching in the body, but AsyncIO in a feedback loop.

You mentioned "in general I think it is currently discouraged to use it 
(iterative streams)." May I ask what your source is for that statement? I see 
no mention of any discouragement in Flink's documentation.

I will look into how State Functions can help me in this scenario. I have not 
read up much on stateful functions.

If I were to write a proof of concept, and my database queries were performed 
with JDBC, could I just write an embedded function that performs the JDBC call 
directly (I want to avoid changing our deployment topology for now) and package 
it with my Data Stream Job?

Thank you.

> On Feb 2, 2021, at 10:08 PM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Marco,
> 
> In the ideal setup, enrichment data existing in external databases is
> bootstrapped into the streaming job via Flink's State Processor API, and any
> follow-up changes to the enrichment data is streamed into the job as a
> second union input on the enrichment operator.
> For this solution to scale, lookups to the enrichment data needs to be by
> the same key as the input data, i.e. the enrichment data is co-partitioned
> with the input data stream.
> 
> I assume you've already thought about whether or not this would work for
> your case, as it's a common setup for streaming enrichment.
> 
> Otherwise, I believe your brainstorming is heading in the right direction,
> in the case that remote database lookups + local caching in state is a must.
> I'm personally not familiar with the iterative streams in Flink, but in
> general I think it is currently discouraged to use it.
> 
> On the other hand, I think using Stateful Function's [1] programing
> abstraction might work here, as it allows arbitrary messaging between
> functions and cyclic dataflows.
> There's also an SDK that allows you to embed StateFun functions within a
> Flink DataStream job [2].
> 
> Very briefly, the way you would model this database cache hit / remote
> lookup is by implementing a function, e.g. called DatabaseCache.
> The function would expect message types of Lookup(lookupKey), and replies
> with a response of Result(lookupKey, value). The abstraction allows you, on
> each incoming message, to register state (similar to vanilla Flink), as well
> as register async operations which you'll use to perform remote
> database lookups in case of cache / state miss. It also provides means for
> "timers" in the form of delayed messages being sent to itself, if you need
> some mechanism for cache invalidation.
> 
> Hope this provides some direction for you to think about!
> 
> Cheers,
> Gordon
> 
> [1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/
> [2]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/flink-datastream.html
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-03 Thread Randal Pitt
Thanks everyone for the responses.

I tried out the JeMalloc suggestion from FLINK-19125 using a patched 1.11.3
image and so far it appears to be working well. I see it's included in 1.12.1
and Docker images are available so I'll look at upgrading too.

Best regards,

Randal.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Checkpoint problem in 1.12.0

2021-02-03 Thread Till Rohrmann
Thanks for reaching out to the Flink community. I will respond on the JIRA
ticket.

Cheers,
Till

On Wed, Feb 3, 2021 at 1:59 PM simpleusr  wrote:

> Hi
>
> I am trying to upgrade from 1.5.5 to 1.12 and checkpointing mechanism seems
> to be broken in our kafka connector sourced datastream jobs.
>
> Since there is a significant version gap and there are many backwards
> incompatible / deprecated changes in flink runtime between versions, I had
> to modify our jobs and noticed that checkpoint offsets are not committed to
> kafka for source connectors.
>
> To simplify the issues I created simple reproducer projects:
>
> https://github.com/simpleusr/flink_problem_1.5.5
>
> https://github.com/simpleusr/flink_problem_1.12.0
>
> It seems that there are major changes in the checkpoint infrastructure.
>
> For 1.5.5 checkpoint cycles works as expected as can be seen from the logs
> (please note that sample project contains a small hack in
> org.apache.flink.runtime.minicluster.MiniCluster which prevents cluster
> from
> stopping) :
>
> *[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> [2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> 
>
> [2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)
>
> [2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
> 08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*
>
> However for 1.12.0 checkpoint cycles stuck at initial checkpoint:
>
> *[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339584496 for job ce255b141393a358db734db2d27ef0ea.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*
>
> As far as I see, checkpoint cycle is stuck at waiting in
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator for
> coordinatorCheckpointsComplete although coordinatorsToCheckpoint is
> empty...
>
>
> final CompletableFuture coordinatorCheckpointsComplete =
> pendingCheckpointCompletableFuture
>
> .thenComposeAsync((pendingCheckpoint) ->
>
>
> OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
>
> coordinatorsToCheckpoint, pendingCheckpoint, timer),
> timer);
>
>
> Simply returning from
>
> OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
> when there is no coordinatorsToCheckpoint seems to resolve the problem:
>
> *[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
> 1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
> 1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)
>
> [2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)
>
> [2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
> ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
>
> I have also created an issue for this
>
> https://issues.apache.org/jira/browse/FLINK-21248
>
>
> Please help me if I am missing something or there is another solution
> without code change.
>
> We need to perform the upgrade and modify our jobs as soon as possible (I
> hope other breaking changes do not happen) so any help will be
> appreciated..
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink cli to upload flink jar and run flink jobs in the Jenkins pipeline

2021-02-03 Thread sidhant gupta
Is it possible to use the Flink CLI instead of the Flink client to connect
to ZooKeeper through the network load balancer and retrieve the leader
JobManager address?

On Wed, Feb 3, 2021, 12:42 PM Yang Wang  wrote:

> I think the Flink client could make a connection with ZooKeeper via the
> network load balancer.
> Flink client is not aware of whether it is a network balancer or multiple
> ZooKeeper server address.
> After then Flink client will retrieve the active leader JobManager address
> via ZooKeeperHAService
> and submit the job successfully via rest client.
>
> Best,
> Yang
>
>
> On Tue, Feb 2, 2021 at 11:14 PM sidhant gupta wrote:
>
>> Hi
>>
>> I have a flink ECS cluster setup with HA mode using zookeeper where I
>> have 2 jobmanagers out of which one of will be elected as leader using
>> zookeeper leader election. I have one application load balancer in front of
>> the jobmanagers and one network load balancer in front of zookeeper.
>>
>> As per [1],
>> we can provide zookeeper address in the flink cli arguments and it would
>> upload/ submit the jar to the leader jobmanager. But since I am using
>> network load balancer in front of zookeeper, I guess it is not able to make
>> connection with the zookeeper. Please provide suggestions or sample command
>> for uploading the flink job jar or run the job.
>>
>> Is  there any way by which we can distinguish between leader and standby
>> jobmanagers in terms of request or response ?
>>
>> Can we use flink cli in jenkins to upload the jar to the flink cluster
>> and run the jobs?
>>
>>
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Best-way-to-find-the-current-alive-jobmanager-with-HA-mode-zookeeper-td21787.html
>>
>> Thanks
>> Sidhant Gupta
>>
>


Re: flink kryo exception

2021-02-03 Thread Till Rohrmann
From these snippets it is hard to tell what's going wrong. Could you maybe
give us a minimal example with which to reproduce the problem?
Alternatively, have you read through Flink's serializer documentation [1]?
Have you tried to use a simple POJO instead of inheriting from a HashMap?

The stack trace looks as if the job fails deserializing some key of your
MapRecord map.
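
To illustrate the POJO suggestion, here is a minimal sketch of a record that holds its data in explicit fields instead of extending HashMap. The class `AccessLogRecord` and its fields are hypothetical, and keeping the variable part as a JSON string is an assumption made only for this example. In a real job the class would be a public top-level class in its own file; it is nested here only so that the sketch compiles as a single file.

```java
/** Wrapper so the sketch fits in one file; see AccessLogRecord below. */
final class PojoSketch {

    /**
     * Hypothetical record following the usual POJO shape:
     * public class, public no-argument constructor, and
     * private fields exposed through getters and setters.
     */
    public static class AccessLogRecord {
        private long timestamp;
        private String rawJson; // variable part of the event, kept as JSON text

        public AccessLogRecord() {
        }

        public long getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }

        public String getRawJson() {
            return rawJson;
        }

        public void setRawJson(String rawJson) {
            this.rawJson = rawJson;
        }
    }

    public static void main(String[] args) {
        AccessLogRecord record = new AccessLogRecord();
        record.setTimestamp(System.currentTimeMillis());
        record.setRawJson("{\"path\":\"/index\"}");
        System.out.println(record.getTimestamp() + " " + record.getRawJson());
    }
}
```

Whether this removes the Kryo fallback in a given job depends on every field type being one Flink can analyze, so it is worth checking the serializer that is actually chosen.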

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues

Cheers,
Till

On Wed, Feb 3, 2021 at 11:49 AM 赵一旦  wrote:

> Some facts that are possibly related, since another job does not run into
> these exceptions.
> The problem job uses a class which contains a field of class MapRecord, and
> MapRecord is defined to extend HashMap so as to accept variable JSON data.
>
> Class MapRecord:
>
> @NoArgsConstructor
> @Slf4j
> public class MapRecord extends HashMap implements 
> Serializable {
> @Override
> public void setTimestamp(Long timestamp) {
> put("timestamp", timestamp);
> put("server_time", timestamp);
> }
>
> @Override
> public Long getTimestamp() {
> try {
> Object ts = getOrDefault("timestamp", getOrDefault("server_time", 
> 0L));
> return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
> } catch (Exception e) {
> log.error("Error, MapRecord's timestamp invalid.", e);
> return 0L;
> }
> }
> }
>
> Class UserAccessLog:
>
> public class UserAccessLog extends AbstractRecord {
> private MapRecord d;  // I think this is related to the problem...
>
> ... ...
>
> }
>
>
> On Wed, Feb 3, 2021 at 6:43 PM 赵一旦 wrote:
>
>> Actually the exception is different every time I stop the job.
>> Such as:
>> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>> The stack as I given above.
>>
>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>> 2021-02-03 18:37:24
>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>> at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>> at java.util.ArrayList.get(ArrayList.java:433)
>> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(
>> MapReferenceResolver.java:42)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>> MapSerializer.java:135)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>> MapSerializer.java:21)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>> .deserialize(KryoSerializer.java:346)
>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>> .deserialize(PojoSerializer.java:411)
>> at org.apache.flink.streaming.runtime.streamrecord.
>> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>> at org.apache.flink.streaming.runtime.streamrecord.
>> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>> at org.apache.flink.runtime.plugable.
>> NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate
>> .java:55)
>> at org.apache.flink.runtime.io.network.api.serialization.
>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>> SpillingAdaptiveSpanningRecordDeserializer.java:92)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .emitNext(StreamTaskNetworkInput.java:145)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>> .processInput(StreamOneInputProcessor.java:67)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
>> .processInput(StreamTwoInputProcessor.java:92)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>> StreamTask.java:372)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxLoop(MailboxProcessor.java:186)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .runMailboxLoop(StreamTask.java:575)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:539)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> (3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered
>> class ID: 96
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>> ID: 96
>> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
>> DefaultClassResolver.java:119)
>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>> MapSerializer.java:135)
>> at 

DynamicTableSink's equivalent of UpsertStreamTableSink.setKeyFields

2021-02-03 Thread Yuval Itzchakov
Hi,
I'm reworking an existing UpsertStreamTableSink into the new
DynamicTableSink API. In the previous API, one would get the unique keys
for upsert queries via the `setKeyFields` method, which would calculate
them based on the grouping keys during the translation phase.

Searching around, I saw that JDBC (
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#key-handling)
relies on explicit key passing via the PRIMARY KEY constraint. However,
this would require additional manual insertion which I am trying to avoid.

What would be the proper way to receive the unique keys for upsert queries
with the new DynamicTableSink API?

-- 
Best Regards,
Yuval Itzchakov.


Checkpoint problem in 1.12.0

2021-02-03 Thread simpleusr
Hi

I am trying to upgrade from 1.5.5 to 1.12 and the checkpointing mechanism seems
to be broken in our kafka connector sourced datastream jobs.

Since there is a significant version gap and there are many backwards
incompatible / deprecated changes in flink runtime between versions, I had
to modify our jobs and noticed that checkpoint offsets are not committed to
kafka for source connectors.

To simplify the issues I created simple reproducer projects:

https://github.com/simpleusr/flink_problem_1.5.5

https://github.com/simpleusr/flink_problem_1.12.0

It seems that there are major changes in the checkpoint infrastructure.

For 1.5.5, checkpoint cycles work as expected, as can be seen from the logs
(please note that the sample project contains a small hack in
org.apache.flink.runtime.minicluster.MiniCluster which prevents the cluster
from stopping):

*[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)

[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)



[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)

[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*

However, for 1.12.0, checkpoint cycles are stuck at the initial checkpoint:

*[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
1612339584496 for job ce255b141393a358db734db2d27ef0ea.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*

As far as I see, checkpoint cycle is stuck at waiting in
org.apache.flink.runtime.checkpoint.CheckpointCoordinator for
coordinatorCheckpointsComplete although coordinatorsToCheckpoint is empty...

 
final CompletableFuture coordinatorCheckpointsComplete =
pendingCheckpointCompletableFuture
.thenComposeAsync((pendingCheckpoint) ->

OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(

coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);


Simply returning from
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
when there is no coordinatorsToCheckpoint seems to resolve the problem:

*[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)

[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)

[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)

[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
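
For readers following along, the early-return idea described above can be sketched with plain JDK futures. This is only an illustration under assumptions: EarlyReturnSketch and triggerAll are hypothetical names, not Flink's classes or its actual fix, and the collection of futures merely stands in for coordinatorsToCheckpoint.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in, NOT Flink's OperatorCoordinatorCheckpoints.
class EarlyReturnSketch {

    // Returns a future that completes once every coordinator acknowledgement completes.
    static CompletableFuture<Void> triggerAll(Collection<CompletableFuture<Void>> coordinatorAcks) {
        if (coordinatorAcks.isEmpty()) {
            // Nothing to wait for: hand back an already-completed future.
            // (The JDK's allOf() on an empty array is also already complete;
            // the explicit guard only makes the empty case visible.)
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.allOf(coordinatorAcks.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        // With no coordinators, the returned future is done immediately.
        System.out.println(triggerAll(Collections.emptyList()).isDone());
    }
}
```

Whether the real code path needs such a guard depends on how the surrounding futures and the timer executor are wired, which this sketch does not model.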

I have also created an issue for this

https://issues.apache.org/jira/browse/FLINK-21248


Please let me know if I am missing something, or if there is another solution
that does not require a code change.

We need to perform the upgrade and modify our jobs as soon as possible (I
hope there are no other breaking changes), so any help will be appreciated.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Does Flink support performing some action when StreamFileSink turns a pending file into a finished one?

2021-02-03 Thread Paul Lam
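If you use the RollOnCheckpoint file rolling policy, you can write a UDF that implements the CheckpointListener interface and send the message from the
notifyCheckpointComplete function. Note, however, that this message may be sent more than once.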
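
A minimal sketch of that idea, under stated assumptions: CheckpointListenerSketch is a local stand-in for Flink's CheckpointListener so the example compiles without Flink, and FinishedFilesNotifier, IdempotentConsumer, and the message format are hypothetical. Because the notification may arrive more than once, the message carries the checkpoint ID so that the consumer can drop repeats.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Local stand-in for Flink's CheckpointListener (assumption: same callback shape).
interface CheckpointListenerSketch {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// Hypothetical notifier: tags every message with the checkpoint ID.
class FinishedFilesNotifier implements CheckpointListenerSketch {
    // Stands in for a message-queue producer.
    final List<String> sent = new ArrayList<>();

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        sent.add("files-finished:" + checkpointId);
    }
}

// Hypothetical consumer: processes each distinct message only once.
class IdempotentConsumer {
    private final Set<String> seen = new HashSet<>();
    final List<String> processed = new ArrayList<>();

    void onMessage(String message) {
        if (seen.add(message)) { // add() returns false for a repeat
            processed.add(message);
        }
    }
}

class NotifierDemo {
    public static void main(String[] args) {
        FinishedFilesNotifier notifier = new FinishedFilesNotifier();
        notifier.notifyCheckpointComplete(7L);
        notifier.notifyCheckpointComplete(7L); // simulated duplicate notification
        notifier.notifyCheckpointComplete(8L);

        IdempotentConsumer consumer = new IdempotentConsumer();
        notifier.sent.forEach(consumer::onMessage);
        System.out.println(consumer.processed); // [files-finished:7, files-finished:8]
    }
}
```

The sketch does not guarantee that the sink has already published its files at the moment the callback fires; it only shows how duplicate messages can be made harmless.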

Best,
Paul Lam

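> On Feb 3, 2021, at 17:36, 上官 <17635713...@163.com> wrote:
> 
> Hi all, my job needs Flink to write the data in a DataStream to HDFS. When Flink makes a written file readable for downstream consumers, I need to send a message to a message queue. Does Flink support this kind of operation?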



Re: flink kryo exception

2021-02-03 Thread 赵一旦
Here are some facts that may be related, since another job does not hit
these exceptions.
The problematic job uses a class that contains a field of type MapRecord, and
MapRecord is defined to extend HashMap so that it can accept variable JSON data.

Class MapRecord:

@NoArgsConstructor
@Slf4j
public class MapRecord extends HashMap implements Serializable {
    @Override
    public void setTimestamp(Long timestamp) {
        put("timestamp", timestamp);
        put("server_time", timestamp);
    }

    @Override
    public Long getTimestamp() {
        try {
            Object ts = getOrDefault("timestamp",
                    getOrDefault("server_time", 0L));
            return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
        } catch (Exception e) {
            log.error("Error, MapRecord's timestamp invalid.", e);
            return 0L;
        }
    }
}

Class UserAccessLog:

public class UserAccessLog extends AbstractRecord {
    private MapRecord d;  // I think this is related to the problem...

    ... ...

}


赵一旦 wrote on Wed, Feb 3, 2021 at 6:43 PM:

> Actually the exception is different every time I stop the job.
> Such as:
> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
> The stack trace is the one I gave above.
>
> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
> 2021-02-03 18:37:24
> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
> at java.util.ArrayList.rangeCheck(ArrayList.java:657)
> at java.util.ArrayList.get(ArrayList.java:433)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(
> MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:135)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .deserialize(KryoSerializer.java:346)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:411)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
> .read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:92)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:145)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
> .processInput(StreamTwoInputProcessor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:372)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:186)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:575)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:539)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
>
> (3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered
> class ID: 96
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
> 96
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> DefaultClassResolver.java:119)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:135)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .deserialize(KryoSerializer.java:346)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:411)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
> 

Re: Max with retract aggregate function does not support type: ''CHAR''.

2021-02-03 Thread Yuval Itzchakov
I can understand that in some sense it's nonsensical to MAX on a CHAR,
since Blink will only determine a CHAR when there's a constant in the SQL,
but I was surprised that it didn't work with just an identity
implementation.

On Wed, Feb 3, 2021 at 12:33 PM Till Rohrmann  wrote:

> Thanks for reaching out to the Flink community Yuval. I am pulling in Timo
> and Jark who might be able to answer this question. From what I can tell,
> it looks a bit like an oversight because VARCHAR is also supported.
>
> Cheers,
> Till
>
> On Tue, Feb 2, 2021 at 6:12 PM Yuval Itzchakov  wrote:
>
>> Hi,
>> I'm trying to use MAX on a field that is statically known from another
>> table (let's ignore why for a moment). While running the SQL query, I
>> receive an error:
>>
>> Max with retract aggregate function does not support type: ''CHAR''.
>>
>> Looking at the code for creating the max function:
>>
>> [image: image.png]
>>
>> It does seem like all primitives are supported. Is there a particular
>> reason why a CHAR would not be supported? Is this an oversight?
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>

-- 
Best Regards,
Yuval Itzchakov.


Re: flink kryo exception

2021-02-03 Thread 赵一旦
Actually the exception is different every time I stop the job.
Such as:
(1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
The stack trace is the one I gave above.

(2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
2021-02-03 18:37:24
java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(
MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:411)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

(3)  com.esotericsoftware.kryo.KryoException: Encountered unregistered class
ID: 96
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
96
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(
MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:411)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

...

Till Rohrmann wrote on Wed, Feb 3, 2021 at 6:28 PM:

> Hi,
>
> could you show 

Re: AbstractMethodError while writing to parquet

2021-02-03 Thread Till Rohrmann
Hi Jan,

it looks to me that you might have different parquet-avro dependencies on
your class path. Could you make sure that you don't have different versions
of the library on your classpath?
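
One way to check this from inside the JVM is to ask the class loader for every location that provides a given class file. The following is a sketch only: ClasspathCheck and locationsOf are hypothetical names, and the default class name is simply taken from the stack trace in this thread. It uses only JDK calls.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper for spotting the same class in more than one jar.
class ClasspathCheck {

    // Returns every location visible to the loader that provides the given class file.
    static List<URL> locationsOf(String className, ClassLoader loader) {
        String resource = className.replace('.', '/') + ".class";
        List<URL> found = new ArrayList<>();
        try {
            Enumeration<URL> urls = loader.getResources(resource);
            while (urls.hasMoreElements()) {
                found.add(urls.nextElement());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return found;
    }

    public static void main(String[] args) {
        // The class name is an assumption taken from the stack trace; pass another as an argument.
        String name = args.length > 0 ? args[0] : "org.apache.parquet.hadoop.ParquetWriter";
        for (URL url : locationsOf(name, ClasspathCheck.class.getClassLoader())) {
            System.out.println(url);
        }
    }
}
```

If more than one URL is printed for the same class, two jars ship it, which is the kind of version clash that can surface as an AbstractMethodError. Note that in a running Flink job the user-code class loader may see a different set of jars than a standalone run of this check.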

Cheers,
Till

On Tue, Feb 2, 2021 at 3:29 PM Jan Oelschlegel <
oelschle...@integration-factory.de> wrote:

> Hi all,
>
>
>
> I’m using Flink 1.11 with the DataStream API. I would like to write my
> results in Parquet format into HDFS.
>
>
>
> Therefore I generated an Avro SpecificRecord with the avro-maven-plugin:
>
>
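> <plugin>
>   <groupId>org.apache.avro</groupId>
>   <artifactId>avro-maven-plugin</artifactId>
>   <version>1.8.2</version>
>   <executions>
>     <execution>
>       <phase>generate-sources</phase>
>       <goals>
>         <goal>schema</goal>
>       </goals>
>       <configuration>
>         <sourceDirectory>src/main/resources/avro/</sourceDirectory>
>         <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
>         <stringType>String</stringType>
>       </configuration>
>     </execution>
>   </executions>
> </plugin>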
>
>
>
>
>
> Then I use the SpecificRecord in the StreamingFileSink:
>
>
> val sink: StreamingFileSink[SpecificRecord] = StreamingFileSink
>   .forBulkFormat(
>     new Path("hdfs://example.com:8020/data/"),
>     ParquetAvroWriters.forSpecificRecord(classOf[SpecificRecord])
>   )
>   .build()
>
>
>
>
>
> The job cancels with the following error:
>
>
>
>
>
> java.lang.AbstractMethodError: org.apache.parquet.hadoop.
> ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg
> /apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/
> Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/
> Encoding;Lorg/apache/parquet/column/Encoding;)V
>
> at org.apache.parquet.column.impl.ColumnWriterV1.writePage(
> ColumnWriterV1.java:53)
>
> at org.apache.parquet.column.impl.ColumnWriterBase.writePage(
> ColumnWriterBase.java:315)
>
> at org.apache.parquet.column.impl.ColumnWriteStoreBase.sizeCheck(
> ColumnWriteStoreBase.java:200)
>
> at org.apache.parquet.column.impl.ColumnWriteStoreBase.endRecord(
> ColumnWriteStoreBase.java:187)
>
> at org.apache.parquet.column.impl.ColumnWriteStoreV1.endRecord(
> ColumnWriteStoreV1.java:27)
>
> at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer
> .endMessage(MessageColumnIO.java:307)
>
> at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport
> .java:166)
>
> at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(
> InternalParquetRecordWriter.java:128)
>
> at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:
> 299)
>
> at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(
> ParquetBulkWriter.java:52)
>
> at org.apache.flink.streaming.api.functions.sink.filesystem.
> BulkPartWriter.write(BulkPartWriter.java:48)
>
> at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket
> .write(Bucket.java:202)
>
> at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets
> .onElement(Buckets.java:282)
>
> at org.apache.flink.streaming.api.functions.sink.filesystem.
> StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>
> at org.apache.flink.streaming.api.functions.sink.filesystem.
> StreamingFileSink.invoke(StreamingFileSink.java:420)
>
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(
> StreamSink.java:56)
>
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(
> CountingOutput.java:52)
>
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(
> CountingOutput.java:30)
>
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(
> StreamMap.java:41)
>
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(
> CountingOutput.java:52)
>
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(
> CountingOutput.java:30)
>
> at org.apache.flink.streaming.api.operators.TimestampedCollector.
> collect(TimestampedCollector.java:53)
>
> at de.integration_factory.datastream.aggregations.CasesProcessFunction
> .process(CasesProcessFunction.scala:34)
>
> at 

Re: Max with retract aggregate function does not support type: ''CHAR''.

2021-02-03 Thread Till Rohrmann
Thanks for reaching out to the Flink community Yuval. I am pulling in Timo
and Jark who might be able to answer this question. From what I can tell,
it looks a bit like an oversight because VARCHAR is also supported.

Cheers,
Till

On Tue, Feb 2, 2021 at 6:12 PM Yuval Itzchakov  wrote:

> Hi,
> I'm trying to use MAX on a field that is statically known from another
> table (let's ignore why for a moment). While running the SQL query, I
> receive an error:
>
> Max with retract aggregate function does not support type: ''CHAR''.
>
> Looking at the code for creating the max function:
>
> [image: image.png]
>
> It does seem like all primitives are supported. Is there a particular
> reason why a CHAR would not be supported? Is this an oversight?
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: flink kryo exception

2021-02-03 Thread Till Rohrmann
Hi,

could you show us the job you are trying to resume? Is it a SQL job or a
DataStream job, for example?

From the stack trace, it looks as if the class g^XT is not on the class
path.

Cheers,
Till

On Wed, Feb 3, 2021 at 10:30 AM 赵一旦  wrote:

> I have a job whose checkpoints and savepoints work fine.
> However, if I stop the job using 'stop -p', the job fails after the
> savepoint has been generated. Here is the log:
>
> 2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task
>  [] - ual_ft_uid_subid_SidIncludeFilter ->
> ual_ft_uid_subid_Default
> PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor ->
> ual_ft_uid_subid_EmptyUidFilter (17/30)#0
> (46abce5d1148b56094726d442df2fd9c) switched
> from RUNNING to FAILED.
>
> com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> [flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> [flink-dist_2.11-1.12.0.jar:1.12.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> Caused by: java.lang.ClassNotFoundException: g^XT
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> ~[?:1.8.0_251]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> ~[?:1.8.0_251]
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ~[?:1.8.0_251]
> at
> 

Re: HELP!!!! Problem when loading a Keras model in a PyFlink UDF and registering the function

2021-02-03 Thread 陈康
 
https://blog.csdn.net/weixin_44904816/article/details/108744530
I came across a blog post that says: "In the future, PyFlink will also be able to support Tensorflow and Keras." Oh well..





Does Flink support performing some action when StreamFileSink turns a pending file into a finished one?

2021-02-03 Thread 上官
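Hi all, my job needs Flink to write the data in a DataStream to HDFS. When Flink makes a written file readable for downstream consumers, I need to send a message to a message queue. Does Flink support this kind of operation?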

flink kryo exception

2021-02-03 Thread 赵一旦
I have a job whose checkpoints and savepoints work fine.
However, if I stop the job using 'stop -p', the job fails after the savepoint
has been generated. Here is the log:

2021-02-03 16:53:55,179 WARN  org.apache.flink.runtime.taskmanager.Task
   [] - ual_ft_uid_subid_SidIncludeFilter ->
ual_ft_uid_subid_Default
PassThroughFilter[null, null) -> ual_ft_uid_subid_UalUidFtExtractor ->
ual_ft_uid_subid_EmptyUidFilter (17/30)#0
(46abce5d1148b56094726d442df2fd9c) switched
from RUNNING to FAILED.

com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:92)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException: g^XT
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
~[?:1.8.0_251]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
~[?:1.8.0_251]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
~[?:1.8.0_251]
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_251]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_251]
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 22 more


HELP!!!! Problem when loading a Keras model in a PyFlink UDF and registering the function

2021-02-03 Thread 陈康
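When I load a Keras model in a custom PyFlink UDF and register the UDF, I get the error: TypeError: can't pickle _thread.lock
objects. Has anyone run into this? Thanks! (I am not sure whether the inserted image is visible.)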

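class myKerasMLP(ScalarFunction):
    def __init__(self):
        ...

    def open(self, function_context):
        ...

    def eval(self, x, y):
        ...

    def load_model(self):
        """
        Load the model. If a model exists in Redis, load it from Redis first; otherwise initialize a new model.
        :return:
        """
        import redis
        import pickle
        import logging

        logging.info('Loading model!')
        r = redis.StrictRedis(**self.redis_params)
        model = None

        try:
            # Load the model JSON from Redis
            model = model_from_json(r.get(self.model_name))
            # Load the model weights from Redis
            weights = pickle.loads(r.get(self.weights))
            # Set the weights
            model.set_weights(weights)
            model.summary()
        except TypeError:
            logging.info('No model with the given name in Redis, so initializing a new model')
        except (redis.exceptions.RedisError, TypeError, Exception):
            logging.warning('Redis raised an exception, so initializing a new model')
        finally:
            print("MLP model", model)
            return model

myKerasMLP = udf(myKerasMLP(), input_types=[DataTypes.FLOAT(),
                                            DataTypes.FLOAT()],
                 result_type=DataTypes.FLOAT())
print('UDF model loaded!')
t_env.create_temporary_system_function('train_and_predict', myKerasMLP)
print('UDF registered successfully')
---
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
dense_1 (Dense)              (None, 8)                 72
_________________________________________________________________
dense_2 (Dense)              (None, 10)                90
_________________________________________________________________
dense_3 (Dense)              (None, 1)                 11
=================================================================
Total params: 173
Trainable params: 173
Non-trainable params: 0
_________________________________________________________________
MLP model
UDF model loaded!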


 
 






Re: Question

2021-02-03 Thread Chesnay Schepler

Sure.

https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint

https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper
https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper

On 2/3/2021 3:08 AM, Abu Bakar Siddiqur Rahman Rocky wrote:

Hi,

Is there any source code for the checkpoints, snapshot and zookeeper 
mechanism?


Thank you

On Mon, Feb 1, 2021 at 4:23 AM Chesnay Schepler wrote:


Could you expand a bit on what you mean? Are you referring to
/savepoints/?

On 1/28/2021 3:24 PM, Abu Bakar Siddiqur Rahman Rocky wrote:

Hi,

Is there any library to use and remember the apache flink snapshot?

Thank you

--
Regards,
Abu Bakar Siddiqur Rahman






--
Regards,
Abu Bakar Siddiqur Rahman
Graduate Research Student
Natural Language Processing Laboratory
Centro de Investigacion en Computacion
Instituto Politecnico Nacional, Mexico City





Re: Memory usage increases on every job restart resulting in eventual OOMKill

2021-02-03 Thread Yun Tang
Hi Randal,

Please consider using jemalloc instead of glibc's malloc as the default memory allocator
[1] to avoid memory fragmentation. As far as I know, at least two groups of
users, running Flink on YARN and on k8s respectively, have reported a similar
problem in which memory keeps growing after each restart [2]. In both cases the
problem went away once they switched to jemalloc.

[1] https://issues.apache.org/jira/browse/FLINK-19125
[2] https://issues.apache.org/jira/browse/FLINK-18712

Best
Yun Tang

From: Lasse Nedergaard 
Sent: Wednesday, February 3, 2021 14:07
To: Xintong Song 
Cc: user 
Subject: Re: Memory usage increases on every job restart resulting in eventual 
OOMKill

Hi

We had something similar, and our problem was class loader leaks. We used a
summary log component to reduce logging, but it turned out that it used a
static object that wasn’t released when we got an OOM or a restart. Flink was
reusing task managers, so until we fixed the underlying problem the only
workaround was to stop the job, wait until the task managers were removed, and
start the job again.

Med venlig hilsen / Best regards
Lasse Nedergaard


On Feb 3, 2021, at 02:54, Xintong Song wrote:


How is the memory measured?
I mean, which Flink or k8s metric is collected? I'm asking because, depending on
which metric is used, the *container memory usage* can be defined differently,
e.g., whether mmap memory is included.

Also, could you share the effective memory configuration of the taskmanagers?
You should find something like the following at the beginning of the taskmanager
logs.

INFO  [] - Final TaskExecutor Memory configuration:
INFO  [] -   Total Process Memory:          1.688gb (1811939328 bytes)
INFO  [] -     Total Flink Memory:          1.250gb (1342177280 bytes)
INFO  [] -       Total JVM Heap Memory:     512.000mb (536870902 bytes)
INFO  [] -         Framework:               128.000mb (134217728 bytes)
INFO  [] -         Task:                    384.000mb (402653174 bytes)
INFO  [] -       Total Off-heap Memory:     768.000mb (805306378 bytes)
INFO  [] -         Managed:                 512.000mb (536870920 bytes)
INFO  [] -         Total JVM Direct Memory: 256.000mb (268435458 bytes)
INFO  [] -           Framework:             128.000mb (134217728 bytes)
INFO  [] -           Task:                  0 bytes
INFO  [] -           Network:               128.000mb (134217730 bytes)
INFO  [] -     JVM Metaspace:               256.000mb (268435456 bytes)
INFO  [] -     JVM Overhead:                192.000mb (201326592 bytes)
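
As a worked check of how the figures in this example listing nest, the sketch below adds up the leaf values and reproduces the totals. The class and method names are hypothetical; the byte values are copied from the listing above, and the grouping (heap plus off-heap gives Total Flink Memory, which plus Metaspace and Overhead gives Total Process Memory) is inferred from the fact that the sums match.

```java
// Hypothetical helper; byte values are copied from the log excerpt above.
class MemoryBreakdown {
    static final long FRAMEWORK_HEAP = 134217728L;
    static final long TASK_HEAP = 402653174L;
    static final long MANAGED = 536870920L;
    static final long FRAMEWORK_OFF_HEAP = 134217728L;
    static final long TASK_OFF_HEAP = 0L;
    static final long NETWORK = 134217730L;
    static final long METASPACE = 268435456L;
    static final long OVERHEAD = 201326592L;

    static long heap() { return FRAMEWORK_HEAP + TASK_HEAP; }
    static long direct() { return FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP + NETWORK; }
    static long offHeap() { return MANAGED + direct(); }
    static long totalFlink() { return heap() + offHeap(); }
    static long totalProcess() { return totalFlink() + METASPACE + OVERHEAD; }

    public static void main(String[] args) {
        System.out.println("Total JVM Heap Memory: " + heap());         // 536870902
        System.out.println("Total Off-heap Memory: " + offHeap());      // 805306378
        System.out.println("Total Flink Memory:    " + totalFlink());   // 1342177280
        System.out.println("Total Process Memory:  " + totalProcess()); // 1811939328
    }
}
```

Comparing a container-level metric against the Total Process Memory figure of the actual deployment shows whether the observed growth exceeds what the configuration budgets for.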


Thank you~

Xintong Song


On Tue, Feb 2, 2021 at 8:59 PM Randal Pitt <randal.p...@foresite.com> wrote:
Hi Xintong Song,

Correct, we are using standalone k8s. Task managers are deployed as a
statefulset so have consistent pod names. We tried using native k8s (in fact
I'd prefer to) but got persistent
"io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 242214695 (242413759)" errors which resulted in jobs being
restarted every 30-60 minutes.

We are using Prometheus Node Exporter to capture memory usage. The graph
shows the metric:

sum(container_memory_usage_bytes{container_name="taskmanager",pod_name=~"$flink_task_manager"})
by (pod_name)

I've  attached the original

so Nabble doesn't shrink it.

Best regards,

Randal.




