Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-08 Thread Kezhu Wang
Could it be same as FLINK-21028[1] (titled as “Streaming application didn’t
stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0) ?


[1]: https://issues.apache.org/jira/browse/FLINK-21028


Best,
Kezhu Wang

On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:

Hi Thomas,

I tried but could not reproduce the exception yet. I have filed
an issue for the exception first [1].



[1] https://issues.apache.org/jira/browse/FLINK-22928


--Original Mail --
*Sender:*Thomas Wang 
*Send Date:*Tue Jun 8 07:45:52 2021
*Recipients:*Yun Gao 
*CC:*user 
*Subject:*Re: Re: Re: Failed to cancel a job using the STOP rest API

> This is actually a very simple job that reads from Kafka and writes to S3
> using the StreamingFileSink w/ Parquet format. I'm using only Flink's APIs
> and nothing custom.
>
> Thomas
>
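
For reference, a minimal sketch of the kind of job described above. The Event POJO, its fromJson parser, and the topic/bucket names are hypothetical; only APIs named in this thread (FlinkKafkaConsumer, StreamingFileSink with a Parquet bulk format) are used.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaToS3Parquet {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk-encoded formats like Parquet roll part files on checkpoints.
        env.enableCheckpointing(60_000L);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // assumed address
        props.setProperty("group.id", "events-to-s3");

        DataStream<Event> events = env
                .addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
                .map(Event::fromJson); // Event and fromJson are hypothetical

        StreamingFileSink<Event> sink = StreamingFileSink
                .forBulkFormat(new Path("s3://my-bucket/events"),
                        ParquetAvroWriters.forReflectRecord(Event.class))
                .build();

        events.addSink(sink);
        env.execute("kafka-to-s3-parquet");
    }
}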
> On Sun, Jun 6, 2021 at 6:43 PM Yun Gao  wrote:
>
>> Hi Thomas,
>>
>> Thanks very much for reporting the exceptions; it does not seem to be
>> working as expected to me...
>> Could you also show us the DAG of the job? And do any operators in the
>> source task use multiple threads to emit records?
>>
>> Best,
>> Yun
>>
>>
>> --Original Mail --
>> *Sender:*Thomas Wang 
>> *Send Date:*Sun Jun 6 04:02:20 2021
>> *Recipients:*Yun Gao 
>> *CC:*user 
>> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
>>
>>> One thing I noticed is that if I set drain = true, the job could be
>>> stopped correctly. Maybe that's because I'm using a Parquet file sink which
>>> is a bulk-encoded format and only writes to disk during checkpoints?
>>>
>>> Thomas
>>>
>>> On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang  wrote:
>>>
>>>> Hi Yun,
>>>>
>>>> Thanks for the tips. Yes, I do see some exceptions as copied below. I'm
>>>> not quite sure what they mean though. Any hints?
>>>>
>>>> Thanks.
>>>>
>>>> Thomas
>>>>
>>>> ```
>>>> 2021-06-05 10:02:51
>>>> java.util.concurrent.ExecutionException:
>>>> org.apache.flink.streaming.runtime.tasks.
>>>> ExceptionInChainedOperatorException: Could not forward element to next
>>>> operator
>>>> at java.util.concurrent.CompletableFuture.reportGet(
>>>> CompletableFuture.java:357)
>>>> at java.util.concurrent.CompletableFuture.get(CompletableFuture
>>>> .java:1928)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .close(StreamOperatorWrapper.java:130)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .close(StreamOperatorWrapper.java:134)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>>> .close(StreamOperatorWrapper.java:80)
>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>>> .closeOperators(OperatorChain.java:302)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(
>>>> StreamTask.java:576)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>> StreamTask.java:544)
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.
>>>> ExceptionInChainedOperatorException: Could not forward element to next
>>>> operator
>>>> at org.apache.flink.streaming.runtime.tasks.
>>>> OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>>>> at org.apache.flink.streaming.api.operators.CountingOutput
>>>> .emitWatermark(CountingOutput.java:41)
>>>> at org.apache.flink.streaming.runtime.operators.
>>>> TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(
>>>> TimestampsAndWatermarksOperator.java:165)
>>>> at org.apache.flink.api.common.eventtime.
>>>> BoundedOutOfOrdernessWatermarks.onPeriodicEmit(
>>>> BoundedOutOfOrdernessWatermarks.java:69)
>>>> at org.apache.flink.streaming.runtime.operators.
>>>> TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator
>>>> .java:125)

Re: Custom window trigger (Trigger)

2021-03-27 Thread Kezhu Wang
That is the window cleanup timer.

Flink timers have to be stored in state, so unlike an ordinary program they
cannot carry a rich callback context. When a timer fires, the operator
basically dispatches it to every possible receiver, and each receiver has to
filter out the timer callbacks it did not expect. In WindowOperator, even
though the cleanup timer is not registered by the trigger, the trigger still
receives the cleanup-timer callback and has to filter it out itself. You can
look at EventTimeTrigger and ContinuousEventTimeTrigger: both filter out
unexpected callbacks (by returning TriggerResult.CONTINUE).
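
A minimal sketch of that filtering, mirroring what EventTimeTrigger does: any
timer whose timestamp is not the window's max timestamp (e.g. the cleanup
timer) is ignored by returning CONTINUE.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class FilteringEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
            TriggerContext ctx) {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE; // window already complete
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // The cleanup timer also arrives here; only react to the timer we registered.
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE; // not a processing-time trigger
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}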


Best,
Kezhu Wang

On March 8, 2021 at 15:37:52, smq (374060...@qq.com) wrote:

The Trigger abstract class has two methods, onProcessingTime() and
onEventTime(). According to the source-code documentation, these two methods
are called when a timer set via the trigger context fires. In actual testing,
however, I found that even without setting any timer, the methods above still
fire once the watermark passes the window's max timestamp. Does anyone know
about this?


Re: Unsubscribe

2021-03-24 Thread Kezhu Wang
You need to send the email to the list's unsubscribe address instead.

Best,
Kezhu Wang

On March 25, 2021 at 10:15:56, drewfranklin (drewfrank...@163.com) wrote:

Unsubscribe


Re: Flink shuffle vs rebalance

2021-03-15 Thread Kezhu Wang
ShufflePartitioner:

    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return random.nextInt(numberOfChannels);
    }

RebalancePartitioner:

    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }


One picks a channel at random; the other is strict round-robin.
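
In DataStream API terms the difference looks like this (a short sketch):

DataStream<String> input = env.fromElements("a", "b", "c", "d");
input.shuffle().print();    // ShufflePartitioner: a random channel per record
input.rebalance().print();  // RebalancePartitioner: channels cycled in strict order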


Best,
Kezhu Wang

On March 15, 2021 at 22:02:33, 赢峰 (si_ji_f...@163.com) wrote:



What is the difference between the shuffle partitioner and the rebalance partitioner in Flink?


Re: Flink savepoint migration issue

2021-03-11 Thread Kezhu Wang
> Is it possible that Flink deserializes the savepoint file while reading
it, and that the failure happens during that process?

That is indeed the case: the checkpoint snapshots the serializer as well.

I looked at the stack again; the error should occur while deserializing
`MessageId`. Could you check whether the pulsar version changed as well? If
so, did any `MessageId` implementation change its fields between the two
versions? I feel you should be using `MessageId.toByteArray`.
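
A sketch of what that would look like, assuming Pulsar's
MessageId.toByteArray/MessageId.fromByteArray pair (its stable wire format):

// Persist the stable byte representation instead of Kryo-serializing the
// MessageId implementation class, so restores survive Pulsar upgrades.
byte[] bytes = messageId.toByteArray();               // when snapshotting state
MessageId restored = MessageId.fromByteArray(bytes);  // when restoring state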


On March 11, 2021 at 20:26:15, 赵 建云 (zhaojianyu...@outlook.com) wrote:

Hi, I rewrote the migration following StatefulSinkWriterOperator and tested
it, but I still hit the error above. The error seems to happen before the
source's initializeState. Is it possible that Flink deserializes the
savepoint file while reading it and fails during that process? Is there any
way you could help me locate this?

Thanks~


On March 11, 2021 at 11:36 AM, Kezhu Wang wrote:

Is the new cluster using the updated pulsar connector? I looked at the pulsar-flink code; this update is destructive to the state.

+unionOffsetStates = stateStore.getUnionListState(
+new ListStateDescriptor<>(
+OFFSETS_STATE_NAME,
+TypeInformation.of(new TypeHint>() {
+})));

Possible solutions:
1. Try rewriting the state via the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on Flink 1.11?

It looks like there is another incompatible update after that:

 new ListStateDescriptor<>(
 OFFSETS_STATE_NAME,
-TypeInformation.of(new TypeHint>() {
+TypeInformation.of(new TypeHint>() {
 })));


For an incompatible state update, if schema migration is hard to do, you can
try the StatefulSinkWriterOperator approach: read both the old and the new
state, but write only the new state.

You may wait for someone from StreamNative to confirm.
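
A rough sketch of that read-old-write-new pattern. The "_v2" state name, the
TopicRange type, the Tuple2 type parameters (which the archive stripped), and
the migration line are illustrative assumptions, not the actual pulsar-flink
code:

// In initializeState: restore from both descriptors.
ListState<Tuple2<String, MessageId>> oldOffsets = stateStore.getUnionListState(
        new ListStateDescriptor<>(OFFSETS_STATE_NAME,
                TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {})));
ListState<Tuple2<TopicRange, MessageId>> newOffsets = stateStore.getUnionListState(
        new ListStateDescriptor<>(OFFSETS_STATE_NAME + "_v2",
                TypeInformation.of(new TypeHint<Tuple2<TopicRange, MessageId>>() {})));
for (Tuple2<String, MessageId> old : oldOffsets.get()) {
    newOffsets.add(Tuple2.of(TopicRange.of(old.f0), old.f1)); // migrate on the fly
}
oldOffsets.clear(); // snapshotState then only ever writes newOffsets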

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:

Hi all, after upgrading the Flink version I hit a problem where the
savepoint is not restored correctly.

The version was upgraded from 1.9.3 to 1.11.0 or 1.11.3.
The connector is the pulsar connector. The checkpoint data structure in the Source is as follows:

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
I saved the 1.9.3 state locally with the bin/flink savepoint command, then
stopped the Flink 1.9.3 cluster, started a Flink 1.11.3 cluster, and
restored the job with bin/flink run -s.
After starting, the job hits the error below:


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)

... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)

at
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 11 more
Caused by: java.io.EOFException: No more bytes left.

Re: Flink savepoint migration issue

2021-03-10 Thread Kezhu Wang
Is the new cluster using the updated pulsar connector? I looked at the pulsar-flink code; this update is destructive to the state.

+unionOffsetStates = stateStore.getUnionListState(
+new ListStateDescriptor<>(
+OFFSETS_STATE_NAME,
+TypeInformation.of(new TypeHint>() {
+})));

Possible solutions:
1. Try rewriting the state via the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on Flink 1.11?

It looks like there is another incompatible update after that:

 new ListStateDescriptor<>(
 OFFSETS_STATE_NAME,
-TypeInformation.of(new TypeHint>() {
+TypeInformation.of(new TypeHint>() {
 })));


For an incompatible state update, if schema migration is hard to do, you can
try the StatefulSinkWriterOperator approach: read both the old and the new
state, but write only the new state.

You may wait for someone from StreamNative to confirm.

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:

Hi all, after upgrading the Flink version I hit a problem where the
savepoint is not restored correctly.

The version was upgraded from 1.9.3 to 1.11.0 or 1.11.3.
The connector is the pulsar connector. The checkpoint data structure in the Source is as follows:

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint() {
})));
I saved the 1.9.3 state locally with the bin/flink savepoint command, then
stopped the Flink 1.9.3 cluster, started a Flink 1.11.3 cluster, and
restored the job with bin/flink run -s.
After starting, the job hits the error below:


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)

... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)

at
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)

at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 11 more
Caused by: java.io.EOFException: No more bytes left.
at
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)

at 
com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)

at 
com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)

at
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)

at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)

at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)

at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)

at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)

at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)

at

Re: Stateful functions 2.2 and stop with savepoint

2021-03-04 Thread Kezhu Wang
Hi all,

My bad!

Sorry for the apparent mix-up there.

I will write a separate test for stream iterations.


The stateful function part should be a separate issue.


Best,
Kezhu Wang


On March 4, 2021 at 22:13:48, Piotr Nowojski (piotr.nowoj...@gmail.com)
wrote:

Hi Meissner,

Can you clarify, are you talking about stateful functions? [1] Or the
stream iterations [2]? The first e-mail suggests stateful functions, but
the ticket that Kezhu created is talking about the latter.

Piotrek

[1] https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#iterations



On Sun, Feb 28, 2021 at 15:33 Kezhu Wang wrote:

> Hi,
>
> You could also try `cancel —withSavepoint [savepointDir]` even though it is
> deprecated. Compared to the take-savepoint-then-cancel approach, there
> will be no checkpoints in between. This may be important if there are
> two-phase-commit operators in your job.
>
>
> Best,
> Kezhu Wang
>
>
> On February 28, 2021 at 20:50:29, Meissner, Dylan (
> dylan.t.meiss...@nordstrom.com) wrote:
>
> Thank you for opening the bug and including the extra context.
>
> I'll track the progress and, in the meantime, I will work around by taking
> two separate actions when stopping job: take-savepoints, then cancel.
> --
> *From:* Kezhu Wang 
> *Sent:* Sunday, February 28, 2021 12:31 AM
> *To:* user@flink.apache.org ; Meissner, Dylan <
> dylan.t.meiss...@nordstrom.com>
> *Subject:* Re: Stateful functions 2.2 and stop with savepoint
>
> Hi,
>
> Thanks for reporting. I think it is a Flink bug and have created
> FLINK-21522 for it. You could track progress there.
>
>
> FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522
>
>
> Best,
> Kezhu Wang
>
> On February 28, 2021 at 00:59:04, Meissner, Dylan (
> dylan.t.meiss...@nordstrom.com) wrote:
>
> I have an embedded function with a SinkFunction as an egress, implemented
> as this pseudo-code:
>
> val serializationSchema = KafkaSchemaSerializationSchema(... props
> required to use a Confluent Schema Registry with Avro, auth etc ...)
> return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema,
> props, AT_LEAST_ONCE))
>
> Checkpointing and taking a savepoint without stopping work as expected.
>
> However, when I run "flink stop " or even "flink stop --drain
> ", the operation never completes, reporting IN_PROGRESS until I hit
> the "failure-cause:
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
> before completing" CompletedException.
>
> In the "Checkpoint History" it shows only 2 of my 3 operators completed
> their work:
>
> Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1
> (100%) | end-to-end duration: 638ms | data-size 1.38 KB
> feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0%
> | end-to-end duration: n/a | data-size: n/a
> feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms |
> data-size: 0 B
>
> I've been unable to gain any insights from logs so far. Thoughts?
>
>


Re: Suspected classloader leak in Flink 1.11.1

2021-03-01 Thread Kezhu Wang
Hi Tamir,

> The histogram has been taken from Task Manager using jcmd tool.

From that histogram, I guess there is no classloader leak.

> A simple batch job with a single operation. The memory bumps to ~600MB
> (after a single execution); once the job is finished the memory is never freed.

It could be just new code paths and hence new classes. A single execution
does not tell much. Dozens of runs with memory continuously increasing and
never decreasing afterwards would be a symptom of leaking.

You could use the following steps to verify whether there are issues in your
task managers:
* Run the job N times; the more the better.
* Wait until all jobs are finished or stopped.
* Manually trigger GC a dozen times, e.g. with jcmd as sketched below.
* Take a class histogram and check whether there are any
“ChildFirstClassLoader” instances.
* If there are roughly N “ChildFirstClassLoader” instances in the histogram,
then we can be pretty sure there is a classloader leak.
* If there are no (or few) “ChildFirstClassLoader” instances but memory stays
higher than a threshold, say ~600MB or more, it could be another shape of
leak.
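
For example, a minimal sketch with jcmd (the <pid> placeholder is the task
manager process id):

jcmd <pid> GC.run                                          # trigger a GC manually
jcmd <pid> GC.class_histogram | grep ChildFirstClassLoader # count lingering loaders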


In all leaking cases, a heap dump, as @Chesnay said, could be more helpful
since it can tell us which objects/classes/threads keep memory from being freed.


Besides this, I saw an attachment “task-manager-thrad-print.txt” in the
initial mail; when and where did you capture it? The Task Manager? Was there
any job still running?


Best,
Kezhu Wang

On March 1, 2021 at 18:38:55, Tamir Sagi (tamir.s...@niceactimize.com)
wrote:

Hey Kezhu,

The histogram has been taken from Task Manager using jcmd tool.

By “batch job”, do you mean that you compile the job graph from the DataSet
API on the client side and then submit it through RestClient? I am not
familiar with the DataSet API; usually there is no `ChildFirstClassLoader`
creation on the client side for job-graph building. Could you sketch
pseudocode for this, or did you create a `ChildFirstClassLoader` yourself?

Yes, we have a batch app. We read a file from S3 using the hadoop-s3 plugin,
map that data into a DataSet, and just print it.
Then we have a Flink client application which holds the batch app jar.

Attached the following files:

   1. batch-source-code.java - main function
   2. FlatMapXSightMsgProcessor.java - custom RichFlatMapFunction
   3. flink-job-submit.txt - The code to submit the job


I've noticed 2 behaviors:

   1. Idle - Once the Task Manager application boots up, the memory
   consumption gradually grows from ~360MB to ~430MB (within a few minutes). I
   see logs where many classes are loaded into the JVM and never get released.
   (Might be normal behavior.)
   2. Batch Job Execution - A simple batch job with a single operation. The
   memory bumps to ~600MB (after a single execution); once the job is finished
   the memory is never freed. I executed GC several times (manually +
   programmatically) and it did not help (although some classes were unloaded).
   The memory keeps growing as more batch jobs are executed.

Attached are the Task Manager logs from yesterday after a single batch
execution (memory grew to 612MB and was never freed):

   1. taskmgr.txt - Task manager logs (2021-02-28T16:06:05,983 is the timestamp
   when the job was submitted)
   2. gc-class-historgram.txt
   3. thread-print.txt
   4. vm-class-loader-stats.txt
   5. vm-class-loaders.txt
   6. heap_info.txt


The same behavior has been observed in the Flink client application. Once a
batch job is executed the memory increases gradually and is not cleaned up
afterwards. (We observed many ChildFirstClassLoader instances.)


Thank you
Tamir.

--
*From:* Kezhu Wang 
*Sent:* Sunday, February 28, 2021 6:57 PM
*To:* Tamir Sagi 
*Subject:* Re: Suspected classloader leak in Flink 1.11.1


*EXTERNAL EMAIL*


HI Tamir,

The histogram has no instance of `ChildFirstClassLoader`.

> we are running Flink on a session cluster (version 1.11.1) on Kubernetes,
submitting batch jobs with Flink client on Spring boot application (using
RestClusterClient).

> By analyzing the memory of the client Java application with profiling
tools, We saw that there are many instances of Flink's
ChildFirstClassLoader (perhaps as the number of jobs which were running),
and therefore many instances of the same class, each from a different
instance of the Class Loader (as shown in the attached screenshot).
Similarly, to the Flink task manager memory.

By “batch job”, do you mean that you compile the job graph from the DataSet
API on the client side and then submit it through RestClient? I am not
familiar with the DataSet API; usually there is no `ChildFirstClassLoader`
creation on the client side for job-graph building. Could you sketch
pseudocode for this, or did you create a `ChildFirstClassLoader` yourself?


> In addition, we have tried calling GC manually, but it did not change
much.

It might take several runs to collect a class loader instance.


Best,
Kezhu Wang


On February 28, 2021 at 23:27:38, Tamir Sagi (tamir.s...@niceactimize.com)
wrote:

Hey Kezhu,
Thanks for the fast response.

Re: Issues running multiple Jobs using the same JAR

2021-02-28 Thread Kezhu Wang
Hi Morgan,

You could check FLINK-11654; from its description, I think it is the
problem you encountered.

> We run multiple jobs on a cluster which write a lot to the same Kafka
topic from identically named sinks. When EXACTLY_ONCE semantic is enabled
for the KafkaProducers we run into a lot of ProducerFencedExceptions and
all jobs go into a restart cycle.

FLINK-11654: https://issues.apache.org/jira/browse/FLINK-11654
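
Until that is resolved, a commonly suggested workaround is to avoid the
identical naming in the first place. A sketch, under the assumption that the
Kafka transactional IDs are derived from the sink's operator name, with a
hypothetical jobInstanceId suffix:

stream.addSink(new FlinkKafkaProducer<>(
                "shared-topic", serializationSchema, producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
        .name("kafka-sink-" + jobInstanceId) // distinct per job
        .uid("kafka-sink-" + jobInstanceId);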


Best,
Kezhu Wang


On February 28, 2021 at 22:35:02, Morgan Geldenhuys (
morgan.geldenh...@tu-berlin.de) wrote:

Greetings all,

I am having an issue instantiating multiple Flink jobs using the same JAR
in the same Flink native cluster (all 1.12.1).

When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at
least one previous transactional or idempotent request has failed with
errors.
at org.apache.kafka.clients.producer.internals.TransactionManager
.failIfNotReadyForSend(TransactionManager.java:356)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer
.java:926)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer
.java:865)
at org.apache.flink.streaming.connectors.kafka.internals.
FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.invoke(FlinkKafkaProducer.java:915)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.invoke(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.
TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(
StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.
OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
.java:187)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:395)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:609)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:573)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: org.apache.flink.streaming.connectors.kafka.
FlinkKafkaException: Failed to send data to Kafka: Producer attempted an
operation with an old epoch. Either there is a newer producer with the same
transactionalId, or the producer's transaction has been expired by the
broker.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.checkErroneous(FlinkKafkaProducer.java:1392)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.close(FlinkKafkaProducer.java:965)
at org.apache.flink.api.common.functions.util.FunctionUtils
.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.
AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.disposeAllOperators(StreamTask.java:783)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.runAndSuppressThrowable(StreamTask.java:762)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.cleanUpInvoke(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
StreamTask.java:585)
... 3 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException:
Producer attempted an operation with an old epoch. Either there is a newer
producer with the same transactionalId, or the producer's transaction has
been expired by the broker.
Suppressed: java.lang.IllegalStateException: Pending record count
must be zero at this point: 1
at org.apache.flink.streaming.connectors.kafka.
FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
at org.apache.flink.streaming.connectors.kafka.
FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
at org.apache.flink.api.common.functions.util.FunctionUtils
.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.
AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.disposeAllOperators(StreamTask.java:783)
at org.apache.flink.streaming.runtime.tasks.StreamT

Re: Stateful functions 2.2 and stop with savepoint

2021-02-28 Thread Kezhu Wang
Hi,

You could also try `cancel —withSavepoint [savepointDir]` even though it is
deprecated. Compared to the take-savepoint-then-cancel approach, there
will be no checkpoints in between. This may be important if there are
two-phase-commit operators in your job.
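
For reference, the shape of that deprecated command (savepoint directory and
job id are placeholders):

./bin/flink cancel -s s3://my-bucket/savepoints <jobId>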


Best,
Kezhu Wang


On February 28, 2021 at 20:50:29, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

Thank you for opening the bug and including the extra context.

I'll track the progress and, in the meantime, I will work around by taking
two separate actions when stopping job: take-savepoints, then cancel.
--
*From:* Kezhu Wang 
*Sent:* Sunday, February 28, 2021 12:31 AM
*To:* user@flink.apache.org ; Meissner, Dylan <
dylan.t.meiss...@nordstrom.com>
*Subject:* Re: Stateful functions 2.2 and stop with savepoint

Hi,

Thanks for reporting. I think it is a Flink bug and have created
FLINK-21522 for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang

On February 28, 2021 at 00:59:04, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented
as this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required
to use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema,
props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain
", the operation never completes, reporting IN_PROGRESS until I hit
the "failure-cause:
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed
their work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%)
| end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0%
| end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms |
data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?


Re: Setting max parallelism via properties

2021-02-28 Thread Kezhu Wang
Hi Padarn,

There is a configuration option “pipeline.max-parallelism”.

It is not a cluster-wide configuration but a client/job/pipeline-side
configuration, which means you have to carry this configuration
from the Flink conf file to the pipeline-generation stage.


If I understand correctly, `flink-on-k8s-operator` uses `flink run` (I found
this in `flinkcluster_submit_job_script.go`) to submit the job to the
cluster. This command already covers that bridging work, so I think it
should just work in your case.


pipeline-max-parallelism:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#pipeline-max-parallelism
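
For example, a flink-conf.yaml entry (the value 128 is only an illustration):

pipeline.max-parallelism: 128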


Best,
Kezhu Wang

On February 28, 2021 at 16:45:03, Padarn Wilson (pad...@gmail.com) wrote:

Hi all,

Sorry for the basic question, but is it possible to set max
parallelism using the flink conf file, rather than explicitly in code:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism

Need this for a PR I am working on for the flink operator:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/425


Re: Window Process function is not getting trigger

2021-02-28 Thread Kezhu Wang
Hi,

Glad to hear it.

Normally you would not encounter this when there is plenty of data.
`WatermarkStrategy.withIdleness` could be more appropriate in production.
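
A sketch of that strategy applied to a stream like the Price one from this
thread (the idleness duration is an assumption):

WatermarkStrategy
        .<Price>forMonotonousTimestamps()
        .withTimestampAssigner((price, ts) -> price.getAsOfDateTime()
                .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())
        .withIdleness(Duration.ofSeconds(30)); // idle inputs no longer stall watermarks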


Best,
Kezhu Wang


On February 24, 2021 at 22:35:11, sagar (sagarban...@gmail.com) wrote:

Thanks Kezhu, It worked!!!

On Wed, Feb 24, 2021 at 2:47 PM Kezhu Wang  wrote:

> Try `env.setParallelism(1)`. The default parallelism for the local
> environment is `Runtime.getRuntime().availableProcessors()`.
>
> Your test data set is so small that when records are scattered across
> multiple parallel instances, there will be no data with event time assigned
> to trigger the downstream computation.
>
> Or you could try `WatermarkStrategy.withIdleness`.
>
>
> Best,
> Kezhu Wang
>
> On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote:
>
> It is a fairly simple requirement. If I change it to processing time it
> works fine, but it is not working with event time. Help appreciated!
>
> On Wed, Feb 24, 2021 at 10:51 AM sagar  wrote:
>
>> Hi,
>>
>> Corrected with the code below, but still getting the same issue:
>>
>> Instant instant = 
>> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
>> long timeInMillis = instant.toEpochMilli();
>> System.out.println(timeInMillis);
>> return timeInMillis;
>>
>>
>> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:
>>
>>> I saw one potential issue. Your timestamp assigner returns timestamps in
>>> second resolution while Flink requires millisecond resolution.
>>>
>>>
>>> Best,
>>> Kezhu Wang
>>>
>>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>>
>>> I have a simple Flink stream program where I am using a socket as my
>>> continuous source.
>>> I have a window size of 2 seconds.
>>>
>>> Somehow my window process function is not triggering, and even if I pass
>>> events in any order, Flink is not ignoring them.
>>>
>>> I can see the output only when I kill my socket , please find the code
>>> snippet below
>>>
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>>
>>> DataStream<Price> price = env.socketTextStream("localhost",
>>> 9998).uid("price source").map(new MapFunction<String, Price>() {
>>> @Override
>>> public Price map(String s) throws Exception {
>>> return new Price(s.split(",")[0],
>>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>>> }
>>> }
>>> );
>>>
>>> DataStream<Price> priceStream = price
>>>
>>>  
>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
>>> .withTimestampAssigner((p,timestamp) ->
>>> {
>>> ZoneId zoneId = ZoneId.systemDefault();
>>> long epoch =
>>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>>> System.out.println(epoch);
>>>  return epoch;
>>> }))
>>> .keyBy(new KeySelector<Price, String>() {
>>> @Override
>>> public String getKey(Price price) throws Exception {
>>> return price.getPerformanceId();
>>> }
>>> }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>>> .process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() {
>>>
>>> @Override
>>> public void process(String s, Context context,
>>> Iterable iterable, Collector collector) throws Exception {
>>> System.out.println(context.window().getStart()+
>>> "Current watermark: "+context.window().getEnd());
>>> Price p1 = null ;
>>> for(Price p : iterable)
>>> {
>>> System.out.println(p.toString());
>>> p1= p;
>>> }
>>> collector.collect(p1);

Re: Stateful functions 2.2 and stop with savepoint

2021-02-28 Thread Kezhu Wang
Hi,

Thanks for reporting. I think it is a Flink bug and have created
FLINK-21522 for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang

On February 28, 2021 at 00:59:04, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented
as this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required
to use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema,
props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain
", the operation never completes, reporting IN_PROGRESS until I hit
the "failure-cause:
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed
their work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%)
| end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0%
| end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms |
data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?


Re: Window Process function is not getting trigger

2021-02-24 Thread Kezhu Wang
Try `env.setParallelism(1)`. The default parallelism for the local
environment is `Runtime.getRuntime().availableProcessors()`.

Your test data set is so small that when records are scattered across
multiple parallel instances, there will be no data with event time assigned
to trigger the downstream computation.

Or you could try `WatermarkStrategy.withIdleness`.


Best,
Kezhu Wang

On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote:

It is a fairly simple requirement. If I change it to processing time it
works fine, but it is not working with event time. Help appreciated!

On Wed, Feb 24, 2021 at 10:51 AM sagar  wrote:

> Hi,
>
> Corrected with the code below, but still getting the same issue:
>
> Instant instant = 
> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
> long timeInMillis = instant.toEpochMilli();
> System.out.println(timeInMillis);
> return timeInMillis;
>
>
> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:
>
>> I saw one potential issue. Your timestamp assigner returns timestamps in
>> second resolution while Flink requires millisecond resolution.
>>
>>
>> Best,
>> Kezhu Wang
>>
>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>
>> I have a simple Flink stream program where I am using a socket as my
>> continuous source.
>> I have a window size of 2 seconds.
>>
>> Somehow my window process function is not triggering, and even if I pass
>> events in any order, Flink is not ignoring them.
>>
>> I can see the output only when I kill my socket , please find the code
>> snippet below
>>
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>>
>> DataStream<Price> price = env.socketTextStream("localhost",
>> 9998).uid("price source").map(new MapFunction<String, Price>() {
>> @Override
>> public Price map(String s) throws Exception {
>> return new Price(s.split(",")[0],
>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>> }
>> }
>> );
>>
>> DataStream<Price> priceStream = price
>>
>>  
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
>> .withTimestampAssigner((p,timestamp) ->
>> {
>> ZoneId zoneId = ZoneId.systemDefault();
>> long epoch =
>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>> System.out.println(epoch);
>>  return epoch;
>> }))
>> .keyBy(new KeySelector<Price, String>() {
>> @Override
>> public String getKey(Price price) throws Exception {
>> return price.getPerformanceId();
>> }
>> }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>> .process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() {
>>
>> @Override
>> public void process(String s, Context context,
>> Iterable iterable, Collector collector) throws Exception {
>> System.out.println(context.window().getStart()+
>> "Current watermark: "+context.window().getEnd());
>> Price p1 = null ;
>> for(Price p : iterable)
>> {
>> System.out.println(p.toString());
>> p1= p;
>> }
>> collector.collect(p1);
>> }
>> });
>>
>>
>> priceStream.writeAsText("c:\\ab.txt");
>>
>> also data I am inputting are
>>
>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail. All rights are reserved. If you are not the
>> intended recipient, please ignore this email.
>>
>>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail. All rights are reserved. If you are not the
> intended recipient, please ignore this email.
>


-- 
---Regards---

  Sagar Bandal

This is confidential mail. All rights are reserved. If you are not the
intended recipient, please ignore this email.


Re: Window Process function is not getting trigger

2021-02-23 Thread Kezhu Wang
I saw one potential issue. Your timestamp assigner returns timestamps in
second resolution while Flink requires millisecond resolution.


Best,
Kezhu Wang

On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:

I have a simple Flink stream program where I am using a socket as my
continuous source.
I have a window size of 2 seconds.

Somehow my window process function is not triggering, and even if I pass
events in any order, Flink is not ignoring them.

I can see the output only when I kill my socket , please find the code
snippet below

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);


DataStream<Price> price = env.socketTextStream("localhost",
9998).uid("price source").map(new MapFunction<String, Price>() {
@Override
public Price map(String s) throws Exception {
return new Price(s.split(",")[0],
LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
BigDecimal(s.split(",")[3]), s.split(",")[4], new
BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
}
}
);

DataStream<Price> priceStream = price

 
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
.withTimestampAssigner((p,timestamp) ->
{
ZoneId zoneId = ZoneId.systemDefault();
long epoch = p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
System.out.println(epoch);
 return epoch;
}))
.keyBy(new KeySelector<Price, String>() {
@Override
public String getKey(Price price) throws Exception {
return price.getPerformanceId();
}
}).window(TumblingEventTimeWindows.of(Time.seconds(2)))
.process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() {

@Override
public void process(String s, Context context,
Iterable iterable, Collector collector) throws Exception {
System.out.println(context.window().getStart()+
"Current watermark: "+context.window().getEnd());
Price p1 = null ;
for(Price p : iterable)
{
System.out.println(p.toString());
p1= p;
}
collector.collect(p1);
}
});


priceStream.writeAsText("c:\\ab.txt");

also data I am inputting are

p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01

-- 
---Regards---

  Sagar Bandal

This is confidential mail. All rights are reserved. If you are not the
intended recipient, please ignore this email.


Re: State Access Beyond RichCoFlatMapFunction

2021-02-22 Thread Kezhu Wang
Flink's IT tests cover queryable state with the mini cluster.

All tests:
https://github.com/apache/flink/tree/5c772f0a9cb5f8ac8e3f850a0278a75c5fa059d5/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases

Setup/Configs:
https://github.com/apache/flink/blob/5c772f0a9cb5f8ac8e3f850a0278a75c5fa059d5/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java#L67

Test case:
https://github.com/apache/flink/blob/5c772f0a9cb5f8ac8e3f850a0278a75c5fa059d5/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java#L912
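
For a first impression, a client-side sketch (assuming the job registered a
queryable ValueState under "query-name", the proxy listens on its default
port 9069, and jobId/key are placeholders):

QueryableStateClient client = new QueryableStateClient("localhost", 9069);
CompletableFuture<ValueState<Long>> future = client.getKvState(
        jobId,            // JobID of the running job
        "query-name",     // the name given to .asQueryableState(...)
        "some-key",       // the key to look up
        BasicTypeInfo.STRING_TYPE_INFO,
        new ValueStateDescriptor<>("value", Long.class));
Long value = future.get().value();
client.shutdownAndWait();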


Best,
Kezhu Wang

On February 19, 2021 at 20:27:41, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

Is there an example setup of Queryable State for a Local Embedded
Environment?

I am trying to execute Flink programs from within IntelliJ. Any help would
be appreciated!

Even if not, if there are other examples where QueryableState can be
run on a standalone cluster, that would also be a great help. Thanks.


On 10-Feb-2021, at 9:05 AM, Kezhu Wang  wrote:

(a) It is by design. For keyed state, you can only access the state for the
current key, not others. If you want one value per key, ValueState is more
appropriate than MapState.
(b) The state-processor-api aims to access/create/modify/upgrade offline
savepoints, not running state. Queryable state may meet your requirement,
but it has not been actively developed for a while, according to my
observation, and is still beta.

Queryable state:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html


On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

I am creating a class that extends RichCoFlatMapFunction. I need to
connect() two streams to basically share the state of one stream in
another.

This is what I do:

private transient MapState<KeyClass, ValueClass> state;

@Override

public void open(Configuration parameters) throws Exception {
MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
new MapStateDescriptor<>("abc-saved-state",
Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
state = getRuntimeContext().getMapState(stateDescriptor);


This works correctly.


I have two questions:
(a) Whenever I debug, I can only see the current key in the MapState, not
all the possible keys that were created before and saved. Next time, I get
a hit for another key, I will only see the other key and not the rest of
previous keys. Is it by design or am I missing something?

(b) Can I somehow access this state beyond the class that holds the state?
I.e. can I access the state in some other class? If not, can I use the
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
to
do this? Is that the correct way to access the running state of one stream
elsewhere in the program without corrupting it?


Your response will be greatly appreciated. I will be happy to add more
details if required.

Thanks,
Sandeep Ramesh Khanzode


Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Kezhu Wang
Piotr is right, so just ignore my words. That is the price of going deep
down the rabbit hole :-).


Best,
Kezhu Wang


On February 17, 2021 at 23:48:30, Piotr Nowojski (pnowoj...@apache.org)
wrote:

Note^2: InputSelectable is a `@PublicEvolving` API, so it can be used.
However, as Timo pointed out, it would block checkpointing. If I
remember correctly there is a checkState that will not allow using
`InputSelectable` with checkpointing enabled.

Piotrek

On Wed, Feb 17, 2021 at 16:46 Kezhu Wang wrote:

> Hi all,
>
> Thanks Arvid and Timo for more candidates.
>
> I also think “buffering until the control side is ready” is the more
> canonical approach at the current stage of Flink.
>
> Timo has created FLINK-21392 to expose a user-friendly DataStream API for
> blocking one input temporarily.
>
> If one really wants to go deep down the rabbit hole, as Arvid said, I have
> one approach off the top of my head.
>
> A combination of `MultipleInputStreamOperator`, `BoundedMultiInput`,
> `InputSelectable`, a FLIP-27 source, and `ChainingStrategy.HEAD_WITH_SOURCES`
> should achieve the goal without interfering with checkpointing, but the
> control side must not be bounded before FLIP-147 is delivered.
>
> [1] FLINK-21392: https://issues.apache.org/jira/browse/FLINK-21392
>
> Best,
> Kezhu Wang
>
> On February 17, 2021 at 22:58:23, Arvid Heise (ar...@apache.org) wrote:
>
> Note that the question is also posted on SO [1].
>
> [1]
> https://stackoverflow.com/questions/66236004/connectedstreams-paused-until-control-stream-ready/
>
> On Wed, Feb 17, 2021 at 3:31 PM Timo Walther  wrote:
>
>> Hi Kezhu,
>>
>> `InputSelectable` is currently not exposed in the DataStream API because
>> it might have side effects that need to be considered (e.g. do
>> checkpoints still go through?). In any case, we don't have a good story
>> for blocking a control stream yet. The best option is to buffer the
>> other stream in state until the control stream is ready. You can also
>> artificially slow down the other stream until then (e.g. by sleeping) to
>> not buffer too much state.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>>
>> On 17.02.21 14:35, Kezhu Wang wrote:
>> > A combination of `BoundedMultiInput` and `InputSelectable` could help.
>> > You could see
>> > `org.apache.flink.table.runtime.operators.join.HashJoinOperator`
>> > for a usage example. The control topic does not have to be bounded.
>> >
>> > There may be other approaches in later responses. I could not tell
>> > whether this one is canonical or not.
>> >
>> > Best,
>> > Kezhu Wang
>> >
>> > On February 17, 2021 at 13:03:42, Salva Alcántara
>> > (salcantara...@gmail.com <mailto:salcantara...@gmail.com>) wrote:
>> >
>> >> What is the canonical way to accomplish this:
>> >>
>> >> >Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any
>> >> processing of the data stream until >the control stream is "ready", so
>> to
>> >> speak
>> >>
>> >> My particular use case is as follows: I have a CoFlatMap function. The
>> >> data
>> >> stream contains elements that need to be enriched with additional
>> >> information (they come with some fields empty). The missing
>> >> information is
>> >> taken from the control stream, whose elements come through a kafka
>> >> source.
>> >> Essentially, what I want is to pause any processing until having read
>> the
>> >> full (control) topic, otherwise (at least initially) the output
>> elements
>> >> will not be enriched as expected.
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from:
>> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> >> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>
>>


Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Kezhu Wang
Hi all,

Thanks Arvid and Timo for more candidates.

I also think “buffering until the control side is ready” is the more
canonical approach at the current stage of Flink.

Timo has created FLINK-21392 to expose a user-friendly DataStream API for
blocking one input temporarily.

If one really wants to go deep down the rabbit hole, as Arvid said, I have
one approach off the top of my head.

A combination of `MultipleInputStreamOperator`, `BoundedMultiInput`,
`InputSelectable`, a FLIP-27 source, and `ChainingStrategy.HEAD_WITH_SOURCES`
should achieve the goal without interfering with checkpointing, but the
control side must not be bounded before FLIP-147 is delivered.

[1] FLINK-21392: https://issues.apache.org/jira/browse/FLINK-21392
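
For completeness, a rough sketch of the buffering approach Timo described.
Event, Control, and the end-of-control marker are hypothetical types:

public class BufferUntilControlReady
        extends KeyedCoProcessFunction<String, Event, Control, Event> {

    private transient ValueState<Boolean> controlReady;
    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        controlReady = getRuntimeContext().getState(
                new ValueStateDescriptor<>("control-ready", Boolean.class));
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", Event.class));
    }

    @Override
    public void processElement1(Event e, Context ctx, Collector<Event> out) throws Exception {
        if (Boolean.TRUE.equals(controlReady.value())) {
            out.collect(e); // enrichment elided
        } else {
            buffer.add(e);  // hold back until the control side is ready
        }
    }

    @Override
    public void processElement2(Control c, Context ctx, Collector<Event> out) throws Exception {
        // apply the control record to state here, then, once "ready":
        if (c.isEndMarker()) {
            controlReady.update(true);
            for (Event e : buffer.get()) {
                out.collect(e);
            }
            buffer.clear();
        }
    }
}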

Best,
Kezhu Wang

On February 17, 2021 at 22:58:23, Arvid Heise (ar...@apache.org) wrote:

Note that the question is also posted on SO [1].

[1]
https://stackoverflow.com/questions/66236004/connectedstreams-paused-until-control-stream-ready/

On Wed, Feb 17, 2021 at 3:31 PM Timo Walther  wrote:

> Hi Kezhu,
>
> `InputSelectable` is currently not exposed in the DataStream API because
> it might have side effects that need to be considered (e.g. do
> checkpoints still go through?). In any case, we don't have a good story
> for blocking a control stream yet. The best option is to buffer the
> other stream in state until the control stream is ready. You can also
> artificially slow down the other stream until then (e.g. by sleeping) to
> not buffer too much state.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 17.02.21 14:35, Kezhu Wang wrote:
> > A combination of `BoundedMultiInput` and `InputSelectable` could help.
> > You could see
> > `org.apache.flink.table.runtime.operators.join.HashJoinOperator`
> > for a usage example. The control topic does not have to be bounded.
> >
> > There may be other approaches in later responses. I could not tell
> > whether this one is canonical or not.
> >
> > Best,
> > Kezhu Wang
> >
> > On February 17, 2021 at 13:03:42, Salva Alcántara
> > (salcantara...@gmail.com <mailto:salcantara...@gmail.com>) wrote:
> >
> >> What is the canonical way to accomplish this:
> >>
> >> >Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any
> >> processing of the data stream until >the control stream is "ready", so
> to
> >> speak
> >>
> >> My particular use case is as follows: I have a CoFlatMap function. The
> >> data
> >> stream contains elements that need to be enriched with additional
> >> information (they come with some fields empty). The missing
> >> information is
> >> taken from the control stream, whose elements come through a kafka
> >> source.
> >> Essentially, what I want is to pause any processing until having read
> the
> >> full (control) topic, otherwise (at least initially) the output elements
> >> will not be enriched as expected.
> >>
> >>
> >>
> >> --
> >> Sent from:
> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>
>


Re: ConnectedStreams paused until control stream “ready”

2021-02-17 Thread Kezhu Wang
A combination of `BoundedMultiInput` and `InputSelectable` could help. You
could see `org.apache.flink.table.runtime.operators.join.HashJoinOperator`
for a usage example. The control topic does not have to be bounded.

There may be other approaches in later responses. I could not tell
whether this one is canonical or not.

Best,
Kezhu Wang

On February 17, 2021 at 13:03:42, Salva Alcántara (salcantara...@gmail.com)
wrote:

What is the canonical way to accomplish this:

>Given a ConnectedStreams, e.g., a CoFlatMap UDF, how to prevent any
>processing of the data stream until the control stream is "ready", so to
>speak

My particular use case is as follows: I have a CoFlatMap function. The data
stream contains elements that need to be enriched with additional
information (they come with some fields empty). The missing information is
taken from the control stream, whose elements come through a kafka source.
Essentially, what I want is to pause any processing until having read the
full (control) topic, otherwise (at least initially) the output elements
will not be enriched as expected.



-- 
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to report metric based on keyed state piece

2021-02-17 Thread Kezhu Wang
With an initial `y`, I think you could compute the new `y` on each new stream
value. Upon recovering from a checkpoint, maybe `KeyedStateBackend.applyToAllKeys`
could help you rebuild the initial `y`.
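
A sketch of the incremental variant inside a rich function (the names, the
MapState types, and the per-subtask scoping are assumptions; each parallel
instance reports its own share of y):

private transient MapState<String, Long> stateX;
private transient long localY; // this subtask's contribution to y = f(X)

@Override
public void open(Configuration parameters) {
    stateX = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("X", String.class, Long.class));
    getRuntimeContext().getMetricGroup()
            .gauge("y", (Gauge<Long>) () -> localY); // org.apache.flink.metrics.Gauge
}

// In flatMap1/flatMap2: update localY whenever X changes, e.g.
//   if (!stateX.contains(key)) { localY++; }
//   stateX.put(key, value);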

Best,
Kezhu Wang

On February 17, 2021 at 13:09:39, Salva Alcántara (salcantara...@gmail.com)
wrote:

I wonder what is the canonical way to accomplish the following:

Given a Flink UDF, how to report a metric `y` which is a function of some
(keyed) state `X`? That is, `y=f(X)`. Most commonly, we are interested in
the size of the state X.

For instance, consider a `CoFlatMap` function with:

- `X` being a `MapState`
- `y` (the metric) consisting of the aggregated size (i.e., the total size
of the `MapState`, for all keys)



-- 
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: State Access Beyond RichCoFlatMapFunction

2021-02-12 Thread Kezhu Wang
Hi Sandeep,

I must have misled you with inaccurate wording. I did not mean using
CoGroupedStreams, but only CoGroupedStreams.apply as a reference for how to
union streams together and key them. This way you can have all three
streams' state downstream without duplication.
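
A rough sketch of that idea (TaggedValue, the three streams, and
SharedStateFunction are hypothetical):

// Wrap each stream in one tagged type, union them, then keyBy a shared key so
// a single keyed operator holds the state once for all three inputs.
DataStream<TaggedValue> all = stateStream.map(v -> TaggedValue.of(0, v.getKey(), v))
        .union(
                left.map(v -> TaggedValue.of(1, v.getKey(), v)),
                right.map(v -> TaggedValue.of(2, v.getKey(), v)));

all.keyBy(TaggedValue::getKey)
        .process(new SharedStateFunction()); // one KeyedProcessFunction, one state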

Best,
Kezhu Wang

On February 11, 2021 at 20:49:20, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

Can you please share if you have some example of CoGroupedStreams? Thanks!

On 10-Feb-2021, at 3:22 PM, Kezhu Wang  wrote:

> Actually, my use case is that I want to share the state of one stream in
two other streams. Right now, I can think of connecting this stream
independently with each of the two other streams and manage the state
twice, effectively duplicating it.

> Only the matching keys (with the two other streams) will do.

I assume that `ConnectedStreams` meets your requirements but you don't
want to duplicate that state twice? Then I think there are two ways:
1. Union all three streams into one and then keyBy. You can see
`CoGroupedStreams` for reference.
2. You can try `MultipleInputStreamOperator` and
`AbstractStreamOperatorV2`. But most usages of these two are currently
in Flink tests and internal code.
 You could look at `MultipleInputITCase.testKeyedState` for reference.


* CoGroupedStreams union:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L369
* MultipleInputITCase.testKeyedState:
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java#L113

On February 10, 2021 at 17:19:15, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hi,

Yes, but the stream, whose state I want to share, will be indefinite and
have a large volume. Also, not all keys from that stream have to go to
every Task Node. Only the matching keys (with the two other streams) will
do.

Please let me know if there is another cleaner way to achieve this. Thanks.


On 10-Feb-2021, at 12:44 PM, Kezhu Wang  wrote:

Flink has broadcast state to broadcast one stream to the others, in case
you are not aware of it; a sketch follows after the link. It does, however,
duplicate the state on every parallel instance.

1. Broadcast state:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
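A minimal sketch of that pattern (stream and state names are mine; with
keyed data you would use a KeyedBroadcastProcessFunction instead):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastShareExample {

    public static DataStream<String> enrich(
            DataStream<String> data, DataStream<String> control) {

        MapStateDescriptor<String, String> desc = new MapStateDescriptor<>(
                "control-state", Types.STRING, Types.STRING);

        BroadcastStream<String> broadcast = control.broadcast(desc);

        return data.connect(broadcast).process(
                new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx,
                            Collector<String> out) throws Exception {
                        // Read-only view of the broadcast state on the data side.
                        String info = ctx.getBroadcastState(desc).get(value);
                        out.collect(value + "|" + info);
                    }

                    @Override
                    public void processBroadcastElement(String value, Context ctx,
                            Collector<String> out) throws Exception {
                        // Every parallel instance stores its own copy.
                        ctx.getBroadcastState(desc).put(value, value);
                    }
                });
    }
}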

Best,
Kezhu Wang

On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

Thanks a lot for the response. I will try to check Queryable-state for this
purpose.

Actually, my use case is that I want to share the state of one stream in
two other streams. Right now, I can think of connecting this stream
independently with each of the two other streams and manage the state
twice, effectively duplicating it.

I was trying to check whether there are options where I can share this
state with both the streams but save it only once.


On 10-Feb-2021, at 9:05 AM, Kezhu Wang  wrote:

(a) It is by design. For keyed state, you can only access the state for
the current key, not for other keys. If you want one value per key,
ValueState is a better fit than MapState.
(b) The state-processor-api aims to access/create/modify/upgrade offline
savepoints, not running state. Queryable state may meet your requirement,
but it has not been actively developed for a while, as far as I can tell,
and is still beta.

Queryable state:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html


On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

I am creating a class that extends RichCoFlatMapFunction. I need to
connect() two streams to basically share the state of one stream in
another.

This is what I do:

private transient MapState<KeyClass, ValueClass> state;

@Override
public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
            new MapStateDescriptor<>("abc-saved-state",
                    Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
    state = getRuntimeContext().getMapState(stateDescriptor);
}


This works correctly.


I have two questions:
(a) Whenever I debug, I can only see the current key in the MapState, not
all the possible keys that were created before and saved. The next time I
get a hit for another key, I will only see that key and not the rest of
the previous keys. Is this by design, or am I missing something?

(b) Can I somehow access this state beyond the class that holds it? I.e.,
can I access the state in some other class? If not, can I use the State
Processor API
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html)
to do this? Is that the correct way to access the running state of one
stream elsewhere in the program without corrupting it?


Your response will be greatly appreciated. I will be happy to add more
details if required.

Thanks,
Sandeep Ramesh Khanzode


Re: State Access Beyond RichCoFlatMapFunction

2021-02-10 Thread Kezhu Wang
> Actually, my use case is that I want to share the state of one stream in
two other streams. Right now, I can think of connecting this stream
independently with each of the two other streams and manage the state
twice, effectively duplicating it.

> Only the matching keys (with the two other streams) will do.

I assume that `ConnectedStreams` meets your requirements, but you don't
want to duplicate that state twice? Then I think there are two ways:
1. Union all three streams into one and then keyBy. You can see
`CoGroupedStreams` for reference.
2. You can try `MultipleInputStreamOperator` and
`AbstractStreamOperatorV2`, but most usages of these two are currently
Flink tests and internals.
 You could look at `MultipleInputITCase.testKeyedState` for reference.


* CoGroupedStreams union:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L369
* MultipleInputITCase.testKeyedState:
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java#L113

On February 10, 2021 at 17:19:15, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hi,

Yes, but the stream, whose state I want to share, will be indefinite and
have a large volume. Also, not all keys from that stream have to go to
every Task Node. Only the matching keys (with the two other streams) will
do.

Please let me know if there is another cleaner way to achieve this. Thanks.


On 10-Feb-2021, at 12:44 PM, Kezhu Wang  wrote:

Flink has broadcast state to broadcast one stream to the others, in case
you are not aware of it. It does, however, duplicate the state on every
parallel instance.

1. Broadcast state:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Best,
Kezhu Wang

On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

Thanks a lot for the response. I will try to check Queryable-state for this
purpose.

Actually, my use case is that I want to share the state of one stream in
two other streams. Right now, I can think of connecting this stream
independently with each of the two other streams and manage the state
twice, effectively duplicating it.

I was trying to check whether there are options where I can share this
state with both the streams but save it only once.


On 10-Feb-2021, at 9:05 AM, Kezhu Wang  wrote:

(a) It is by design. For keyed state, you can only access the state for
the current key, not for other keys. If you want one value per key,
ValueState is a better fit than MapState.
(b) The state-processor-api aims to access/create/modify/upgrade offline
savepoints, not running state. Queryable state may meet your requirement,
but it has not been actively developed for a while, as far as I can tell,
and is still beta.

Queryable state:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html


On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

I am creating a class that extends RichCoFlatMapFunction. I need to
connect() two streams to basically share the state of one stream in
another.

This is what I do:

private transient MapState<KeyClass, ValueClass> state;

@Override
public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
            new MapStateDescriptor<>("abc-saved-state",
                    Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
    state = getRuntimeContext().getMapState(stateDescriptor);
}


This works correctly.


I have two questions:
(a) Whenever I debug, I can only see the current key in the MapState, not
all the possible keys that were created before and saved. The next time I
get a hit for another key, I will only see that key and not the rest of
the previous keys. Is this by design, or am I missing something?

(b) Can I somehow access this state beyond the class that holds it? I.e.,
can I access the state in some other class? If not, can I use the State
Processor API
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html)
to do this? Is that the correct way to access the running state of one
stream elsewhere in the program without corrupting it?


Your response will be greatly appreciated. I will be happy to add more
details if required.

Thanks,
Sandeep Ramesh Khanzode


Re: State Access Beyond RichCoFlatMapFunction

2021-02-09 Thread Kezhu Wang
Flink has broadcast state to broadcast one stream to the others, in case
you are not aware of it. It does, however, duplicate the state on every
parallel instance.

1. Broadcast state:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Best,
Kezhu Wang

On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

Thanks a lot for the response. I will try to check Queryable-state for this
purpose.

Actually, my use case is that I want to share the state of one stream in
two other streams. Right now, I can think of connecting this stream
independently with each of the two other streams and manage the state
twice, effectively duplicating it.

I was trying to check whether there are options where I can share this
state with both the streams but save it only once.


On 10-Feb-2021, at 9:05 AM, Kezhu Wang  wrote:

(a) It is by design. For keyed state, you can only access the state for
the current key, not for other keys. If you want one value per key,
ValueState is a better fit than MapState.
(b) The state-processor-api aims to access/create/modify/upgrade offline
savepoints, not running state. Queryable state may meet your requirement,
but it has not been actively developed for a while, as far as I can tell,
and is still beta.

Queryable state:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html


On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

I am creating a class that extends RichCoFlatMapFunction. I need to
connect() two streams to basically share the state of one stream in
another.

This is what I do:

private transient MapState<KeyClass, ValueClass> state;

@Override
public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
            new MapStateDescriptor<>("abc-saved-state",
                    Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
    state = getRuntimeContext().getMapState(stateDescriptor);
}


This works correctly.


I have two questions:
(a) Whenever I debug, I can only see the current key in the MapState, not
all the possible keys that were created before and saved. The next time I
get a hit for another key, I will only see that key and not the rest of
the previous keys. Is this by design, or am I missing something?

(b) Can I somehow access this state beyond the class that holds it? I.e.,
can I access the state in some other class? If not, can I use the State
Processor API
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html)
to do this? Is that the correct way to access the running state of one
stream elsewhere in the program without corrupting it?


Your response will be greatly appreciated. I will be happy to add more
details if required.

Thanks,
Sandeep Ramesh Khanzode


Re: State Access Beyond RichCoFlatMapFunction

2021-02-09 Thread Kezhu Wang
(a) It is by design. For keyed state, you can only access the state for
the current key, not for other keys. If you want one value per key,
ValueState is a better fit than MapState.
(b) The state-processor-api aims to access/create/modify/upgrade offline
savepoints, not running state. Queryable state may meet your requirement,
but it has not been actively developed for a while, as far as I can tell,
and is still beta.

Queryable state:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html


On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

I am creating a class that extends RichCoFlatMapFunction. I need to
connect() two streams to basically share the state of one stream in
another.

This is what I do:

private transient MapState<KeyClass, ValueClass> state;

@Override
public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
            new MapStateDescriptor<>("abc-saved-state",
                    Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
    state = getRuntimeContext().getMapState(stateDescriptor);
}


This works correctly.


I have two questions:
(a) Whenever I debug, I can only see the current key in the MapState, not
all the possible keys that were created before and saved. The next time I
get a hit for another key, I will only see that key and not the rest of
the previous keys. Is this by design, or am I missing something?

(b) Can I somehow access this state beyond the class that holds it? I.e.,
can I access the state in some other class? If not, can I use the State
Processor API
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html)
to do this? Is that the correct way to access the running state of one
stream elsewhere in the program without corrupting it?


Your response will be greatly appreciated. I will be happy to add more
details if required.

Thanks,
Sandeep Ramesh Khanzode


Re: Watermarks on map operator

2021-02-04 Thread Kezhu Wang
> it is not clear to me if watermarks are also used by map/flatmap
operators or just by window operators.

Watermarks are most likely only used by time-segmented aggregation
operators to trigger result materialization. In streaming, this "time
segmentation" is usually called "windowing", so in that sense watermarks
are indeed only used by window operators. But there are other types of
windows, say, count windows, which do not need watermarks.

> My application reads from a Kafka topic (with multiple partitions) and
assigns a timestamp to each tuple based on some fields of the Kafka
records.

Watermarks depend on timestamps, but the two are different things.
Windowing operations use timestamps to segment/pane/bucket elements into
windows, while watermarks signal time progress to window operations, so
that they can materialize buffered window results to downstream operators.

> it seems that the flatmap operator *does not* guarantee that it will
process elements in a deterministic time order.

Most operators just pass timestamps/watermarks through to downstream
operators. All operators, including window operators, process elements in
arrival order. If you want elements ordered by event time, you need to do
a window-style operation in an upstream operator; see the sketch below.
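For illustration, a minimal sketch of such an upstream re-ordering step
(all names are mine; it assumes timestamps and watermarks are assigned
further upstream). It buffers elements per timestamp and releases them
only once the watermark has passed, which is exactly how window operators
consume watermarks:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EventTimeSorter
        extends KeyedProcessFunction<String, String, String> {

    private transient MapState<Long, List<String>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "buffer", Types.LONG, Types.LIST(Types.STRING)));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        long ts = ctx.timestamp(); // event-time timestamp assigned upstream
        List<String> atTs = buffer.get(ts);
        if (atTs == null) {
            atTs = new ArrayList<>();
        }
        atTs.add(value);
        buffer.put(ts, atTs);
        // Ask to be called back once the watermark passes this timestamp.
        ctx.timerService().registerEventTimeTimer(ts);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        // Timers fire in timestamp order, so output is event-time ordered.
        for (String v : buffer.get(timestamp)) {
            out.collect(v);
        }
        buffer.remove(timestamp);
    }
}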

Resources:
*
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html
*
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html
* https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/

Hope it could be helpful.

Best,
Kezhu Wang

On February 4, 2021 at 23:17:07, Antonis Papaioannou (papai...@ics.forth.gr)
wrote:

Hi,

reading through the documentation regarding watermarks, it is not clear to
me whether watermarks are also used by map/flatmap operators or just by
window operators.

My application reads from a Kafka topic (with multiple partitions) and
assigns a timestamp to each tuple based on some fields of the Kafka
records. A following keyBy operator creates partitions and sends the tuples
to the corresponding downstream map/flatmap operator. I have set the time
characteristic to EventTime.

However, it seems that the flatmap operator *does not* guarantee that it
will process elements in a deterministic time order.
Is this correct?

Antonis


Re: Checkpoint succeeds, but restoring from the checkpoint fails. Guava's BloomFilter is used; can anyone help analyze this?

2021-01-30 Thread Kezhu Wang
Define a custom type-serializer for the state; you could try
SimpleVersionedSerializer/avro/protobuf, etc. A sketch follows below.

Complex state should avoid reflection-based serializers as much as
possible, and the same goes for java Serializable; none of them handle
state upgrades well.
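For illustration, a sketch of a custom Kryo serializer that sidesteps
reflective access to Guava internals by using BloomFilter's own
writeTo/readFrom methods (assuming the filter was built with a UTF-8
string funnel):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class BloomFilterKryoSerializer
        extends Serializer<BloomFilter<CharSequence>> {

    @Override
    public void write(Kryo kryo, Output output, BloomFilter<CharSequence> filter) {
        try {
            // Guava's own stable binary format; no reflective field access.
            filter.writeTo(output);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public BloomFilter<CharSequence> read(Kryo kryo, Input input,
            Class<BloomFilter<CharSequence>> type) {
        try {
            return BloomFilter.readFrom(
                    input, Funnels.stringFunnel(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

It could be registered via, e.g.,
env.getConfig().addDefaultKryoSerializer(BloomFilter.class,
BloomFilterKryoSerializer.class); the funnel must match the one used when
the filter was created.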

On January 31, 2021 at 11:29:25, 赵一旦 (hinobl...@gmail.com) wrote:

Does anyone know about this problem?
I have found the cause: the LongAdder class is not public. But is there any
solution other than overriding this class? Does Flink itself offer anything
for this situation? After all, the classes involved are not necessarily
user-defined, so there is no guarantee they follow certain rules. For this
problem of Guava's non-public LongAdder, overriding the class does solve it
for me; I just do not know whether there is another way.


赵一旦 wrote on Thu, Jan 28, 2021 at 6:03 PM:

> As shown below, I am using Guava's BloomFilter, which is apparently
> serialized via Kryo. The checkpoint succeeds, but restoring the job from
> the checkpoint fails.
> The error stack is below; is the key error that a non-public member
> cannot be accessed?
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:235)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:248)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .initializeStateAndOpenOperators(OperatorChain.java:400)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$beforeInvoke$2(StreamTask.java:507)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> .runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:501)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:531)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5
> /30) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:135)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.keyedStatedBackend(
> StreamTaskStateInitializerImpl.java:316)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.streamOperatorStateContext(
> StreamTaskStateInitializerImpl.java:155)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
Failed
> when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
> .build(HeapKeyedStateBackendBuilder.java:116)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:540)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend
> .createKeyedStateBackend(FsStateBackend.java:100)
> at org.apache.flink.runtime.state.StateBackend
> .createKeyedStateBackend(StateBackend.java:178)
> at org.apache.flink.streaming.api.operators.
> StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(
> StreamTaskStateInitializerImpl.java:299)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
> .createAndRestore(BackendRestorerProcedure.java:121)
> ... 11 more
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
> instance of class: com.google.common.hash.LongAdder
> Serialization trace:
> bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray)
> bits (com.google.common.hash.BloomFilter)
> at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:
> 136)
> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(
> FieldSerializer.java:547)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:523)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:113)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:143)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .deserialize(KryoSerializer.java:346)
> at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders
> .lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
> at org.apache.flink.runtime.state.
> KeyGroupPartitioner$PartitioningResultKeyGroupReader
> 

Re: Why doesn't the window's reduceFunction support RichFunction?

2021-01-28 Thread Kezhu Wang
The reduceFunction pairs with the ReducingStateDescriptor; it is not
really "the window's". The function of the "WindowOperator" is the
"InternalWindowFunction", and that one can be a "RichFunction".

In Flink, state can be bound not only to a key but also to a namespace;
the namespace is just not directly exposed to users. If needed, you could
write your own WindowOperator and expose a WindowFunction like this:

interface WindowFunction {
    // You could do incremental aggregation here.
    void processElement(Context context, Window window, Element element);

    void fireWindow(Context context, Window window);
}

interface WindowedRuntimeContext {
    State getWindowedState(StateDescriptor descriptor);
}

Leave all the window concepts to the WindowOperator, and let the
WindowFunction implement the concrete business logic.
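For reference, the existing API can already combine an incremental reduce
with a rich, context-aware window function, which covers many such cases
(a minimal sketch; key selector, types, and window size are mine):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ReduceWithRichWindow {

    public static DataStream<String> apply(DataStream<Long> input) {
        return input
                .keyBy(v -> v % 10)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .reduce(
                        // Runs incrementally on every arriving record; only
                        // one value per key and window is kept in state.
                        (ReduceFunction<Long>) Long::sum,
                        // Rich context at firing time (open(), metrics, ...).
                        new ProcessWindowFunction<Long, String, Long, TimeWindow>() {
                            @Override
                            public void process(Long key, Context ctx,
                                    Iterable<Long> reduced, Collector<String> out) {
                                out.collect(key + "@" + ctx.window().getEnd()
                                        + "=" + reduced.iterator().next());
                            }
                        });
    }
}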

On January 28, 2021 at 20:26:47, 赵一旦 (hinobl...@gmail.com) wrote:

The question is as in the title. Of course, the current Flink API can
still achieve the effect of a RichReduceFunction, namely via a
windowFunction or a processWindowFunction.

However, the biggest difference between a windowFunc and a reduceFunc is
that the windowFunc runs when the window fires, whereas reduce is an
incremental, per-record operation. If my window computation needs a very
complex operation for every arriving record, I would rather do it in
reduce than in the windowFunc, because the latter processes the data of
all keys of the whole window at almost the same moment, which drives the
load up.


Re: Looking for a simple example of writing ORC files from Flink, specifically how to write the Map complex type.

2021-01-23 Thread Kezhu Wang
https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java#L259
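In outline, the linked writer populates a MapColumnVector roughly as
follows (a hedged sketch assuming a Map<String, Long> column; the helper
and its names are mine, and the vector classes are from the Hive
storage-api bundled with orc-core):

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;

public final class MapVectorWriter {

    // Writes one row's Map<String, Long> into the map column. All rows'
    // entries live flattened in mapVector.keys / mapVector.values; each
    // row records its slice via offsets[rowId] and lengths[rowId].
    public static void writeMap(MapColumnVector mapVector, int rowId,
            Map<String, Long> entries) {
        BytesColumnVector keys = (BytesColumnVector) mapVector.keys;
        LongColumnVector values = (LongColumnVector) mapVector.values;

        // This row's entries start where the previous row's ended.
        mapVector.offsets[rowId] = mapVector.childCount;
        mapVector.lengths[rowId] = entries.size();
        mapVector.childCount += entries.size();

        // Child vectors may have to grow beyond the default batch size.
        keys.ensureSize(mapVector.childCount, true);
        values.ensureSize(mapVector.childCount, true);

        int idx = (int) mapVector.offsets[rowId];
        for (Map.Entry<String, Long> e : entries.entrySet()) {
            byte[] key = e.getKey().getBytes(StandardCharsets.UTF_8);
            keys.setRef(idx, key, 0, key.length); // references the fresh array
            values.vector[idx] = e.getValue();
            idx++;
        }
    }
}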


On January 23, 2021 at 13:49:23, 赵一旦 (hinobl...@gmail.com) wrote:

Currently, via a custom OrcBulkWriterFactory, I get hold of the individual
ColumnVectors and set their values. For simple types the API looks clear,
but I do not understand how to write the Map type. As shown below, for
serverTime, which is of INT type, I can just set vector[rowId]. But how do
I populate a MapColumnVector, i.e. how exactly do I write multiple
key-value pairs into it?


serverTimeColumnVector.vector[rowId] = ele.getTimestamp();

MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];