Re: question on ValueState

2021-02-08 Thread yidan zhao
I have a related question.
Since FsStateBackend keeps working state on the heap and only the checkpoint is
written to the filesystem, does JobManager/TaskManager memory limit the state size?
Is the state size limited by TM memory * number of TMs, or by JM memory?


Khachatryan Roman wrote on Mon, Feb 8, 2021 at 6:05 PM:

> Hi,
>
> I think Yun Tang is right, HeapStateBackend doesn't (de)serialize the
> value on update.
> As for "value()", it may (de)serialize it and return a copy if there is an
> ongoing async snapshot in progress (to protect from modifications). This
> shouldn't happen often though.
>
> Regards,
> Roman
>
>
> On Mon, Feb 8, 2021 at 3:24 AM Yun Tang  wrote:
>
>> Hi,
>>
>> MemoryStateBackend and FsStateBackend both hold keyed state in
>> HeapKeyedStateBackend [1], and the main structure to store data is
>> StateTable [2] which holds POJO format objects. That is to say, the object
>> would not be serialized when calling update().
>> On the other hand, RocksDB statebackend would store value with serialized
>> bytes.
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
>> [2]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>>
>> Best
>> Yun Tang
>>
>> --
>> *From:* Colletta, Edward 
>> *Sent:* Sunday, February 7, 2021 19:53
>> *To:* user@flink.apache.org 
>> *Subject:* question on ValueState
>>
>>
>> Using FsStateBackend.
>>
>>
>>
>> I was under the impression that ValueState.value will serialize an object
>> which is stored in the local state backend, copy the serialized object and
>> deserializes it.  Likewise update() would do the same steps copying the
>> object back to local state backend.And as a consequence, storing
>> collections in ValueState is much less efficient than using ListState or
>> MapState if possible.
>>
>>
>>
>> However, I am looking at some code I wrote a while ago which made the
>> assumption that the value() method just returned a reference to the
>> object.  The code only calls update() when creating the object if value()
>> returns null.Yet the code works, all changes to the object stored in
>> state are visible the next time value() is called.   I have some sample
>> code below.
>>
>>
>>
>> Can someone clarify what really happens when value() is called?
>>
>>
>>
>>
>>
>>    public void processElement(M in, Context ctx, Collector out) throws Exception {
>>        MyWindow myWindow;
>>        myWindow = windowState.value();
>>        if (myWindow == null) {
>>            ctx.timerService().registerProcessingTimeTimer(
>>                ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>            myWindow = new MyWindow(0L, slide, windowSize);
>>            windowState.update(myWindow);
>>            myWindow.eq.add(0L);
>>        }
>>        myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + in.value);
>>    }
>>
>>    @Override
>>    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
>>        ctx.timerService().registerProcessingTimeTimer(
>>            ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>        MyWindow myWindow = windowState.value();
>>        myWindow.slide(0L);
>>        out.collect(myWindow.globalAccum);
>>    }
>>
>>
>>
>>
>>
>


Re: question on ValueState

2021-02-08 Thread yidan zhao
What I'm interested in is whether I should replace the FsStateBackend with RocksDB.
RocksDB's performance is worse, but it can handle very large state.
Currently my job's state is about 10 GB, and I run 10 TaskManagers on different
machines with 100 GB of memory each. I don't think I should use RocksDB; is that right?
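For context (not from the thread; a minimal sketch with placeholder paths, using the Flink 1.12-era
DataStream API that appears elsewhere in this digest, and requiring the flink-statebackend-rocksdb
dependency for the RocksDB variant), this is how the two backends are selected:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StateBackendChoice {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // FsStateBackend: working state lives on the TaskManager heap;
            // only checkpoints are written to the (distributed) file system.
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

            // RocksDBStateBackend: working state lives off-heap / on local disk in RocksDB,
            // so it can exceed the TM heap, at the cost of (de)serialization on every access.
            // env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        }
    }

With roughly 1 GB of state per TM and 100 GB of memory per machine, keeping the heap-based backend
looks feasible, as long as heap sizing and GC behave.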

yidan zhao wrote on Tue, Feb 9, 2021 at 3:50 PM:

> I have a related question.
> Since fileStateBackend uses heap as the state storage and the checkpoint
> is finally stored in the filesystem, so whether the JobManager/TaskManager
> memory will limit the state size? The state size is limited by TM's memory
> * number of TMs? or limited by JM's memory.
>
>
> Khachatryan Roman wrote on Mon, Feb 8, 2021 at 6:05 PM:
>
>> Hi,
>>
>> I think Yun Tang is right, HeapStateBackend doesn't (de)serialize the
>> value on update.
>> As for "value()", it may (de)serialize it and return a copy if there is
>> an ongoing async snapshot in progress (to protect from modifications). This
>> shouldn't happen often though.
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Feb 8, 2021 at 3:24 AM Yun Tang  wrote:
>>
>>> Hi,
>>>
>>> MemoryStateBackend and FsStateBackend both hold keyed state in
>>> HeapKeyedStateBackend [1], and the main structure to store data is
>>> StateTable [2] which holds POJO format objects. That is to say, the object
>>> would not be serialized when calling update().
>>> On the other hand, RocksDB statebackend would store value with
>>> serialized bytes.
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
>>> [2]
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>>>
>>> Best
>>> Yun Tang
>>>
>>> --
>>> *From:* Colletta, Edward 
>>> *Sent:* Sunday, February 7, 2021 19:53
>>> *To:* user@flink.apache.org 
>>> *Subject:* question on ValueState
>>>
>>>
>>> Using FsStateBackend.
>>>
>>>
>>>
>>> I was under the impression that ValueState.value will serialize an
>>> object which is stored in the local state backend, copy the serialized
>>> object and deserializes it.  Likewise update() would do the same steps
>>> copying the object back to local state backend.And as a consequence,
>>> storing collections in ValueState is much less efficient than using
>>> ListState or MapState if possible.
>>>
>>>
>>>
>>> However, I am looking at some code I wrote a while ago which made the
>>> assumption that the value() method just returned a reference to the
>>> object.  The code only calls update() when creating the object if value()
>>> returns null.Yet the code works, all changes to the object stored in
>>> state are visible the next time value() is called.   I have some sample
>>> code below.
>>>
>>>
>>>
>>> Can someone clarify what really happens when value() is called?
>>>
>>>
>>>
>>>
>>>
>>>    public void processElement(M in, Context ctx, Collector out) throws Exception {
>>>        MyWindow myWindow;
>>>        myWindow = windowState.value();
>>>        if (myWindow == null) {
>>>            ctx.timerService().registerProcessingTimeTimer(
>>>                ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>>            myWindow = new MyWindow(0L, slide, windowSize);
>>>            windowState.update(myWindow);
>>>            myWindow.eq.add(0L);
>>>        }
>>>        myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + in.value);
>>>    }
>>>
>>>    @Override
>>>    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
>>>        ctx.timerService().registerProcessingTimeTimer(
>>>            ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>>        MyWindow myWindow = windowState.value();
>>>        myWindow.slide(0L);
>>>        out.collect(myWindow.globalAccum);
>>>    }
>>>
>>>
>>>
>>>
>>>
>>


Re: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Arvid Heise
Hi Jan,

Another solution is to insert Heartbeat-events at the source for each
sensor. The solution is very similar to how to advance watermarks when
there are no elements in the respective source partition.

However, it's only easy to implement if you control the source and know all sensors
at application start. It might also be possible to implement with the new Source
interface.
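A hedged sketch of that idea (mine, not Arvid's; SensorReading and its heartbeat(...) factory, as
well as the fixed sensor-ID list, are assumptions). The heartbeat stream would be unioned with the
real stream before the keyBy, and the zero-valued heartbeats ignored or subtracted downstream:

    import java.util.List;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class HeartbeatSource implements SourceFunction<SensorReading> {
        private final List<String> knownSensorIds;  // must be known at application start
        private final long intervalMs;
        private volatile boolean running = true;

        public HeartbeatSource(List<String> knownSensorIds, long intervalMs) {
            this.knownSensorIds = knownSensorIds;
            this.intervalMs = intervalMs;
        }

        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    for (String sensorId : knownSensorIds) {
                        // One zero-valued heartbeat per sensor and interval, so every key
                        // keeps receiving elements even while the sensor is silent.
                        ctx.collect(SensorReading.heartbeat(sensorId));
                    }
                }
                Thread.sleep(intervalMs);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }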

On Tue, Feb 9, 2021 at 7:20 AM Yun Gao  wrote:

>
> Hi,
>
> I also think there should be different ways to achieve the target. For the
> first option listed previously,
> the pseudo-code roughly like
>
> class MyFunciton extends KeyedProcessFunction {
> ValueState count;
>
> void open() {
>count = ... // Create the value state
>}
>
> ​void processElement(T t, Context context, Collector collector) {
> ​Integer current = count.get();
> if (current == null) {
>   context.timeService().registerTimer(30); // Register
> timer for the first time
>   current = 0;
> }
>
> count.update(current + 1); // update the count
> }
>
> void onTimer(...) {
>  collector.collect(new Tuple2<>(getCurrentKey(), count.get());
>   context.timeService().registerTimer(30);  // register the
> following timer
> }
> }
>
> 1. For flink the state and timer are all bound to a key implicitly, thus I
> think they should
> not need to be bound manually.
> 2. To clear the outdated state, it could be cleared via count.clear(); if
> it has been 0
> for a long time. There are different ways to count the interval, like
> register another timer
> and clear the timer when received the elements or update the counter to
> -1, -2... to mark
> how much timer it has passed.
>
>
> Best,
>  Yun
>
>
>
>
> --Original Mail --
> *Sender:*Khachatryan Roman 
> *Send Date:*Tue Feb 9 02:35:20 2021
> *Recipients:*Jan Brusch 
> *CC:*Yun Gao , user 
> *Subject:*Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem
>
>> Hi,
>>
>> Probably another solution would be to register a timer
>> (using KeyedProcessFunction) once we see an element after keyBy. The timer
>> will fire in windowIntervalMs. Upon firing, it will emit a dummy element
>> which will be ignored (or subtracted) in the end.
>> Upon receiving each new element, the function will shift the timer
>> accordingly.
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch 
>> wrote:
>>
>>> Hi Yun,
>>>
>>> thanks for your reply.
>>>
>>> I do agree with your point about standard windows being for high level
>>> operations and the lower-level apis offering a rich toolset for most
>>> advanced use cases.
>>>
>>> I have tried to solve my problem with keyedProcessFunctions also but was
>>> not able to get it to work for two reasons:
>>>
>>> 1) I was not able to set up a combination of ValueState, Timers and
>>> Triggers that emulated a sliding window with a rising and falling count
>>> (including 0) good enough.
>>>
>>> 2) Memory Leak: States / Windows should be cleared after a certain time
>>> of being at count 0 in order to prevent an infinitely rising of ValueStates
>>> (that are not needed anymore)
>>>
>>>
>>> Can you maybe please elaborate in pseudocode how you would envision your
>>> solution?
>>>
>>>
>>> Best regards
>>>
>>> Jan
>>> On 08.02.21 05:31, Yun Gao wrote:
>>>
>>> Hi Jan,
>>>
>>> From my view, I think in Flink Window should be as a "high-level"
>>> operation for some kind
>>> of aggregation operation and if it could not satisfy the requirements,
>>> we could at least turn to
>>> using the "low-level" api by using KeyedProcessFunction[1].
>>>
>>> In this case, we could use a ValueState to store the current value for
>>> each key, and increment
>>> the value on each element. Then we could also register time for each key
>>> on receiving the first
>>> element for this key,  and in the onTimer callback, we could send the
>>> current state value, update
>>> the value to 0 and register another timer for this key after 30s.
>>>
>>> Best,
>>>  Yun
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>>
>>> --Original Mail --
>>> *Sender:*Jan Brusch 
>>> 
>>> *Send Date:*Sat Feb 6 23:44:00 2021
>>> *Recipients:*user  
>>> *Subject:*Sliding Window Count: Tricky Edge Case / Count Zero Problem
>>>
 Hi,
 I was recently working on a problem where we wanted to implement a
 simple count on a sliding window, e.g. "how many messages of a certain
 type were emitted by a certain type of sensor in the last n minutes".
 Which sounds simple enough in theory:

 messageStream
  .keyBy(//EmitterType + MessageType)
  .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),
 Time.seconds(30)))
  .map(_ => 1)
  .reduce((x,y) => x + y)
  

Re: Question about using timers on a non-keyed stream

2021-02-08 Thread yidan zhao
Of course, with something like randomKey % 30 the end result is roughly the same, but the 30 sink
batches may then all end up concentrated on just a few parallel instances.

yidan zhao wrote on Tue, Feb 9, 2021 at 3:22 PM:

> The biggest problem with introducing a random key is that I want the sink to write in batches,
> and with a fully random key no batching is possible at all.
> Bucketing with randomKey % 1024 doesn't work either: each bucket gets too little data, so almost
> every flush is triggered by the timeout instead of by reaching batchSize. In other words, every
> flush results in 1024 concurrent sink calls. The backing store probably doesn't want that much
> concurrency; the whole point of batching is to reduce the number of sink calls.
>
> What I want is to sink about 30 times per flush for a parallelism of 30 (or more than 30 times,
> since a batch may exceed batchSize; if all
> yidan zhao wrote on Tue, Feb 9, 2021 at 3:04 PM:
>
>> As the subject says, Flink currently doesn't support timers on non-keyed streams. Is there any
>> workaround?
>>
>> I'm implementing a sink with a timeout and would like to use the TimerService, but it isn't
>> supported there. At the same time I don't want to use a keyed stream, because that would skew
>> the data distribution.
>>
>> Is there any approach other than introducing a random key?
>>
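One workaround (a sketch of my own, not from this thread): since timers are not available on
non-keyed streams, keep the batch inside a RichSinkFunction and flush either when batchSize is
reached or from a scheduled task when the timeout expires. That gives one batch per parallel sink
instance, with no keyBy. The record type and the doFlush() target are placeholders, and note that
a buffer like this is not covered by checkpoints, so buffered records can be lost on failure:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class BatchingSink<T> extends RichSinkFunction<T> {
        private final int batchSize;
        private final long flushIntervalMs;
        private transient List<T> buffer;
        private transient ScheduledExecutorService scheduler;

        public BatchingSink(int batchSize, long flushIntervalMs) {
            this.batchSize = batchSize;
            this.flushIntervalMs = flushIntervalMs;
        }

        @Override
        public void open(Configuration parameters) {
            buffer = new ArrayList<>();
            scheduler = Executors.newSingleThreadScheduledExecutor();
            // Time-based flush, independent of how many records arrived.
            scheduler.scheduleAtFixedRate(this::flush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
        }

        @Override
        public void invoke(T value, Context context) {
            synchronized (this) {
                buffer.add(value);
                if (buffer.size() >= batchSize) {
                    flush();
                }
            }
        }

        private synchronized void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            doFlush(new ArrayList<>(buffer)); // placeholder for the actual batched write
            buffer.clear();
        }

        private void doFlush(List<T> batch) {
            // write 'batch' to the external store in a single request
        }

        @Override
        public void close() {
            if (scheduler != null) {
                scheduler.shutdown();
            }
            flush();
        }
    }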
>


Re: Question about using timers on a non-keyed stream

2021-02-08 Thread yidan zhao
The biggest problem with introducing a random key is that I want the sink to write in batches, and
with a fully random key no batching is possible at all.
Bucketing with randomKey % 1024 doesn't work either: each bucket gets too little data, so almost
every flush is triggered by the timeout instead of by reaching batchSize. In other words, every
flush results in 1024 concurrent sink calls. The backing store probably doesn't want that much
concurrency; the whole point of batching is to reduce the number of sink calls.
What I want is to sink about 30 times per flush for a parallelism of 30 (or more than 30 times,
since a batch may exceed batchSize; if all

yidan zhao wrote on Tue, Feb 9, 2021 at 3:04 PM:

> As the subject says, Flink currently doesn't support timers on non-keyed streams. Is there any
> workaround?
>
> I'm implementing a sink with a timeout and would like to use the TimerService, but it isn't
> supported there. At the same time I don't want to use a keyed stream, because that would skew
> the data distribution.
>
> Is there any approach other than introducing a random key?
>


Question about using timers on a non-keyed stream

2021-02-08 Thread yidan zhao
As the subject says, Flink currently doesn't support timers on non-keyed streams. Is there any
workaround?

I'm implementing a sink with a timeout and would like to use the TimerService, but it isn't
supported there. At the same time I don't want to use a keyed stream, because that would skew
the data distribution.

Is there any approach other than introducing a random key?


Re: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Yun Gao

Hi,

I also think there are different ways to achieve this. For the first option listed
previously, the pseudo-code looks roughly like:

class MyFunction extends KeyedProcessFunction<String, T, Tuple2<String, Integer>> {
    private ValueState<Integer> count;

    public void open(Configuration parameters) {
        // Create the value state
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
    }

    public void processElement(T t, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer current = count.value();
        if (current == null) {
            // Register the timer for the first time (e.g. 30 seconds from now)
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 30_000L);
            current = 0;
        }
        count.update(current + 1); // update the count
    }

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        out.collect(new Tuple2<>(ctx.getCurrentKey(), count.value()));
        // Register the following timer
        ctx.timerService().registerProcessingTimeTimer(timestamp + 30_000L);
    }
}

1. In Flink, state and timers are implicitly scoped to the current key, so they do not
need to be bound to the key manually.
2. To clear outdated state, it can be removed via count.clear() once it has been 0 for a
long time. There are different ways to measure that interval, e.g. register another timer
and cancel it when an element arrives, or decrement the counter to -1, -2, ... to mark
how many timer firings have passed.
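As a hedged illustration of point 2 (my sketch, not part of Yun's reply): the onTimer callback
above could let the counter go negative to count consecutive empty intervals and clear the state
after a threshold; processElement would then also need to reset a negative counter to 0 before
incrementing:

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer current = count.value();
        out.collect(new Tuple2<>(ctx.getCurrentKey(), Math.max(current, 0)));
        if (current > 0) {
            count.update(0);             // reset the per-interval count
        } else if (current > -10) {
            count.update(current - 1);   // remember one more empty interval
        } else {
            count.clear();               // idle for ~10 intervals: drop the state, stop re-registering
            return;
        }
        ctx.timerService().registerProcessingTimeTimer(timestamp + 30_000L);
    }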


Best,
 Yun





 --Original Mail --
Sender:Khachatryan Roman 
Send Date:Tue Feb 9 02:35:20 2021
Recipients:Jan Brusch 
CC:Yun Gao , user 
Subject:Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Hi,

Probably another solution would be to register a timer (using 
KeyedProcessFunction) once we see an element after keyBy. The timer will fire 
in windowIntervalMs. Upon firing, it will emit a dummy element which will be 
ignored (or subtracted) in the end.
Upon receiving each new element, the function will shift the timer accordingly.

Regards,
Roman

On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch  wrote:

Hi Yun,
thanks for your reply.
I do agree with your point about standard windows being for high level 
operations and the lower-level apis offering a rich toolset for most advanced 
use cases.
I have tried to solve my problem with keyedProcessFunctions also but was not 
able to get it to work for two reasons:
1) I was not able to set up a combination of ValueState, Timers and Triggers 
that emulated a sliding window with a rising and falling count (including 0) 
good enough.
2) Memory Leak: States / Windows should be cleared after a certain time of 
being at count 0 in order to prevent an infinitely rising of ValueStates (that 
are not needed anymore)

Can you maybe please elaborate in pseudocode how you would envision your 
solution?

Best regards
Jan
On 08.02.21 05:31, Yun Gao wrote:

Hi Jan,

From my view, I think in Flink Window should be as a "high-level" operation for 
some kind
of aggregation operation and if it could not satisfy the requirements, we could 
at least turn to
using the "low-level" api by using KeyedProcessFunction[1].

In this case, we could use a ValueState to store the current value for each 
key, and increment
the value on each element. Then we could also register time for each key on 
receiving the first 
element for this key,  and in the onTimer callback, we could send the current 
state value, update
the value to 0 and register another timer for this key after 30s.

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction


 --Original Mail --
Sender:Jan Brusch 
Send Date:Sat Feb 6 23:44:00 2021
Recipients:user 
Subject:Sliding Window Count: Tricky Edge Case / Count Zero Problem 
Hi,
I was recently working on a problem where we wanted to implement a 
simple count on a sliding window, e.g. "how many messages of a certain 
type were emitted by a certain type of sensor in the last n minutes". 
 Which sounds simple enough in theory:

 messageStream
  .keyBy(//EmitterType + MessageType)
 .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n), 
 Time.seconds(30)))
  .map(_ => 1)
  .reduce((x,y) => x + y)
  .addSink(...)

But there is a tricky edge case: The downstream systems will never know 
when the count for a certain key goes back to 0, which is important for 
our use case. The technical reason being that flink doesn't open a 
window if there are no entries, i.e. a window with count 0 doesn't exist 
 in flink.

 We came up with the following solution for the time being:

 messageStream
  .keyBy(//EmitterType + MessageType)
  .window(GlobalWindows.create())
 .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
 .evictor(// CustomEvictor: Evict all messages older than n minutes 
 BEFORE processing the window)
 .process(// CustomCounter: Count all Messages in Window State);
  .addSink(...)

In the case of zero messages in the last n minutes, all messages will be 
evicted from the window and the process-function will get triggered one 

Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread Yun Gao
Hi,

Have you also included the kafka-connector related jar in the classpath?

Best,
Yun
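For example (a sketch, not from the thread; the jar name and path are placeholders that must match
your Flink and Scala version), the Kafka SQL connector jar can be added from the Python program via
the pipeline.jars option before the DDL is executed:

    # assumes the TableEnvironment `t_env` from the script in this thread
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka_2.11-1.12.1.jar")

Alternatively, the same jar can be dropped into the lib/ directory of the Flink distribution.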
 --Original Mail --
Sender:joris.vanagtmaal 
Send Date:Tue Feb 9 03:16:52 2021
Recipients:User-Flink 
Subject:Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector
Traceback (most recent call last):
  File "streaming-dms.py", line 309, in 
anomalies()
  File "streaming-dms.py", line 142, in anomalies
t_env.sql_query(query).insert_into("ark_sink")
  File
"/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py",
line 748, in sql_query
j_table = self._j_tenv.sqlQuery(query)
  File
"/Users/jag002/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File
"/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 162, in deco
raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
 at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
 at
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
 at
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
 at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
 at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
 at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
 at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
 at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
 at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
 at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566)
 at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.base/java.lang.Thread.run(Thread.java:834)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: fink on yarn per job container killed

2021-02-08 Thread key lou
Thanks. I've read the related articles and similar questions on the mailing list; the common
advice is to increase off-heap memory. I still have a few questions:
1. In Flink 1.10, when state keeps growing, is there really no way to bound RocksDB's memory
growth, so there is always a risk of the container being killed? Doesn't RocksDB spill to disk
and release memory when it runs low?
2. When RocksDBStateBackend is configured with an HDFS path, does RocksDB still create local
files? I have never found any such files on the TM nodes.
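For reference, a hedged flink-conf.yaml sketch combining the memory options discussed in this
thread (values are placeholders to adapt to the container size):

    state.backend: rocksdb
    # Let RocksDB allocate from Flink's managed memory so its usage is bounded.
    state.backend.rocksdb.memory.managed: true
    taskmanager.memory.managed.size: 1024m
    # Extra head-room for native allocations that fall outside the managed budget.
    taskmanager.memory.jvm-overhead.min: 512m
    taskmanager.memory.jvm-overhead.max: 1g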


zhiyezou <1530130...@qq.com> wrote on Sun, Feb 7, 2021 at 9:41 AM:

> Hi
> Sorry, I didn't check the link beforehand:
> https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247490197&idx=1&sn=b0893a9bf12fbcae76852a156302de95
> Please first check whether state TTL is configured correctly. If it is and the problem still
> occurs, it is probably a managed-memory issue; see the linked article for the relevant settings.
>
>
>
> -- Original Mail --
> From: "user-zh" <louke...@gmail.com>
> Sent: Friday, Feb 5, 2021, 2:03 PM
> To: "user-zh"
> Subject: Re: fink on yarn per job container killed
>
>
>
> Thanks for the reply.
> Do you mean tuning taskmanager.memory.jvm-overhead? (The WeChat link seems broken.)
> Most answers I've seen on the mailing list boil down to increasing off-heap memory.
> Is there really no upper bound on the memory RocksDB requests? My state keeps growing, but I
> want RocksDB to stay within the memory I configure, so the container is not killed by
> YARN. Is that achievable?
>
> zhiyezou <1530130...@qq.com> wrote on Fri, Feb 5, 2021 at 1:25 PM:
>
>  Hi
>  You can first look at the jvm-overhead related configuration; this article explains the
>  underlying mechanism and the parameters:
>  https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247490197&idx=1&sn=b0893a9bf12fbcae76852a156302de95
>
>
>  -- Original Mail --
>  From: "user-zh" <louke...@gmail.com>
>  Sent: Thursday, Feb 4, 2021, 5:46 PM
>  To: "user-zh"
>  Subject: fink on yarn per job container killed
>
>
>
>  Hi everyone,
>  With RocksDB state, the container is killed when the state grows too large. Is there a good
>  solution? Why, in Flink 1.10.1, doesn't taskmanager.memory.managed.size bound the memory that
>  RocksDB allocates? How should the upper limit be controlled?
>  java.lang.Exception: Container
>  [pid=137231,containerID=container_e118_1611713951789_92045_01_03]
>  is running beyond physical memory limits. Current usage: 4.1 GB of 4 GB
>  physical memory used; 8.3 GB of 8.4 GB virtual memory used. Killing container.
>  Dump of the process-tree for container_e118_1611713951789_92045_01_03 :
>    |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>    |- 137259 137231 137231 137231 (java) 3935 488 8764928000 1086082
>  /app/jdk/bin/java -Xmx2029372037 -Xms2029372037 -XX:MaxDirectMemorySize=493921243
>  -XX:MaxMetaspaceSize=268435456 -XX:+HeapDumpOnOutOfMemoryError
>  -Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner
>  -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b
>  -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b
>  -D taskmanager.memory.managed.size=1073741824b -D taskmanager.cpu.cores=1.0
>  -D taskmanager.memory.task.heap.size=1895154309b -D taskmanager.memory.task.off-heap.size=0b
>  --configDir . -Djobmanager.rpc.address=cnsz22pl377 -Dweb.port=0
>  -Dweb.tmpdir=/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc
>  -Djobmanager.rpc.port=18918 -Drest.address=CNSZ22PL377
>    |- 137231 137229 137231 137231 (bash) 0 0 115855360 356 /bin/bash -c
>  /app/jdk/bin/java -Xmx2029372037 -Xms2029372037 -XX:MaxDirectMemorySize=493921243
>  -XX:MaxMetaspaceSize=268435456 -XX:+HeapDumpOnOutOfMemoryError
>  -Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner
>  -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=359703515b
>  -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b
>  -D taskmanager.memory.managed.size=1073741824b -D taskmanager.cpu.cores=1.0
>  -D taskmanager.memory.task.heap.size=1895154309b -D taskmanager.memory.task.off-heap.size=0b
>  --configDir . -Djobmanager.rpc.address='cnsz22pl377' -Dweb.port='0'
>  -Dweb.tmpdir='/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc'
>  -Djobmanager.rpc.port='18918' -Drest.address='CNSZ22PL377'
>  1> /HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.out
>  2> /HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.err
>
>
>  *state.backend.rocksdb.memory.fixed-per-slot* 1024M
>  *state.backend.rocksdb.memory.managed* true
>  *taskmanager.memory.managed.size* 1024M


Re: Flink standalone on k8s HA exception

2021-02-08 Thread Yang Wang
After enabling HA, you need to create a service account that has permission to create/watch
ConfigMaps and attach it to the JobManager and TaskManager pods.
From your error it looks like no such service account has been configured.

Best,
Yang
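A hedged example of what that can look like (names and namespace are placeholders; the built-in
'edit' ClusterRole is broader than strictly required, a minimal Role limited to ConfigMaps would
also work):

    kubectl create serviceaccount flink-service-account -n <namespace>
    kubectl create clusterrolebinding flink-role-binding \
      --clusterrole=edit \
      --serviceaccount=<namespace>:flink-service-account

The JobManager and TaskManager pod specs then need serviceAccountName: flink-service-account.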


casel.chen wrote on Tue, Feb 9, 2021 at 12:10 AM:

> I'm trying to deploy a Flink standalone cluster on K8s. The cluster worked fine before enabling
> HA, but as soon as I add the following two HA settings to the configmap, the JM throws the
> exception below. Why is that?
>
>
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: oss:///odps-prd/rtdp/flink/recovery
>
>
> 2021-02-09 00:03:04,421 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not
> start cluster entrypoint StandaloneSessionClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
> [flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59)
> [flink-dist_2.12-1.12.1.jar:1.12.1]
> Caused by: org.apache.flink.util.FlinkException: Could not create the ha
> services from the instantiated HighAvailabilityServicesFactory
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:332)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:290)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:223)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_282]
> at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> ... 2 more
> Caused by: java.lang.NullPointerException
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.(Fabric8FlinkKubeClient.java:84)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.kubeclient.DefaultKubeClientFactory.fromConfiguration(DefaultKubeClientFactory.java:88)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:38)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:332)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:290)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:223)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_282]
> at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 

Re: Native Kubernetes annotation parsing problem

2021-02-08 Thread Yang Wang
If you are setting the config option in flink-conf.yaml, you can add it directly, for example:

*kubernetes.jobmanager.annotations: iam.amazonaws.com/role:'arn:aws:iam:::role/XX/'*

However, if you are using dynamic properties on the CLI, the configuration value should also be
wrapped in double quotes, because we need to escape the value with single quotes:

*-Dkubernetes.jobmanager.annotations="iam.amazonaws.com/role:'arn:aws:iam:::role/XX/'"*

It seems that IAM is not a common feature in Kubernetes. But from the
documentation of AWS[1],
I think it could be specified via service account[2]. Hope this helps.

[1].
https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
[2].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html#rbac
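As an illustration only (my assumption, based on the AWS docs in [1] rather than on kube2iam/kiam):
with IAM roles for service accounts, the role is attached by annotating the service account that
the Flink pods run with, e.g.:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: flink-service-account
      annotations:
        eks.amazonaws.com/role-arn: arn:aws:iam::<account-id>:role/<role-name>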

Best,
Yang

Kevin Kwon wrote on Mon, Feb 8, 2021 at 11:58 PM:

> I think it will be more generic question of how I inject IAM roles in
> Native Kubernetes pods
>
> I'm using Kubeiam and seems the namespace annotation doesn't work
>
> On Mon, Feb 8, 2021 at 2:30 PM Kevin Kwon  wrote:
>
>> Hi team, I'm using Native Kubernetes annotation config
>>
>>
>> *kubernetes.jobmanager.annotations*
>>
>> and I'm facing some problem with parsing.
>>
>> I use annotation
>>
>>
>> *iam.amazonaws.com/role:'arn:aws:iam:::role/XX/
>> '*
>>
>> but seems no matter what I do, the colon is getting parsed for key,
>> value. can anyone help?
>>
>


Re: Flink SQL temporal table join with Hive error

2021-02-08 Thread macia kk
SELECT *
FROM (
    SELECT tt.*
    FROM input_tabe_01 tt
    FULL OUTER JOIN input_tabe_02 mt
        ON (mt.transaction_sn = tt.reference_id)
        and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES
        and tt.create_time <= mt.create_time - INTERVAL '5' MINUTES
    WHERE COALESCE(tt.create_time, mt.create_time) is not NULL
) lt
LEFT JOIN exchange_rate ex
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'all') */
FOR SYSTEM_TIME AS OF lt.event_time ex
ON DATE_FORMAT(lt.event_time, '-MM-dd') = cast(ex.date_id as String)


Rui Li wrote on Tue, Feb 9, 2021 at 10:20 AM:

> Hi,
>
> And how is the join statement written?
>
> On Mon, Feb 8, 2021 at 2:45 PM macia kk  wrote:
>
> > The image was just that error message.
> >
> > The table DDL is below; it is a shared table, and I don't have permission to change it.
> >
> > CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
> > 'country', `currency` string COMMENT 'currency', `exchange_rate`
> > decimal(25,10) COMMENT 'exchange rate')
> > PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd')
> > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io
> > .parquet.serde.ParquetHiveSerDe'
> > WITH SERDEPROPERTIES (
> >   'serialization.format' = '1'
> > )
> >
> >
> > Rui Li wrote on Mon, Feb 8, 2021 at 2:17 PM:
> >
> > > Hi, the image didn't come through. Could you paste the Hive table DDL and the join statement?
> > >
> > > On Mon, Feb 8, 2021 at 10:33 AM macia kk  wrote:
> > >
> > > > Currently the join key in Temporal Table Join can not be empty.
> > > >
> > > > My Hive table join DDL does not declare the join key as NOT NULL, but the values are always present, and I still get this error.
> > > >
> > > > [image: image.png]
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Dongwon Kim
Hi Khachatryan,

Thanks for the explanation and the input!

1. Use the State Processor API to create a new snapshot [1]

I haven't used it, but does the API prevent the class of a specific
serializer from being loaded?

2. If the operator has only this state then changing uid (together with
> allowNonRestoredState) should help

Very unfortunately, I have another per-key state defined on the operator
which is very important and cannot be abandoned T.T

3. Probably just changing POJO to an empty class will suffice in your case?

Yeah, I might be bringing the class definition for a while.

Best,

Dongwon


On Tue, Feb 9, 2021 at 2:35 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> I'm pulling Yun Tang who is familiar with StateBackends and RocksDB in
> particular.
>
> From what I see, the 2nd snapshot (sp2) is built using the same set of
> states obtained from the starting savepoint/checkpoint (sp1) to write its
> metadata. This metadata includes serializers snapshots, including
> PojoSerializer for your custom type. On restore, this metadata is read, and
> POJO class itself is loaded.
>
> I see the following ways to overcome this issue:
> 1. Use the State Processor API to create a new snapshot [1]
> 2. If the operator has only this state then changing uid (together with
> allowNonRestoredState) should help
> 3. Probably just changing POJO to an empty class will suffice in your case?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> Regards,
> Roman
>
>
> On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim  wrote:
>
>> Hi 张静,
>>
>> Q1: By default, a savepoint restore will try to match all state
>>> back to the restored job. `AllowNonRestoredState` cannot avoid
>>> recovery all state from savepoint, but only skip match all of the
>>> restore state back to the restored job. So `ClassNotFoundException `
>>> could not be avoid.
>>
>> okay
>>
>>Q2: Not really. After you recover new job from the savepoint
>>> (savepoint1)based on (1), you could do a new savepoint (savepoint2),
>>> then remove the definition of the POJO type. then you can restore from
>>> savepoint2.
>>>
>> I did it but it ends up with the same ClassNotFoundException :-(
>>
>> What I did exactly are
>> (1) Trigger sp1 from v1
>> (2) Start v2-1 (w/ the definition of the POJO but do not use it at all)
>> from sp1
>> (3) Trigger sp2 from v2-1
>> (4) Start v2-2 (w/o the definition of the POJO)  from sp2
>> (5) v2-2 failed with the same ClassNotFoundException regarding the POJO
>> type
>>
>> Should v2-2 successfully start from sp2?
>>
>> Best,
>>
>> Dongwon
>>
>>
>>
>>
>>
>>
>> On Mon, Feb 8, 2021 at 11:48 PM 张静  wrote:
>>
>>> Hi, Dongwon,
>>>  Q1: By default, a savepoint restore will try to match all state
>>> back to the restored job. `AllowNonRestoredState` cannot avoid
>>> recovery all state from savepoint, but only skip match all of the
>>> restore state back to the restored job. So `ClassNotFoundException `
>>> could not be avoid.
>>>  Q2: Not really. After you recover new job from the savepoint
>>> (savepoint1)based on (1), you could do a new savepoint (savepoint2),
>>> then remove the definition of the POJO type. then you can restore from
>>> savepoint2.
>>> Correct me please if I'm wrong. Thanks.
>>>
>>> Best,
>>> Beyond1920
>>>
>>> Dongwon Kim  于2021年2月8日周一 下午9:43写道:
>>> >
>>> > Hi,
>>> >
>>> > I have an original job (say v1) and I want to start a new job (say v2)
>>> from a savepoint of v1.
>>> >
>>> > An operator of v1 used to have per-key states of a POJO type, but I
>>> want to remove the states together with the definition of the POJO type.
>>> >
>>> > When I start v2 from a savepoint of v1, I specified
>>> "--allowNonRestoredState" but  I got the following exception:
>>> >
>>> > 2021-02-08 22:07:28,324 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
>>> input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from
>>> RUNNING to FAILED on container_e02_1607261469522_0242_01_08 @
>>> mobdata-flink-dn29.dakao.io (dataPort=45505).
>>> > java.lang.Exception: Exception while creating
>>> StreamOperatorStateContext.
>>> > at
>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> > at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> > at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> > at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> > at
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>>> 

Re: Flink SQL temporal table join with Hive error

2021-02-08 Thread Rui Li
Hi,

And how is the join statement written?

On Mon, Feb 8, 2021 at 2:45 PM macia kk  wrote:

> The image was just that error message.
>
> The table DDL is below; it is a shared table, and I don't have permission to change it.
>
> CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
> 'country', `currency` string COMMENT 'currency', `exchange_rate`
> decimal(25,10) COMMENT 'exchange rate')
> PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd')
> ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io
> .parquet.serde.ParquetHiveSerDe'
> WITH SERDEPROPERTIES (
>   'serialization.format' = '1'
> )
>
>
> Rui Li wrote on Mon, Feb 8, 2021 at 2:17 PM:
>
> > Hi, the image didn't come through. Could you paste the Hive table DDL and the join statement?
> >
> > On Mon, Feb 8, 2021 at 10:33 AM macia kk  wrote:
> >
> > > Currently the join key in Temporal Table Join can not be empty.
> > >
> > > My Hive table join DDL does not declare the join key as NOT NULL, but the values are always present, and I still get this error.
> > >
> > > [image: image.png]
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


Re: pyFlink UDF Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness:

2021-02-08 Thread Dian Fu
This question has probably been asked before; please search the mailing list archive.

Also, if GC is frequent, try increasing the memory.


> On Feb 8, 2021, at 5:14 PM, 陈康 <844256...@qq.com> wrote:
> 
> Thanks for the reply... I switched the version... and got the error below.
> [hadoop@hadoop01 bin]$ pip list | grep flink
> apache-flink   1.11.1
> 
> Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager
> with id 397c590a9c19b173a83a4476f8eeaca0 timed out.
>   ... 26 more
> 
> I observed frequent young-generation GC on the TM.
> ==
> [hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -m
> localhost:8081 -pyexec /opt/python36/bin/python3  -py udf.py
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> Job has been submitted with JobID a27f139f01ef951d832cfa8382523e4f
> Traceback (most recent call last):
>  File "udf.py", line 63, in 
>t_env.execute("job")
>  File
> "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> line 1057, in execute
>  File
> "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>  File
> "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>  File
> "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: a27f139f01ef951d832cfa8382523e4f)
>   at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>   at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
>   at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
>   at
> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>   at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>   at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>   at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>   at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>   at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: a27f139f01ef951d832cfa8382523e4f)
>   at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
>   at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>   at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
>   at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>   at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>   at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>   at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
>   at
> 

Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread joris.vanagtmaal
Traceback (most recent call last):
  File "streaming-dms.py", line 309, in 
anomalies()
  File "streaming-dms.py", line 142, in anomalies
t_env.sql_query(query).insert_into("ark_sink")
  File
"/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py",
line 748, in sql_query
j_table = self._j_tenv.sqlQuery(query)
  File
"/Users/jag002/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py",
line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File
"/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 162, in deco
raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
 at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
 at
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
 at
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
 at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
 at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
 at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
 at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
 at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
 at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
 at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
 at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566)
 at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.base/java.lang.Thread.run(Thread.java:834)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread joris.vanagtmaal
I'm trying to read data from my eventhub in Azure, but i end up with the
Flink error message 'findAndCreateTableSource failed'

using Flink 1.13-Snapshot

source_ddl = f"""CREATE TABLE dms_source(
x_value VARCHAR
 ) WITH (
  'connector.type' = 'Kafka',
  'connector.version' = 'universal',
  'connector.partition' = '0',
  'connector.sasl.jaas.config'=
'org.apache.kafka.common.security.plain.PlainLoginModule required
username="$ConnectionString"
password="Endpoint=sb://**EVEN_HUB_NAME**.servicebus.windows.net/;SharedAccessKeyName=**KEY_
NAME**;SharedAccessKey=***PRIMARY_KEY***;EntityPath=**EVENT_HUB_INSTANCE_NAME**";',
  'connector.sasl.mechanism' = 'PLAIN',
  'connector.security.protocol' = 'SASL_SSL',
  'connector.properties.bootstrap.servers' =
'**EVEN_HUB_NAME**.servicebus.windows.net:9093',
  'connector.properties.group.id' = '$Default',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json')
"""

Any tips on how to debug this?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Khachatryan Roman
Hi,

Probably another solution would be to register a timer
(using KeyedProcessFunction) once we see an element after keyBy. The timer
will fire in windowIntervalMs. Upon firing, it will emit a dummy element
which will be ignored (or subtracted) in the end.
Upon receiving each new element, the function will shift the timer
accordingly.

Regards,
Roman
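A hedged sketch of that idea (my code, not Roman's; SensorEvent, the output type, and the 30 s
interval are placeholders). Real elements are forwarded with count 1; the dummy element is the
(key, 0) record emitted when the timer fires, which downstream can treat as "count is now zero":

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class ZeroCountEmitter extends KeyedProcessFunction<String, SensorEvent, Tuple2<String, Integer>> {
        private transient ValueState<Long> nextTimer;

        @Override
        public void open(Configuration parameters) {
            nextTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("nextTimer", Types.LONG));
        }

        @Override
        public void processElement(SensorEvent e, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            out.collect(new Tuple2<>(ctx.getCurrentKey(), 1));
            // Shift the timer: drop the pending one and re-register windowIntervalMs from now.
            Long pending = nextTimer.value();
            if (pending != null) {
                ctx.timerService().deleteProcessingTimeTimer(pending);
            }
            long newTimer = ctx.timerService().currentProcessingTime() + 30_000L;
            ctx.timerService().registerProcessingTimeTimer(newTimer);
            nextTimer.update(newTimer);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            // No element for this key within the interval: emit the zero marker.
            out.collect(new Tuple2<>(ctx.getCurrentKey(), 0));
            nextTimer.clear();
        }
    }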


On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch 
wrote:

> Hi Yun,
>
> thanks for your reply.
>
> I do agree with your point about standard windows being for high level
> operations and the lower-level apis offering a rich toolset for most
> advanced use cases.
>
> I have tried to solve my problem with keyedProcessFunctions also but was
> not able to get it to work for two reasons:
>
> 1) I was not able to set up a combination of ValueState, Timers and
> Triggers that emulated a sliding window with a rising and falling count
> (including 0) good enough.
>
> 2) Memory Leak: States / Windows should be cleared after a certain time of
> being at count 0 in order to prevent an infinitely rising of ValueStates
> (that are not needed anymore)
>
>
> Can you maybe please elaborate in pseudocode how you would envision your
> solution?
>
>
> Best regards
>
> Jan
> On 08.02.21 05:31, Yun Gao wrote:
>
> Hi Jan,
>
> From my view, I think in Flink Window should be as a "high-level"
> operation for some kind
> of aggregation operation and if it could not satisfy the requirements, we
> could at least turn to
> using the "low-level" api by using KeyedProcessFunction[1].
>
> In this case, we could use a ValueState to store the current value for
> each key, and increment
> the value on each element. Then we could also register time for each key
> on receiving the first
> element for this key,  and in the onTimer callback, we could send the
> current state value, update
> the value to 0 and register another timer for this key after 30s.
>
> Best,
>  Yun
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>
> --Original Mail --
> *Sender:*Jan Brusch 
> 
> *Send Date:*Sat Feb 6 23:44:00 2021
> *Recipients:*user  
> *Subject:*Sliding Window Count: Tricky Edge Case / Count Zero Problem
>
>> Hi,
>> I was recently working on a problem where we wanted to implement a
>> simple count on a sliding window, e.g. "how many messages of a certain
>> type were emitted by a certain type of sensor in the last n minutes".
>> Which sounds simple enough in theory:
>>
>> messageStream
>>  .keyBy(//EmitterType + MessageType)
>>  .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),
>> Time.seconds(30)))
>>  .map(_ => 1)
>>  .reduce((x,y) => x + y)
>>  .addSink(...)
>>
>> But there is a tricky edge case: The downstream systems will never know
>> when the count for a certain key goes back to 0, which is important for
>> our use case. The technical reason being that flink doesn't open a
>> window if there are no entries, i.e. a window with count 0 doesn't exist
>> in flink.
>>
>> We came up with the following solution for the time being:
>>
>> messageStream
>>  .keyBy(//EmitterType + MessageType)
>>  .window(GlobalWindows.create())
>>  .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
>>  .evictor(// CustomEvictor: Evict all messages older than n minutes
>> BEFORE processing the window)
>>  .process(// CustomCounter: Count all Messages in Window State);
>>  .addSink(...)
>>
>> In the case of zero messages in the last n minutes, all messages will be
>> evicted from the window and the process-function will get triggered one
>> last time on the now empty window, so we can produce a count of 0.
>>
>> I have two problems, though, with this solution:
>> 1) It is computationally inefficient for a simple count, as custom
>> process functions will always keep all messages in state. And, on every
>> trigger all elements will have to be touched twice: To compare the
>> timestamp and to count.
>> 2) It does seem like a very roundabout solution to a simple problem.
>>
>> So, I was wondering if there was a more efficient or "flink-like"
>> approach to this. Sorry for the long writeup, but I would love to hear
>> your takes.
>>
>>
>> Best regards
>> Jan
>>
>> --
>> neuland  – Büro für Informatik GmbH
>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>
>> Telefon (0421) 380107 57
>> Fax (0421) 380107 99
>> https://www.neuland-bfi.de
>>
>> https://twitter.com/neuland
>> https://facebook.com/neulandbfi
>> https://xing.com/company/neulandbfi
>>
>>
>> Geschäftsführer: Thomas Gebauer, Jan Zander
>> Registergericht: Amtsgericht Bremen, HRB 23395 HB
>> USt-ID. DE 246585501
>
> --
> neuland  – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99https://www.neuland-bfi.de
> https://twitter.com/neulandhttps://facebook.com/neulandbfihttps://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan 

Any plans to make Flink configurable with pure data?

2021-02-08 Thread Pilgrim Beart
To a naive Flink newcomer (me) it's a little surprising that there is no
pure "data" mechanism for specifying a Flink pipeline, only "code"
interfaces. With the DataStream interface I can use Java, Scala or Python
to set up a pipeline and then execute it - but that doesn't really
seem to *need
*a programming model, it seems like configuration, which could be done with
data? OK, one does need occasionally to specify some custom code, e.g. a
ProcessFunction, but for any given use-case, a relatively static library of
such functions would seem fine.

My use case is that I have lots of customers, and I'm doing a similar job
for each of them, so I'd prefer to have a library of common code (e.g.
ProcessFunctions), and then specify each customer's specific requirements
in a single config file.  To do that in Java, I'd have to do
metaprogramming (to build various pieces of Java out of that config file).

Flink SQL seems to be the closest solution, but doesn't appear to support
fundamental Flink concepts such as timers (?). Is there a plan to evolve
Flink SQL to support timers? Timeouts are my specific need.

Thanks,

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot



Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

2021-02-08 Thread Khachatryan Roman
Hi,

Could you provide the exception stack trace?

Regards,
Roman


On Mon, Feb 8, 2021 at 3:46 PM joris.vanagtmaal <
joris.vanagtm...@wartsila.com> wrote:

> I'm trying to read data from my eventhub in Azure, but i end up with the
> Flink error message 'findAndCreateTableSource failed'
>
> using Flink 1.13-Snapshot
>
> source_ddl = f"""CREATE TABLE dms_source(
> x_value VARCHAR
>  ) WITH (
>   'connector.type' = 'Kafka',
>   'connector.version' = 'universal',
>   'connector.partition' = '0',
>   'connector.sasl.jaas.config'=
> 'org.apache.kafka.common.security.plain.PlainLoginModule required
> username="$ConnectionString"
> password="Endpoint=sb://**EVEN_HUB_NAME**.
> servicebus.windows.net/;SharedAccessKeyName=**KEY_
>
> NAME**;SharedAccessKey=***PRIMARY_KEY***;EntityPath=**EVENT_HUB_INSTANCE_NAME**";',
>   'connector.sasl.mechanism' = 'PLAIN',
>   'connector.security.protocol' = 'SASL_SSL',
>   'connector.properties.bootstrap.servers' =
> '**EVEN_HUB_NAME**.servicebus.windows.net:9093',
>   'connector.properties.group.id' = '$Default',
>   'connector.startup-mode' = 'latest-offset',
>   'format.type' = 'json')
> """
>
>  Any tips on how to debug this?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
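
One thing worth checking while the stack trace is being gathered (this is only a
guess, since the trace isn't shown): the DDL above uses the legacy 'connector.type' /
'connector.*' option keys, while recent Flink versions expect the new option style,
with arbitrary Kafka client settings passed through the 'properties.*' prefix. A
rough, untested sketch of the same table with the newer keys — shown here via the
Java Table API, although the SQL string would be the same in PyFlink; the topic name
and JAAS string are placeholders standing in for the values above:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EventHubSourceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Option names assume the Flink 1.12+/1.13 Kafka SQL connector.
        tableEnv.executeSql(
            "CREATE TABLE dms_source (" +
            "  x_value STRING" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = '<your-event-hub-instance-name>'," +                              // placeholder
            "  'properties.bootstrap.servers' = '<EVEN_HUB_NAME>.servicebus.windows.net:9093'," +
            "  'properties.group.id' = '$Default'," +
            "  'properties.security.protocol' = 'SASL_SSL'," +
            "  'properties.sasl.mechanism' = 'PLAIN'," +
            "  'properties.sasl.jaas.config' = '<same JAAS string as in the post above>'," + // placeholder
            "  'scan.startup.mode' = 'latest-offset'," +
            "  'format' = 'json'" +
            ")");
    }
}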


Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Khachatryan Roman
Hi,

I'm pulling Yun Tang who is familiar with StateBackends and RocksDB in
particular.

>From what I see, the 2nd snapshot (sp2) is built using the same set of
states obtained from the starting savepoint/checkpoint (sp1) to write its
metadata. This metadata includes serializers snapshots, including
PojoSerializer for your custom type. On restore, this metadata is read, and
POJO class itself is loaded.

I see the following ways to overcome this issue:
1. Use the State Processor API to create a new snapshot [1]
2. If the operator has only this state then changing uid (together with
allowNonRestoredState) should help
3. Probably just changing POJO to an empty class will suffice in your case?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Regards,
Roman
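
For option 1, a minimal State Processor API sketch could look roughly like this
(untested; it assumes the flink-state-processor-api dependency and Flink 1.12, and
the savepoint paths and the operator uid "idata-operator" are placeholders — the
backend passed to load() is only used for reading state):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class TrimSavepoint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Load the old savepoint (sp1).
        ExistingSavepoint savepoint =
            Savepoint.load(batchEnv, "hdfs:///savepoints/sp1", new MemoryStateBackend());

        // Drop every piece of state registered under the operator that held the
        // POJO-typed state, then write a new savepoint the POJO-free job can use.
        savepoint
            .removeOperator("idata-operator")
            .write("hdfs:///savepoints/sp1-trimmed");

        batchEnv.execute("trim savepoint");
    }
}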


On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim  wrote:

> Hi 张静,
>
> Q1: By default, a savepoint restore will try to match all state
>> back to the restored job. `AllowNonRestoredState` cannot avoid
>> recovery all state from savepoint, but only skip match all of the
>> restore state back to the restored job. So `ClassNotFoundException `
>> could not be avoid.
>
> okay
>
>Q2: Not really. After you recover new job from the savepoint
>> (savepoint1)based on (1), you could do a new savepoint (savepoint2),
>> then remove the definition of the POJO type. then you can restore from
>> savepoint2.
>>
> I did it but it ends up with the same ClassNotFoundException :-(
>
> What I did exactly are
> (1) Trigger sp1 from v1
> (2) Start v2-1 (w/ the definition of the POJO but do not use it at all)
> from sp1
> (3) Trigger sp2 from v2-1
> (4) Start v2-2 (w/o the definition of the POJO)  from sp2
> (5) v2-2 failed with the same ClassNotFoundException regarding the POJO
> type
>
> Should v2-2 successfully start from sp2?
>
> Best,
>
> Dongwon
>
>
>
>
>
>
> On Mon, Feb 8, 2021 at 11:48 PM 张静  wrote:
>
>> Hi, Dongwon,
>>  Q1: By default, a savepoint restore will try to match all state
>> back to the restored job. `AllowNonRestoredState` cannot avoid
>> recovery all state from savepoint, but only skip match all of the
>> restore state back to the restored job. So `ClassNotFoundException `
>> could not be avoid.
>>  Q2: Not really. After you recover new job from the savepoint
>> (savepoint1)based on (1), you could do a new savepoint (savepoint2),
>> then remove the definition of the POJO type. then you can restore from
>> savepoint2.
>> Correct me please if I'm wrong. Thanks.
>>
>> Best,
>> Beyond1920
>>
>> Dongwon Kim  wrote on Mon, Feb 8, 2021 at 9:43 PM:
>> >
>> > Hi,
>> >
>> > I have an original job (say v1) and I want to start a new job (say v2)
>> from a savepoint of v1.
>> >
>> > An operator of v1 used to have per-key states of a POJO type, but I
>> want to remove the states together with the definition of the POJO type.
>> >
>> > When I start v2 from a savepoint of v1, I specified
>> "--allowNonRestoredState" but  I got the following exception:
>> >
>> > 2021-02-08 22:07:28,324 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
>> input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from
>> RUNNING to FAILED on container_e02_1607261469522_0242_01_08 @
>> mobdata-flink-dn29.dakao.io (dataPort=45505).
>> > java.lang.Exception: Exception while creating
>> StreamOperatorStateContext.
>> > at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
>> > Caused by: org.apache.flink.util.FlinkException: Could not restore
>> keyed state backend for
>> CoBroadcastWithKeyedOperator_692eb9021f6319ecb59ea7c8901de92a_(5/8) from
>> any of the 1 provided restore options.
>> > at
>> 

Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread meneldor
Thanks for the quick reply, Timo. I'll test the row_ts and compaction
mode suggestions. However, I've read somewhere in the archives that an
append-only stream is only possible if I extract "the first" record from
the ranking, which in my case is the oldest record.

Regards
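
If keeping the latest row per id really is the requirement, one possible direction
(an untested sketch, not necessarily the only way) is to replace the ranking with an
event-time tumbling-window aggregation, since group-window aggregations on a time
attribute stay append-only. LAST_VALUE below is the Blink planner's built-in
aggregate and picks the last *arriving* value, not necessarily the one with the
greatest upd_ts; table and column names are taken from the messages below:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DedupByWindowSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // stats_topic and sink_table are assumed to be registered with the DDL below.
        tableEnv.executeSql(
            "INSERT INTO sink_table " +
            "SELECT id, LAST_VALUE(account) AS account, MAX(upd_ts) AS upd_ts " +
            "FROM stats_topic " +
            "GROUP BY id, TUMBLE(row_ts, INTERVAL '20' MINUTE)");
    }
}

The same SQL string could be submitted from PyFlink via table_env.execute_sql(...).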

On Mon, Feb 8, 2021, 18:56 Timo Walther  wrote:

> Hi,
>
> could the problem be that you are mixing OVER and TUMBLE window with
> each other? The TUMBLE is correctly defined over time attribute `row_ts`
> but the OVER window is defined using a regular column `upd_ts`. This
> might be the case why the query is not append-only but updating.
>
> Maybe you can split the problem into sub queries and share the plan with
> us using .explain()?
>
> The nulls in upsert-kafka should be gone once you enable compaction mode
> in Kafka.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 08.02.21 10:53, Khachatryan Roman wrote:
> > Hi,
> >
> > AFAIK this should be supported in 1.12 via FLINK-19568 [1]
> > I'm pulling in Timo and Jark who might know better.
> >
> > https://issues.apache.org/jira/browse/FLINK-19857
> > 
> >
> > Regards,
> > Roman
> >
> >
> > On Mon, Feb 8, 2021 at 9:14 AM meneldor  > > wrote:
> >
> > Any help please? Is there a way to use the "Last row" from a
> > deduplication in an append-only stream or tell upsert-kafka to not
> > produce *null* records in the sink?
> >
> > Thank you
> >
> > On Thu, Feb 4, 2021 at 1:22 PM meneldor  > > wrote:
> >
> > Hello,
> > Flink 1.12.1(pyflink)
> > I am deduplicating CDC records coming from Maxwell in a kafka
> > topic.  Here is the SQL:
> >
> > CREATE TABLE stats_topic(
> >`data` ROW<`id` BIGINT, `account` INT, `upd_ts`
> > BIGINT>,
> >`ts` BIGINT,
> >`xid` BIGINT ,
> >row_ts AS
> TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
> >WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL
> > '15' SECOND
> >  ) WITH (
> >'connector' = 'kafka',
> >'format' = 'json',
> >'topic' = 'stats_topic',
> >'properties.bootstrap.servers' = 'localhost:9092',
> >'properties.group.id
> > ' = 'test_group'
> >  )
> >
> > CREATE TABLE sink_table(
> >`id` BIGINT,
> >`account` INT,
> >`upd_ts` BIGINT
> >  ) WITH (
> >'connector' = 'kafka',
> >'format' = 'json',
> >'topic' = 'sink_topic',
> >'properties.bootstrap.servers' = 'localhost:9092',
> >'properties.group.id
> > ' = 'test_group'
> >  )
> >
> >
> > INSERT INTO sink_table
> > SELECT
> > id,
> > account,
> > upd_ts
> > FROM (
> > SELECT
> >   id,
> >   account,
> >   upd_ts,
> >   ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc)
> > AS rownum
> > FROM stats_topic
> > GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20'
> > MINUTE)
> > )
> > WHERE rownum=1
> >
> >
> >   As there are a lot of CDC records for a single ID im using
> > ROW_NUMBER() and produce them on a 20 minutes interval to the
> > sink_topic. The problem is that flink doesnt allow me to use it
> > in combination with with the kafka connector:
> >
> > pyflink.util.exceptions.TableException: Table sink
> > 'default_catalog.default_database.sink_table' doesn't
> > support consuming update and delete changes which is
> > produced by node Rank(strategy=[UndefinedStrategy],
> > rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
> > partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1,
> $f2])
> >
> >
> > If I use the*upsert-kafka* connector everything is fine but then
> > i receive empty JSON records in the sink topic:
> >
> > {"id": 11, "account": 4, "upd_ts": 1612334952}
> > {"id": 22, "account": 4, "upd_ts": 1612334953}
> > {}
> > {"id": 33, "account": 4, "upd_ts": 1612334955}
> > {}
> > {"id": 44, "account": 4, "upd_ts": 1612334956}
> >
> >
> > Thank you!
> >
>
>


Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread Timo Walther

Hi,

could the problem be that you are mixing OVER and TUMBLE window with 
each other? The TUMBLE is correctly defined over time attribute `row_ts` 
but the OVER window is defined using a regular column `upd_ts`. This 
might be the case why the query is not append-only but updating.


Maybe you can split the problem into sub queries and share the plan with 
us using .explain()?


The nulls in upsert-kafka should be gone once you enable compaction mode 
in Kafka.


I hope this helps.

Regards,
Timo


On 08.02.21 10:53, Khachatryan Roman wrote:

Hi,

AFAIK this should be supported in 1.12 via FLINK-19568 [1]
I'm pulling in Timo and Jark who might know better.

https://issues.apache.org/jira/browse/FLINK-19857 



Regards,
Roman


On Mon, Feb 8, 2021 at 9:14 AM meneldor > wrote:


Any help please? Is there a way to use the "Last row" from a
deduplication in an append-only stream or tell upsert-kafka to not
produce *null* records in the sink?

Thank you

On Thu, Feb 4, 2021 at 1:22 PM meneldor mailto:menel...@gmail.com>> wrote:

Hello,
Flink 1.12.1(pyflink)
I am deduplicating CDC records coming from Maxwell in a kafka
topic.  Here is the SQL:

CREATE TABLE stats_topic(
           `data` ROW<`id` BIGINT, `account` INT, `upd_ts`
BIGINT>,
           `ts` BIGINT,
           `xid` BIGINT ,
           row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
           WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL
'15' SECOND
         ) WITH (
           'connector' = 'kafka',
           'format' = 'json',
           'topic' = 'stats_topic',
           'properties.bootstrap.servers' = 'localhost:9092',
           'properties.group.id
' = 'test_group'
         )

CREATE TABLE sink_table(
           `id` BIGINT,
           `account` INT,
           `upd_ts` BIGINT
         ) WITH (
           'connector' = 'kafka',
           'format' = 'json',
           'topic' = 'sink_topic',
           'properties.bootstrap.servers' = 'localhost:9092',
           'properties.group.id
' = 'test_group'
         )


INSERT INTO sink_table
SELECT
id,
account,
upd_ts
FROM (
SELECT
  id,
  account,
  upd_ts,
  ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc)
AS rownum
FROM stats_topic
GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20'
MINUTE)
)
WHERE rownum=1


  As there are a lot of CDC records for a single ID I'm using
ROW_NUMBER() and produce them on a 20-minute interval to the
sink_topic. The problem is that Flink doesn't allow me to use it
in combination with the kafka connector:

pyflink.util.exceptions.TableException: Table sink
'default_catalog.default_database.sink_table' doesn't
support consuming update and delete changes which is
produced by node Rank(strategy=[UndefinedStrategy],
rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
partitionBy=[$f0], orderBy=[$f2 DESC], select=[$f0, $f1, $f2])


If I use the*upsert-kafka* connector everything is fine but then
i receive empty JSON records in the sink topic:

{"id": 11, "account": 4, "upd_ts": 1612334952}
{"id": 22, "account": 4, "upd_ts": 1612334953}
{}
{"id": 33, "account": 4, "upd_ts": 1612334955}
{}
{"id": 44, "account": 4, "upd_ts": 1612334956}


Thank you!





Flink standalone on k8s: HA exception

2021-02-08 Thread casel.chen
I am trying to deploy a Flink standalone cluster on Kubernetes. Before enabling HA the
cluster worked fine, but after adding the following two HA settings to the ConfigMap the
JobManager throws the exception below. Why is that?


high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss:///odps-prd/rtdp/flink/recovery


2021-02-09 00:03:04,421 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not 
start cluster entrypoint StandaloneSessionClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
 [flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:59)
 [flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:332)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:290)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:223)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_282]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 2 more
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.(Fabric8FlinkKubeClient.java:84)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.kubernetes.kubeclient.DefaultKubeClientFactory.fromConfiguration(DefaultKubeClientFactory.java:88)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:38)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:332)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:290)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:223)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_282]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 2 more

Re: Native Kubernetes annotation parsing problem

2021-02-08 Thread Kevin Kwon
I think it will be a more generic question of how I inject IAM roles into
Native Kubernetes pods.

I'm using Kubeiam and it seems the namespace annotation doesn't work.

On Mon, Feb 8, 2021 at 2:30 PM Kevin Kwon  wrote:

> Hi team, I'm using Native Kubernetes annotation config
>
>
> *kubernetes.jobmanager.annotations*
>
> and I'm facing some problem with parsing.
>
> I use annotation
>
>
> *iam.amazonaws.com/role:'arn:aws:iam:::role/XX/
> '*
>
> but seems no matter what I do, the colon is getting parsed for key, value.
> can anyone help?
>


Re: Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Dongwon Kim
Hi 张静,

Q1: By default, a savepoint restore will try to match all state
> back to the restored job. `AllowNonRestoredState` cannot avoid
> recovery all state from savepoint, but only skip match all of the
> restore state back to the restored job. So `ClassNotFoundException `
> could not be avoid.

okay

   Q2: Not really. After you recover new job from the savepoint
> (savepoint1)based on (1), you could do a new savepoint (savepoint2),
> then remove the definition of the POJO type. then you can restore from
> savepoint2.
>
I did it but it ends up with the same ClassNotFoundException :-(

What I did exactly are
(1) Trigger sp1 from v1
(2) Start v2-1 (w/ the definition of the POJO but do not use it at all)
from sp1
(3) Trigger sp2 from v2-1
(4) Start v2-2 (w/o the definition of the POJO)  from sp2
(5) v2-2 failed with the same ClassNotFoundException regarding the POJO type

Should v2-2 successfully start from sp2?

Best,

Dongwon






On Mon, Feb 8, 2021 at 11:48 PM 张静  wrote:

> Hi, Dongwon,
>  Q1: By default, a savepoint restore will try to match all state
> back to the restored job. `AllowNonRestoredState` cannot avoid
> recovery all state from savepoint, but only skip match all of the
> restore state back to the restored job. So `ClassNotFoundException `
> could not be avoid.
>  Q2: Not really. After you recover new job from the savepoint
> (savepoint1)based on (1), you could do a new savepoint (savepoint2),
> then remove the definition of the POJO type. then you can restore from
> savepoint2.
> Correct me please if I'm wrong. Thanks.
>
> Best,
> Beyond1920
>
> Dongwon Kim  wrote on Mon, Feb 8, 2021 at 9:43 PM:
> >
> > Hi,
> >
> > I have an original job (say v1) and I want to start a new job (say v2)
> from a savepoint of v1.
> >
> > An operator of v1 used to have per-key states of a POJO type, but I want
> to remove the states together with the definition of the POJO type.
> >
> > When I start v2 from a savepoint of v1, I specified
> "--allowNonRestoredState" but  I got the following exception:
> >
> > 2021-02-08 22:07:28,324 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from
> RUNNING to FAILED on container_e02_1607261469522_0242_01_08 @
> mobdata-flink-dn29.dakao.io (dataPort=45505).
> > java.lang.Exception: Exception while creating StreamOperatorStateContext.
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
> > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> CoBroadcastWithKeyedOperator_692eb9021f6319ecb59ea7c8901de92a_(5/8) from
> any of the 1 provided restore options.
> > at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > ... 9 more
> > Caused by: org.apache.flink.runtime.state.BackendBuildingException:
> Caught unexpected exception.
> > at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:361)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
> > at
> 

Joining and Grouping Flink Tables with Java API

2021-02-08 Thread Abdelilah CHOUKRI
Hi,

We're trying to use the Flink 1.11 Java Table API to process a streaming use
case:

We have 2 streams, each with a different structure. Events from both streams,
coming from Kafka, can be:
- A new event (not in the system already)
- An updated event (updating an event that was previously inserted)
so we only want to store the latest data in the Table.

We need to join the 2 previous Tables to have all this data stored in the
Flink system. We think that the best way is to store joined data as a
Table.
This is going to be a Flink Table, that will be a join of the 2 tables by a
common key.

To sum up, we have:
- Stream 1 (coming from Kafka topic) -> Flink Table 1
- Stream 2 (coming from Kafka topic) -> Flink Table 2
- Table 3 = Table 1 join Table 2
- DataStream using RetractStream of Table 3

To get the last element in Table 1 and Table 2, we are using Functions
(LastValueAggFunction):

streamTableEnvironment.registerFunction("LAST_VALUE_STRING", new
LastValueAggFunction.StringLastValueAggFunction());
...
streamTableEnvironment.fromDataStream(inputDataStream)
.groupBy($("id"))
.select(
$("id").as("o_id"),
call("LAST_VALUE_STRING", $("title")).as("o_title"),
call("LAST_VALUE_STRING", $("description")).as("o_description")
);


The questions are:
- Is our approach correct to get the data stored in the Flink system?
- Is it necessary to use the *LastValueAggFunction* in our case? We want to
retract the stream to our custom POJO instead of *Row*, but we're getting the
attached error (attached: *stack_trace.log*).


Abdelilah Choukdi,
Backend dev at ManoMano.


stack_trace.log
Description: Binary data
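
On the second question: toRetractStream(table, MyPojo.class) requires the query's
field names and types to line up exactly with the POJO, which is a frequent source
of errors like the attached one. A workaround sketch (untested; the POJO and the
field indices are made up for illustration) is to retract to Row first and map to
the POJO by hand — the Boolean flag marks insertions (true) versus retractions
(false):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractToPojoSketch {

    // Illustrative POJO; replace with the real joined type.
    public static class JoinedEvent {
        public Long id;
        public String title;
        public String description;
    }

    public static DataStream<JoinedEvent> toPojoStream(StreamTableEnvironment tEnv, Table joined) {
        DataStream<Tuple2<Boolean, Row>> retract = tEnv.toRetractStream(joined, Row.class);
        return retract
            .filter(t -> t.f0)  // keep only "add" messages; handle retractions if the sink needs them
            .map(new MapFunction<Tuple2<Boolean, Row>, JoinedEvent>() {
                @Override
                public JoinedEvent map(Tuple2<Boolean, Row> t) {
                    Row row = t.f1;
                    JoinedEvent e = new JoinedEvent();
                    e.id = (Long) row.getField(0);           // field order is an assumption
                    e.title = (String) row.getField(1);
                    e.description = (String) row.getField(2);
                    return e;
                }
            });
    }
}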


Re: Table Cache Problem

2021-02-08 Thread Yongsong He
thanks for your help,Timo,it is very helpful

On Monday, Feb 8, 2021, Timo Walther  wrote:

> Hi Yongsong,
>
> in newer Flink versions we introduced the concept of statament sets, which
> are available via `TableEnvironment.createStatementSet()`. They allow you
> to opimized a branching pipeline as a whole with reusing subplans.
>
> In older Flink versions, you can convert the Table to a DataStream and
> reregister it as a Table. In this case, the subplan will be materialized
> into a DataStream pipeline and the planner sees it as a blackbox that will
> be shared by multiple branches.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 08.02.21 03:59, Yongsong He wrote:
>
>> Hi experts,
>> I want to cache a temporary table for reuse it
>>
>> Flink version 1.10.1
>>
>> the table is consumed from Kafka; its structure is:
>> create table a (
>> field1 string,
>> field2 string,
>> field3 string,
>> field4 string
>> )
>>
>> the sample code looks like:
>>
>> val settings = EnvironmentSettings.newInstance().inStreamingMode().
>> useBlinkPlanner().build()
>> val tableEnv = StreamTableEnvironment.create(env, settings)
>>
>> val temptable = tableEnv.sqlQuery(s"select * from a where ${condition}")
>>
>> temptable.where(condition1) ... then do something
>> temptable.where(condition2) ... then do otherthing
>>
>>
>> I want to reuse temptable for higher performance, what operators need or
>> it already cached in flink sql plan ?
>>
>> Any help would be appreciated :)
>>
>>
>


Enabling allowNonRestoredState when resuming from a savepoint causes ClassNotFoundException

2021-02-08 Thread Dongwon Kim
Hi,

I have an original job (say v1) and I want to start a new job (say v2) from
a savepoint of v1.

An operator of v1 used to have per-key states of a POJO type, but I want to
remove the states together with the definition of the POJO type.

When I start v2 from a savepoint of v1, I specified
"--allowNonRestoredState" but  I got the following exception:

2021-02-08 22:07:28,324 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from
RUNNING to FAILED on container_e02_1607261469522_0242_01_08 @
mobdata-flink-dn29.dakao.io (dataPort=45505).
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
Caused by: org.apache.flink.util.FlinkException: Could not restore
keyed state backend for
CoBroadcastWithKeyedOperator_692eb9021f6319ecb59ea7c8901de92a_(5/8)
from any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException:
Caught unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:361)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
... 9 more
Caused by: java.io.IOException: Could not find class
'com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc' in
classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:756)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214)
~[flink-dist_2.11-1.12.1.jar:1.12.1]
at 

Re: How to avoid data loss in a Flink two-stream (dual-stream) join

2021-02-08 Thread Mailbox service
-- Original message from: lxk7...@163.com

Native Kubernetes annotation parsing problem

2021-02-08 Thread Kevin Kwon
Hi team, I'm using Native Kubernetes annotation config


*kubernetes.jobmanager.annotations*

and I'm facing a problem with how it is parsed.

I use annotation


*iam.amazonaws.com/role:'arn:aws:iam:::role/XX/
'*

but it seems that no matter what I do, the colon is treated as the key/value separator.
Can anyone help?


How to avoid data loss in a Flink two-stream (dual-stream) join

2021-02-08 Thread lxk7...@163.com

We are currently doing two-stream joins in Flink, mostly interval joins, with the time
bound chosen from experience. How can we guarantee that no data is lost?
If a record arrives later than this interval it is simply dropped, and since I am
processing order data, that is not acceptable.


lxk7...@163.com


Re: Upgrading Flink to Hadoop 3

2021-02-08 Thread lxk7...@163.com

I'm not sure exactly what your question means.
If you mean upgrading Hadoop, simply switch the Hadoop jars referenced by your Flink
setup to the Hadoop 3 ones.


lxk7...@163.com
 
From: kandy.wang
Sent: 2021-02-07 10:27
To: user-zh
Subject: Upgrading Flink to Hadoop 3
How do I upgrade Flink to Hadoop 3?


Re: Cannot connect to queryable state proxy

2021-02-08 Thread Khachatryan Roman
Hi ChangZhuo,

Queryable state is exposed on the same address as the TM RPC. You can
change this address by modifying taskmanager.host [1].
However, I'm not sure if setting it to 127.0.0.1 or localhost will not
break connectivity with the other components.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#taskmanager-host

Regards,
Roman


On Sun, Feb 7, 2021 at 2:20 PM ChangZhuo Chen (陳昌倬) 
wrote:

> On Thu, Feb 04, 2021 at 04:26:42PM +0800, ChangZhuo Chen (陳昌倬) wrote:
> > Hi,
> >
> > We have problem connecting to queryable state client proxy as described
> > in [0]. Any help is appreciated.
> >
> > * The port 6125 is opened in taskmanager pod.
> >
> >   ```
> >   root@-654b94754d-2vknh:/tmp# ss -tlp
> >   StateRecv-Q   Send-Q Local
> Address:Port  Peer Address:Port  Process
> >   LISTEN   01024
> 0.0.0.0:46561  0.0.0.0:*
> >   LISTEN   03
> 0.0.0.0:9249   0.0.0.0:*
> >   LISTEN   01024
> 0.0.0.0:6122   0.0.0.0:*
> >   LISTEN   01024
> 10.200.11.3:9067   0.0.0.0:*
> >   LISTEN   01024
> 10.200.11.3:6125   0.0.0.0:*
> >   LISTEN   01024
> 0.0.0.0:38607  0.0.0.0:*
> >   ```
>
> The problem is that Flink only listens 10.200.11.3:6125 for queryable
> state client proxy, so we need to use correct network to connect to it.
> Is there any way we can make Flink to listen to 0.0.0.0 for queryable
> state client proxy?
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: pyFlink UDF Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness:

2021-02-08 Thread 陈康
Thanks for the reply... I switched the versions, but running it now fails with the error below.
[hadoop@hadoop01 bin]$ pip list | grep flink
apache-flink   1.11.1

Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager
with id 397c590a9c19b173a83a4476f8eeaca0 timed out.
... 26 more

I also observed frequent young-generation GC on the TaskManager.
==
[hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -m
localhost:8081 -pyexec /opt/python36/bin/python3  -py udf.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID a27f139f01ef951d832cfa8382523e4f
Traceback (most recent call last):
  File "udf.py", line 63, in 
t_env.execute("job")
  File
"/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py",
line 1057, in execute
  File
"/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File
"/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File
"/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: a27f139f01ef951d832cfa8382523e4f)
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at
org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: a27f139f01ef951d832cfa8382523e4f)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at

Relationship between Yarn application state and Flink Job status

2021-02-08 Thread ??????

A Yarn application can be in one of the states NEW, NEW_SAVING, SUBMITTED, ACCEPTED,
RUNNING, FINISHED, FAILED, KILLED, while a Flink job can be in CREATED, RUNNING,
FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING, SUSPENDED, RECONCILING.

No matter which state the Flink job is in, the Yarn application can still report RUNNING.

Question: to monitor a Flink-on-Yarn job, should I query the Yarn REST API for the Yarn
application state, or the Flink REST API for the job status?
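
In a session cluster the Yarn application state mostly tells you whether the Flink
cluster itself is still alive, so for per-job monitoring the Flink REST API is
usually the more precise signal (the Yarn state remains useful for detecting a dead
cluster). A minimal polling sketch — the host, the port and the missing JSON parsing
are placeholders:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class JobStatusPoller {
    public static void main(String[] args) throws Exception {
        // /jobs/overview lists every job together with its current state
        // (RUNNING, FAILED, RESTARTING, ...).
        URL url = new URL("http://<jobmanager-host>:8081/jobs/overview");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            // Parse the JSON with your favourite library and alert on FAILED / RESTARTING etc.
            System.out.println(body);
        }
    }
}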





Re: question on ValueState

2021-02-08 Thread Khachatryan Roman
Hi,

I think Yun Tang is right, HeapStateBackend doesn't (de)serialize the value
on update.
As for "value()", it may (de)serialize it and return a copy if there is an
ongoing async snapshot in progress (to protect from modifications). This
shouldn't happen often though.

Regards,
Roman


On Mon, Feb 8, 2021 at 3:24 AM Yun Tang  wrote:

> Hi,
>
> MemoryStateBackend and FsStateBackend both hold keyed state in
> HeapKeyedStateBackend [1], and the main structure to store data is
> StateTable [2] which holds POJO format objects. That is to say, the object
> would not be serialized when calling update().
> On the other hand, RocksDB statebackend would store value with serialized
> bytes.
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>
> Best
> Yun Tang
>
> --
> *From:* Colletta, Edward 
> *Sent:* Sunday, February 7, 2021 19:53
> *To:* user@flink.apache.org 
> *Subject:* question on ValueState
>
>
> Using FsStateBackend.
>
>
>
> I was under the impression that ValueState.value will serialize an object
> which is stored in the local state backend, copy the serialized object and
> deserializes it.  Likewise update() would do the same steps copying the
> object back to local state backend.And as a consequence, storing
> collections in ValueState is much less efficient than using ListState or
> MapState if possible.
>
>
>
> However, I am looking at some code I wrote a while ago which made the
> assumption that the value() method just returned a reference to the
> object.  The code only calls update() when creating the object if value()
> returns null.Yet the code works, all changes to the object stored in
> state are visible the next time value() is called.   I have some sample
> code below.
>
>
>
> Can someone clarify what really happens when value() is called?
>
>
>
>
>
>public void processElement(M in, Context ctx, Collector out)
> throws Exception {
>
> MyWindow myWindow;
>
> myWindow = windowState.value();
>
> if (myWindow == null) {
>
>
> ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
> + interval) / interval) * interval);
>
> myWindow = new MyWindow(0L, slide, windowSize);
>
> windowState.update(myWindow);
>
> myWindow.eq.add(0L);
>
> }
>
>
> myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator()
> + in.value);
>
> }
>
>
>
> @Override
>
> public void onTimer(long timestamp, OnTimerContext ctx,
> Collector out) throws Exception {
>
>
> ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
> + interval) / interval) * interval);
>
> MyWindow myWindow = windowState.value();
>
> myWindow.slide(0L);
>
> out.collect(myWindow.globalAccum);
>
> }
>
>
>
>
>


Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread Khachatryan Roman
Hi,

AFAIK this should be supported in 1.12 via FLINK-19568 [1]
I'm pulling in Timo and Jark who might know better.

https://issues.apache.org/jira/browse/FLINK-19857

Regards,
Roman


On Mon, Feb 8, 2021 at 9:14 AM meneldor  wrote:

> Any help please? Is there a way to use the "Last row" from a deduplication
> in an append-only stream or tell upsert-kafka to not produce *null*
> records in the sink?
>
> Thank you
>
> On Thu, Feb 4, 2021 at 1:22 PM meneldor  wrote:
>
>> Hello,
>> Flink 1.12.1(pyflink)
>> I am deduplicating CDC records coming from Maxwell in a kafka topic.
>> Here is the SQL:
>>
>> CREATE TABLE stats_topic(
>>>   `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>>   `ts` BIGINT,
>>>   `xid` BIGINT ,
>>>   row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>>   WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL '15' SECOND
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'format' = 'json',
>>>   'topic' = 'stats_topic',
>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>   'properties.group.id' = 'test_group'
>>> )
>>>
>>> CREATE TABLE sink_table(
>>>   `id` BIGINT,
>>>   `account` INT,
>>>   `upd_ts` BIGINT
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'format' = 'json',
>>>   'topic' = 'sink_topic',
>>>   'properties.bootstrap.servers' = 'localhost:9092',
>>>   'properties.group.id' = 'test_group'
>>> )
>>>
>>>
>>> INSERT INTO sink_table
>>> SELECT
>>> id,
>>> account,
>>> upd_ts
>>> FROM (
>>> SELECT
>>>  id,
>>>  account,
>>>  upd_ts,
>>>  ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc) AS rownum
>>> FROM stats_topic
>>> GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>>> )
>>> WHERE rownum=1
>>>
>>
>>  As there are a lot of CDC records for a single ID I'm using ROW_NUMBER()
>> and produce them on a 20-minute interval to the sink_topic. The problem is
>> that Flink doesn't allow me to use it in combination with the kafka
>> connector:
>>
>>> pyflink.util.exceptions.TableException: Table sink
>>> 'default_catalog.default_database.sink_table' doesn't support consuming
>>> update and delete changes which is produced by node
>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
>>> select=[$f0, $f1, $f2])
>>>
>>
>> If I use the* upsert-kafka* connector everything is fine but then i
>> receive empty JSON records in the sink topic:
>>
>>> {"id": 11, "account": 4, "upd_ts": 1612334952}
>>> {"id": 22, "account": 4, "upd_ts": 1612334953}
>>> {}
>>> {"id": 33, "account": 4, "upd_ts": 1612334955}
>>> {}
>>> {"id": 44, "account": 4, "upd_ts": 1612334956}
>>>
>>
>> Thank you!
>>
>


Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

2021-02-08 Thread Jan Brusch

Hi Yun,

thanks for your reply.

I do agree with your point about standard windows being for high level 
operations and the lower-level apis offering a rich toolset for most 
advanced use cases.


I have tried to solve my problem with KeyedProcessFunctions as well, but was 
not able to get it to work for two reasons:


1) I was not able to set up a combination of ValueState, Timers and 
Triggers that emulated a sliding window with a rising and falling count 
(including 0) well enough.


2) Memory leak: states / windows should be cleared after staying at count 0 
for a certain time, to keep the number of ValueStates (that are no longer 
needed) from growing without bound.



Can you maybe please elaborate in pseudocode how you would envision your 
solution?



Best regards

Jan
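
For what it's worth, here is a rough, untested sketch of how Yun's idea could be
stretched into a sliding count that also covers the two points above: it keeps one
counter per 30-second bucket instead of the individual messages, and it drops the
per-key state once the count has fallen back to 0, so idle keys do not leak state.
Processing time, the generic input type and all names are assumptions:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SlidingCountFunction<IN> extends KeyedProcessFunction<String, IN, Tuple2<String, Long>> {

    private static final long BUCKET_MS = 30_000L;   // emission interval / bucket size
    private final long windowSizeMs;                 // the "last n minutes"
    private transient MapState<Long, Long> buckets;  // bucket start -> count

    public SlidingCountFunction(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    @Override
    public void open(Configuration parameters) {
        buckets = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("buckets", Types.LONG, Types.LONG));
    }

    @Override
    public void processElement(IN in, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        long bucket = now - (now % BUCKET_MS);
        boolean wasEmpty = !buckets.iterator().hasNext();
        Long current = buckets.get(bucket);
        buckets.put(bucket, current == null ? 1L : current + 1L);
        if (wasEmpty) {
            // first element for an idle key: start the periodic emission
            ctx.timerService().registerProcessingTimeTimer(bucket + BUCKET_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        long cutoff = timestamp - windowSizeMs;
        long total = 0L;
        List<Long> expired = new ArrayList<>();
        for (Map.Entry<Long, Long> e : buckets.entries()) {
            if (e.getKey() < cutoff) {
                expired.add(e.getKey());
            } else {
                total += e.getValue();
            }
        }
        for (Long key : expired) {
            buckets.remove(key);
        }
        out.collect(Tuple2.of(ctx.getCurrentKey(), total));  // emits 0 once the window has emptied
        if (buckets.iterator().hasNext()) {
            ctx.timerService().registerProcessingTimeTimer(timestamp + BUCKET_MS);
        }
        // otherwise: no timer re-registered and the state is empty -> nothing leaks for idle keys
    }
}

It would be wired in roughly as messageStream.keyBy(...).process(new
SlidingCountFunction<>(Time.minutes(n).toMilliseconds())), at the price of counts
that move in 30-second steps rather than a perfectly continuous slide.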

On 08.02.21 05:31, Yun Gao wrote:

Hi Jan,

From my view, I think in Flink Window should be as a "high-level" 
operation for some kind
of aggregation operation and if it could not satisfy the requirements, 
we could at least turn to

using the "low-level" api by using KeyedProcessFunction[1].

In this case, we could use a ValueState to store the current value for 
each key, and increment
the value on each element. Then we could also register time for each 
key on receiving the first
element for this key,  and in the onTimer callback, we could send the 
current state value, update

the value to 0 and register another timer for this key after 30s.

Best,
 Yun



[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction

--Original Mail --
*Sender:*Jan Brusch 
*Send Date:*Sat Feb 6 23:44:00 2021
*Recipients:*user 
*Subject:*Sliding Window Count: Tricky Edge Case / Count Zero Problem

Hi,
I was recently working on a problem where we wanted to implement a

simple count on a sliding window, e.g. "how many messages of a certain

type were emitted by a certain type of sensor in the last n minutes".

Which sounds simple enough in theory:

messageStream
 .keyBy(//EmitterType + MessageType)
 .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),

Time.seconds(30)))
 .map(_ => 1)
 .reduce((x,y) => x + y)
 .addSink(...)

But there is a tricky edge case: The downstream systems will never know

when the count for a certain key goes back to 0, which is important for

our use case. The technical reason being that flink doesn't open a

window if there are no entries, i.e. a window with count 0 doesn't exist

in flink.

We came up with the following solution for the time being:

messageStream
 .keyBy(//EmitterType + MessageType)
 .window(GlobalWindows.create())
 .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
 .evictor(// CustomEvictor: Evict all messages older than n minutes

BEFORE processing the window)
 .process(// CustomCounter: Count all Messages in Window State);
 .addSink(...)

In the case of zero messages in the last n minutes, all messages will be

evicted from the window and the process-function will get triggered one

last time on the now empty window, so we can produce a count of 0.

I have two problems, though, with this solution:
1) It is computationally inefficient for a simple count, as custom

process functions will always keep all messages in state. And, on every

trigger all elements will have to be touched twice: To compare the

timestamp and to count.
2) It does seem like a very roundabout solution to a simple problem.

So, I was wondering if there was a more efficient or "flink-like"
approach to this. Sorry for the long writeup, but I would love to hear

your takes.


Best regards
Jan

-- 
neuland  – Büro für Informatik GmbH

Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501


--
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501



Re: Table Cache Problem

2021-02-08 Thread Timo Walther

Hi Yongsong,

in newer Flink versions we introduced the concept of statament sets, 
which are available via `TableEnvironment.createStatementSet()`. They 
allow you to opimized a branching pipeline as a whole with reusing subplans.


In older Flink versions, you can convert the Table to a DataStream and 
reregister it as a Table. In this case, the subplan will be materialized 
into a DataStream pipeline and the planner sees it as a blackbox that 
will be shared by multiple branches.


I hope this helps.

Regards,
Timo
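
To make that concrete, a small self-contained sketch (assuming Flink 1.11+; datagen
and print tables stand in for the real Kafka source and sinks, and all names are
illustrative) — both INSERTs end up in one job, so the planner can reuse the shared
`filtered` sub-plan instead of computing it twice:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class StatementSetSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings =
            EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE a (field1 STRING, field2 STRING, field3 STRING, field4 STRING) "
            + "WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink1 (field1 STRING, field2 STRING) WITH ('connector' = 'print')");
        tEnv.executeSql("CREATE TABLE sink2 (field1 STRING, field2 STRING) WITH ('connector' = 'print')");

        // The shared sub-plan, playing the role of `temptable` in the original question.
        tEnv.executeSql("CREATE TEMPORARY VIEW filtered AS SELECT * FROM a WHERE field3 IS NOT NULL");

        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO sink1 SELECT field1, field2 FROM filtered WHERE field4 LIKE 'a%'");
        set.addInsertSql("INSERT INTO sink2 SELECT field1, field2 FROM filtered WHERE field4 LIKE 'b%'");
        set.execute();  // submits everything as a single job
    }
}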


On 08.02.21 03:59, Yongsong He wrote:

Hi experts,
I want to cache a temporary table for reuse it

Flink version 1.10.1

the table is consumed from Kafka; its structure is:
create table a (
field1 string,
field2 string,
field3 string,
field4 string
)

the sample code looks like:

val settings = 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()

val tableEnv = StreamTableEnvironment.create(env, settings)

val temptable = tableEnv.sqlQuery(s"select * from a where ${condition}")

temptable.where(condition1) ... then do something
temptable.where(condition2) ... then do otherthing


I want to reuse temptable for higher performance, what operators need or 
it already cached in flink sql plan ?


Any help would be appreciated :)





Re: Jobmanager stopped because uncaught exception

2021-02-08 Thread Khachatryan Roman
Hi,

The open issue you mentioned (FLINK-21053) is about preventing potential
issues in the future.
The issue you are experiencing is most likely FLINK-20992 as Yang Wang said.
So upgrading to 1.12.2 should solve the problem.

Regards,
Roman


On Mon, Feb 8, 2021 at 9:05 AM Lei Wang  wrote:

> I see there's a related issue
> https://issues.apache.org/jira/browse/FLINK-21053 which is still open.
>
> Does it mean the similar issue will still exist  even if i upgrade to
> 1.12.2 ?
>
> Thanks,
> Lei
>
> On Mon, Feb 8, 2021 at 3:54 PM Yang Wang  wrote:
>
>> Maybe it is a known issue[1] and has already been resolved in 1.12.2(will
>> release soon).
>> BTW, I think it is unrelated with the aliyun oss info logs.
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-20992
>>
>>
>> Best,
>> Yang
>>
>> Lei Wang  于2021年2月8日周一 下午2:22写道:
>>
>>> Flink standalone HA.   Flink version 1.12.1
>>>
>>> 2021-02-08 13:57:50,550 ERROR
>>> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL:
>>> Thread 'cluster-io-thread-30' produced an uncaught exception. Stopping the
>>> process...
>>> java.util.concurrent.RejectedExecutionException: Task
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3a4ab3cb
>>> rejected from 
>>> java.util.concurrent.ScheduledThreadPoolExecutor@6222948[Terminated,
>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 455]
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
>>> ~[?:1.8.0_275]
>>> at
>>> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66)
>>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> ~[?:1.8.0_275]
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> ~[?:1.8.0_275]
>>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
>>>
>>> Using aliyun oss as statebackend storage.
>>> Before the ERROR, there's a lot of  info message like this:
>>>
>>> 2021-02-08 13:57:50,452 INFO
>>>  org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] -
>>> [Server]Unable to execute HT
>>> TP request: Not Found
>>> [ErrorCode]: NoSuchKey
>>> [RequestId]: 6020D2DEA1E11430349E8323
>>>
>>>
>>> Any insight on this?
>>>
>>> Thanks,
>>> Lei
>>>
>>


Re: pyFlink UDF Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness:

2021-02-08 Thread Dian Fu
It looks like this is caused by a version mismatch between the Flink cluster and PyFlink: the cluster runs Flink 1.11.1 while PyFlink is 1.12.0?

Please align the versions first and then try again.

> On Feb 8, 2021, at 10:28, 陈康 <844256...@qq.com> wrote:
> 
> A question for the experts: running the simplest possible pyflink UDF fails with "Failed to create stage bundle factory!
> INFO:root:Initializing python harness:". It runs fine in IntelliJ IDEA. Has anyone run into this? Thanks!
> 
> /opt/module/flink-1.11.1/bin/flink run -m localhost:8081 -pyexec
> /opt/python36/bin/python3  -py udf.py
> 
> [hadoop@hadoop01 pyflink]$ /opt/python36/bin/python3  -V
> Python 3.6.5
> [hadoop@hadoop01 pyflink]$ pip list | grep flink
> apache-flink   1.12.0
> 
> 
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka,
> Json
> from pyflink.table.udf import udf, TableFunction, ScalarFunction
> 
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
> 'true')
> 
> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()],
> DataTypes.BIGINT())
> t_env.register_function("add", add)
> 
> t_env.sql_update("""
>   CREATE TABLE mySource (   
>   a bigint,
>   b bigint
>   ) WITH ( 
>   'connector' = 'kafka',
>   'topic' = 'udf',
>   'properties.bootstrap.servers' = 'hadoop01:9092',
>   'properties.group.id' = 'pyflinkUDF',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
>   ) 
> """)
> t_env.sql_update("""
>   CREATE TABLE mySink (   
>   a bigint,
>   b bigint
>   ) WITH ( 
>   'connector' = 'print'   
>   ) 
> """)
> t_env.sql_update("insert into mySink select a, add(a,b) from mySource")
> t_env.execute("job")
> 
> 
> 
> 
> [hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -m
> localhost:8081 -pyexec /opt/python36/bin/python3  -py udf.py
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> Job has been submitted with JobID 67fc6d3f4f3f97339345202f4de53366
> Traceback (most recent call last):
>  File "udf.py", line 63, in 
>t_env.execute("job")
>  File
> "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> line 1057, in execute
>  File
> "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>  File
> "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>  File
> "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 67fc6d3f4f3f97339345202f4de53366)
>   at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>   at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
>   at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
>   at
> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> 

Re: Flink upsert-kafka connector not working with Avro confluent

2021-02-08 Thread Till Rohrmann
Hi Shamit,

thanks for reaching out to the community. I am pulling in Timo who might
know more about this problem.

Cheers,
Till

On Sun, Feb 7, 2021 at 6:22 AM shamit jain  wrote:

> Hello Team,
>
> I am facing an issue with the "upsert-kafka" connector, which should read Avro
> messages serialized using
> "io.confluent.kafka.serializers.KafkaAvroDeserializer". The same works
> with the "kafka" connector.
>
> It looks like we are not able to pass the schema registry URL and subject
> name the way we do when using the "kafka" connector.
>
> Please help.
>
>
> Table definition with upsert-kafka is below (not working),
>
> CREATE TABLE proposalLine (PROPOSAL_LINE_ID
> bigint,LAST_MODIFIED_BY STRING ,PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
> ) "WITH ('connector' = 'upsert-kafka', 'properties.bootstrap.servers' =
> 'localhost:9092', 'properties.auto.offset.reset' = 'earliest', 'topic' =
> 'lndcdcadsprpslproposalline', 'key.format' = 'avro', 'value.format' =
> 'avro', 'properties.group.id'='dd', 'properties.schema.registry.url'='http://localhost:8081',
> 'properties.key.deserializer'='org.apache.kafka.common.serialization.LongDeserializer',
> 'properties.value.deserializer'='io.confluent.kafka.serializers.KafkaAvroDeserializer')
>
> ERROR:
>  Caused by: java.io.IOException: Failed to deserialize Avro record.
> at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
> at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> at
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> at
> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
> at
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at
> org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
> at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
> ... 9 more
>
>
> Table definition with kafka connector is below (working),
> CREATE TABLE proposalLine (PROPOSAL_LINE_ID bigint,LAST_MODIFIED_BY String
> ) WITH ('connector' = 'kafka', 'properties.bootstrap.servers' =
> 'localhost:9092', 'properties.auto.offset.reset' = 'earliest', 'topic' =
> 'lndcdcadsprpslproposalline',
> 'format'='avro-confluent','avro-confluent.schema-registry.url' = 'http://localhost:8081',
> 'avro-confluent.schema-registry.subject' =
> 'lndcdcadsprpslproposalline-value')
>
> Regards,
> Shamit


Re: Kafka connector doesn't support consuming update and delete changes in Table SQL API

2021-02-08 Thread meneldor
Any help please? Is there a way to use the "Last row" from a deduplication
in an append-only stream, or to tell upsert-kafka not to produce *null* records
in the sink?

Thank you
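
For the sink side, a hedged sketch that follows the 1.12 documentation:
upsert-kafka needs a PRIMARY KEY plus separate key and value formats, and it
encodes deletions as Kafka tombstones, i.e. records with a null value for the
deleted key. The empty {} rows in the quoted message below may well be how the
consumer renders those delete/retraction records rather than real payloads, and
I am not aware of a 1.12 connector option that suppresses them; consumers of the
sink topic are expected to treat them as deletes for that id. Names mirror the
original job.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Sketch of the sink definition only (Flink 1.12.x); the deduplication query from
# the quoted message below can then INSERT INTO this table, so each id becomes an
# upsert on the Kafka topic instead of an unsupported update/delete on a plain sink.
t_env.execute_sql("""
    CREATE TABLE sink_table (
      `id` BIGINT,
      `account` INT,
      `upd_ts` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'sink_topic',
      'properties.bootstrap.servers' = 'localhost:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    )
""")

If the topic has to stay strictly append-only with the plain kafka connector, the
query itself would need to be reshaped so the planner sees an insert-only result,
for example by doing the per-window reduction inside the group window aggregate
instead of with the outer ROW_NUMBER filter.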

On Thu, Feb 4, 2021 at 1:22 PM meneldor  wrote:

> Hello,
> Flink 1.12.1(pyflink)
> I am deduplicating CDC records coming from Maxwell in a Kafka topic. Here
> is the SQL:
>
> CREATE TABLE stats_topic(
>>   `data` ROW<`id` BIGINT, `account` INT, `upd_ts` BIGINT>,
>>   `ts` BIGINT,
>>   `xid` BIGINT ,
>>   row_ts AS TO_TIMESTAMP(FROM_UNIXTIME(data.upd_ts)),
>>   WATERMARK FOR `row_ts` AS `row_ts`   - INTERVAL '15' SECOND
>> ) WITH (
>>   'connector' = 'kafka',
>>   'format' = 'json',
>>   'topic' = 'stats_topic',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'properties.group.id' = 'test_group'
>> )
>>
>> CREATE TABLE sink_table(
>>   `id` BIGINT,
>>   `account` INT,
>>   `upd_ts` BIGINT
>> ) WITH (
>>   'connector' = 'kafka',
>>   'format' = 'json',
>>   'topic' = 'sink_topic',
>>   'properties.bootstrap.servers' = 'localhost:9092',
>>   'properties.group.id' = 'test_group'
>> )
>>
>>
>> INSERT INTO sink_table
>> SELECT
>> id,
>> account,
>> upd_ts
>> FROM (
>> SELECT
>>  id,
>>  account,
>>  upd_ts,
>>  ROW_NUMBER() OVER (PARTITION BY id ORDER BY upd_ts desc) AS rownum
>> FROM stats_topic
>> GROUP BY id, account, upd_ts, TUMBLE(row_ts, INTERVAL '20' MINUTE)
>> )
>> WHERE rownum=1
>>
>
>  Since there are a lot of CDC records for a single id, I am using ROW_NUMBER()
> and emitting the results to the sink_topic on a 20-minute interval. The problem
> is that Flink does not allow me to use this in combination with the kafka
> connector:
>
>> pyflink.util.exceptions.TableException: Table sink
>> 'default_catalog.default_database.sink_table' doesn't support consuming
>> update and delete changes which is produced by node
>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[$f0], orderBy=[$f2 DESC],
>> select=[$f0, $f1, $f2])
>>
>
> If I use the *upsert-kafka* connector everything is fine, but then I
> receive empty JSON records in the sink topic:
>
>> {"id": 11, "account": 4, "upd_ts": 1612334952}
>> {"id": 22, "account": 4, "upd_ts": 1612334953}
>> {}
>> {"id": 33, "account": 4, "upd_ts": 1612334955}
>> {}
>> {"id": 44, "account": 4, "upd_ts": 1612334956}
>>
>
> Thank you!
>


Re: Jobmanager stopped because uncaught exception

2021-02-08 Thread Lei Wang
I see there's a related issue
https://issues.apache.org/jira/browse/FLINK-21053 which is still open.

Does that mean a similar issue will still exist even if I upgrade to
1.12.2?

Thanks,
Lei

On Mon, Feb 8, 2021 at 3:54 PM Yang Wang  wrote:

> Maybe it is a known issue [1] that has already been resolved in 1.12.2 (to be
> released soon).
> BTW, I think it is unrelated to the Aliyun OSS info logs.
>
> [1]. https://issues.apache.org/jira/browse/FLINK-20992
>
>
> Best,
> Yang
>
> Lei Wang  wrote on Mon, Feb 8, 2021 at 2:22 PM:
>
>> Flink standalone HA.   Flink version 1.12.1
>>
>> 2021-02-08 13:57:50,550 ERROR
>> org.apache.flink.runtime.util.FatalExitExceptionHandler  [] - FATAL:
>> Thread 'cluster-io-thread-30' produced an uncaught exception. Stopping the
>> process...
>> java.util.concurrent.RejectedExecutionException: Task
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3a4ab3cb
>> rejected from 
>> java.util.concurrent.ScheduledThreadPoolExecutor@6222948[Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 455]
>> at
>> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
>> ~[?:1.8.0_275]
>> at
>> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1290)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:66)
>> ~[flink-dist_2.11-1.12.1.jar:1.12.1]
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> ~[?:1.8.0_275]
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> ~[?:1.8.0_275]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
>>
>> Using Aliyun OSS as the state backend storage.
>> Before the ERROR, there are a lot of info messages like this:
>>
>> 2021-02-08 13:57:50,452 INFO
>>  org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] -
>> [Server]Unable to execute HTTP request: Not Found
>> [ErrorCode]: NoSuchKey
>> [RequestId]: 6020D2DEA1E11430349E8323
>>
>>
>> Any insight on this?
>>
>> Thanks,
>> Lei
>>
>