Re: Any issues with reinterpretAsKeyedStream when scaling partitions?

2021-10-24 Thread JING ZHANG
>
> My top guess was that the mapping of keys to task managers changes
> depending on the operator.  E.g. if the same key (regardless of the
> operator) goes to the same task manager, then I'd assume it's fine to
> support multiple inputs into an operator with reinterpretAsKeyedStream.

I agree with you on this point.

My guess is that after you change the parallelism, the partitioners between
those operators are no longer 'forward'. An operator with two inputs cannot be
chained with its upstream operators, so once you change the parallelism it is
possible that a partitioner other than 'forward' is introduced.
Let's assume I1 is the first input vertex of the join, I2 is the second input
vertex, and J is the vertex containing the join operator. Before the
stop-with-savepoint, is the partitioner between I1 and J 'forward'? Is the
partitioner between I2 and J 'forward'? After recovering from the savepoint
with the changed parallelism, are the partitioners I1 -> J and I2 -> J still
'forward'? Would you please check this? If they are still 'forward', we need
to look for another cause of this problem.
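For reference, here is a minimal, self-contained sketch of how to check this
from the client side (the class name, the sample records and the CoMapFunction
placeholder are made up, not your actual job; only
DataStreamUtils.reinterpretAsKeyedStream and env.getExecutionPlan() are the
real Flink APIs). The printed JSON plan lists every edge, and the
"ship_strategy" of the edges going into the two-input vertex shows whether it
is FORWARD or HASH:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class PartitionerCheck {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Two inputs that are assumed to be partitioned by userId already (I1 and I2).
        DataStream<Tuple2<String, Long>> input1 = env.fromElements(Tuple2.of("user-1", 1L));
        DataStream<Tuple2<String, Long>> input2 = env.fromElements(Tuple2.of("user-1", 2L));

        // Treat them as keyed without inserting a hash shuffle.
        KeyedStream<Tuple2<String, Long>, String> k1 =
                DataStreamUtils.reinterpretAsKeyedStream(input1, t -> t.f0, Types.STRING);
        KeyedStream<Tuple2<String, Long>, String> k2 =
                DataStreamUtils.reinterpretAsKeyedStream(input2, t -> t.f0, Types.STRING);

        // Placeholder for the real two-input logic (the join vertex J).
        k1.connect(k2)
                .map(new CoMapFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String map1(Tuple2<String, Long> value) {
                        return value.f0;
                    }

                    @Override
                    public String map2(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .print();

        // The JSON plan contains a "ship_strategy" for every edge; check the edges
        // that go into the two-input vertex (expected FORWARD, not HASH/REBALANCE).
        System.out.println(env.getExecutionPlan());
    }
}
```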

Dan Hill wrote on Mon, Oct 25, 2021 at 10:43 AM:

> On Sat, Oct 23, 2021 at 9:16 PM JING ZHANG  wrote:
>
>> I found the following link about this.  Still looks applicable.  In my
>>> case, I don't need to do a broadcast join.
>>>
>>> https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839
>>>
>> This is a good document. This document describes how to remove redundant
>> shuffles in batch jobs. You can find the operator implementation in
>> `BatchMultipleInputStreamOperator`[1]. It's a subclass of
>> `MultipleInputStreamOperator`[2], which we referred to in the last email.
>>
>> I'm not sure I understand why having two inputs is an issue in my case.?
>>>
>> We guess there is a network shuffle between your two operators because
>> your second operator has multiple inputs. An operator with multiple inputs
>> would not be chained with its previous operator; please see Thias's last
>> email for more information.
>>
>
> Yea, I'm more curious "why?"  My top guess was that the mapping of keys to
> task managers changes depending on the operator.  E.g. if the same key
> (regardless of the operator) goes to the same task manager, then I'd assume
> it's fine to support multiple inputs into an operator with
> reinterpretAsKeyedStream.
>
>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java
>> [2]
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MultipleInputStreamOperator.java
>>
>>
>> Dan Hill wrote on Fri, Oct 22, 2021 at 1:34 PM:
>>
>>> Probably worth restating.  I was hoping to avoid a lot of shuffles in my
>>> Flink job.  A bunch of the data is already keyed by userId (and stays
>>> keyed by it for most of the Flink job).
>>>
>>> I'm not sure I understand why having two inputs is an issue in my case.
>>> Does Flink send the same keys to different task managers depending on the
>>> operator (even if the parallelism is the same)?  It's also weird that the
>>> output looks fine as is but fails when rescaling partitions.
>>>
>>>
>>> On Thu, Oct 21, 2021 at 10:23 PM Dan Hill  wrote:
>>>
 I found the following link about this.  Still looks applicable.  In my
 case, I don't need to do a broadcast join.

 https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839

 On Thu, Oct 21, 2021 at 9:51 PM Dan Hill  wrote:

> Interesting.  Thanks, JING ZHANG!
>
> On Mon, Oct 18, 2021 at 12:16 AM JING ZHANG 
> wrote:
>
>> Hi Dan,
>> > I'm guessing I violate the "The second operator needs to be
>> single-input (i.e. no TwoInputOp nor union() before)" part.
>> I think you are right.
>>
>> Do you want to remove the shuffle between the two inputs in your case? If so,
>> Flink has provided support for multiple-input operators since version 1.11.
>> I think it might satisfy your need; you can find more in [1].
>> However, at present, this feature does not provide a complete
>> DataStream API interface. If users want to use it, they need to manually
>> create a MultipleInputTransformation and a MultipleConnectedStreams.
>>
>>> MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
>>>"My Operator",
>>>new SumAllInputOperatorFactory(),
>>>BasicTypeInfo.LONG_TYPE_INFO,
>>>1);
>>>
>>> env.addOperator(transform
>>>.addInput(source1.getTransformation())
>>>.addInput(source2.getTransformation())
>>>.addInput(source3.getTransformation()));
>>> new MultipleConnectedStreams(env)
>>>.transform(transform)
>>>.addSink(resultSink);
>>>
>>>
>> I would invite @Piotr to double check 

ConfigMaps not cleaned up after successful execution

2021-10-24 Thread Hua Wei Chen
Hi all,

We have Flink jobs running in batch mode, and we get the job status via
JobHandler.onJobExecuted()[1].
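A minimal sketch of how such a listener can be registered, assuming the
standard JobListener API referenced in [1] (the pipeline below is only a
placeholder, not the real job):

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchJobWithListener {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // Called on the client after submission (throwable is non-null on failure).
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                // Called when execute() returns; on success 'result' carries the job id etc.
                if (throwable == null) {
                    System.out.println("Job finished: " + result.getJobID());
                } else {
                    System.err.println("Job failed: " + throwable.getMessage());
                }
            }
        });

        env.fromElements(1, 2, 3).print(); // placeholder pipeline, not the real job
        env.execute("batch job with listener");
    }
}
```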

Based on the thread [2], we expected the ConfigMaps to be cleaned up after a
successful execution.

But we found that some ConfigMaps are not cleaned up after the job succeeds,
although their contents and labels are removed.

Here is one of the Configmaps.

```
apiVersion: v1
kind: ConfigMap
metadata:
  name: match-6370b6ab-de17-4c93-940e-0ce06d05a7b8-resourcemanager-leader
  namespace: app-flink
  selfLink: >-
    /api/v1/namespaces/app-flink/configmaps/match-6370b6ab-de17-4c93-940e-0ce06d05a7b8-resourcemanager-leader
  uid: 80c79c87-d6e2-4641-b13f-338c3d3154b0
  resourceVersion: '578806788'
  creationTimestamp: '2021-10-21T17:06:48Z'
  annotations:
    control-plane.alpha.kubernetes.io/leader: >-
      {"holderIdentity":"3da40a4a-0346-49e5-8d18-b04a68239bf3","leaseDuration":15.0,"acquireTime":"2021-10-21T17:06:48.092264Z","renewTime":"2021-10-21T17:06:48.092264Z","leaderTransitions":0}
  managedFields:
- manager: okhttp
  operation: Update
  apiVersion: v1
  time: '2021-10-21T17:06:48Z'
  fieldsType: FieldsV1
  fieldsV1:
'f:metadata':
  'f:annotations':
.: {}
'f:control-plane.alpha.kubernetes.io/leader': {}
data: {}
```


Our Flink apps run on ver. 1.14.0.
Thanks!

BR,
Oscar


Reference:
[1] JobListener (Flink : 1.15-SNAPSHOT API) (apache.org)

[2]
https://lists.apache.org/list.html?user@flink.apache.org:lte=1M:High%20availability%20data%20clean%20up%20


flink yarn-per-job mode: -ynm does not take effect

2021-10-24 Thread Lawulu
Hi, I run the following command:
bin/flink run -ynm flink-test -t yarn-per-job --detached 
./examples/streaming/TopSpeedWindowing.jar


but the application name shown in the YARN UI is "Flink per-job cluster".

Unsubscribe

2021-10-24 Thread JasonLee
Hi


To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org


Best
JasonLee


On Oct 25, 2021 at 10:52, zdj<1361776...@qq.com.INVALID> wrote:


Unsubscribe

2021-10-24 Thread zdj


Re: Any issues with reinterpretAsKeyedStream when scaling partitions?

2021-10-24 Thread Dan Hill
On Sat, Oct 23, 2021 at 9:16 PM JING ZHANG  wrote:

> I found the following link about this.  Still looks applicable.  In my
>> case, I don't need to do a broadcast join.
>>
>> https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839
>>
> This is a good document. This document describes how to remove redundant
> shuffles in batch jobs. You can find the operator implementation in
> `BatchMultipleInputStreamOperator`[1]. It's a subclass of
> `MultipleInputStreamOperator`[2], which we referred to in the last email.
>
> I'm not sure I understand why having two inputs is an issue in my case.?
>>
> We guess there is a network shuffle between your two operators because
> your second operator has multiple inputs. An operator with multiple inputs
> would not be chained with its previous operator; please see Thias's last
> email for more information.
>

Yea, I'm more curious "why?"  My top guess was that the mapping of keys to
task managers changes depending on the operator.  E.g. if the same key
(regardless of the operator) goes to the same task manager, then I'd assume
it's fine to support multiple inputs into an operator with
reinterpretAsKeyedStream.
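For what it's worth, the key-to-subtask mapping in Flink is deterministic and
does not depend on the operator: it is computed from the key's hash, the max
parallelism and the operator parallelism, so it only changes when the
parallelism (or max parallelism) changes. A small illustration using Flink's
internal KeyGroupRangeAssignment class (internal API, shown here purely for
illustration, not for use in application code):

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupDemo {

    public static void main(String[] args) {
        int maxParallelism = 128;   // Flink's default for small parallelism values
        String key = "user-42";

        // The key group depends only on the key and the max parallelism,
        // never on which operator the key is sent to.
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        // The subtask index additionally depends on the parallelism, so it
        // changes when the job is rescaled.
        int subtaskAtP4 = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, 4);
        int subtaskAtP8 = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, 8);

        System.out.printf("keyGroup=%d, subtask at p=4 -> %d, subtask at p=8 -> %d%n",
                keyGroup, subtaskAtP4, subtaskAtP8);
    }
}
```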


> [1]
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperator.java
> [2]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MultipleInputStreamOperator.java
>
>
> Dan Hill wrote on Fri, Oct 22, 2021 at 1:34 PM:
>
>> Probably worth restating.  I was hoping to avoid a lot of shuffles in my
>> Flink job.  A bunch of the data is already keyed by userId (and stays
>> keyed by it for most of the Flink job).
>>
>> I'm not sure I understand why having two inputs is an issue in my case.
>> Does Flink send the same keys to different task managers depending on the
>> operator (even if the parallelism is the same)?  It's also weird that the
>> output looks fine as is but fails when rescaling partitions.
>>
>>
>> On Thu, Oct 21, 2021 at 10:23 PM Dan Hill  wrote:
>>
>>> I found the following link about this.  Still looks applicable.  In my
>>> case, I don't need to do a broadcast join.
>>>
>>> https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839
>>>
>>> On Thu, Oct 21, 2021 at 9:51 PM Dan Hill  wrote:
>>>
 Interesting.  Thanks, JING ZHANG!

 On Mon, Oct 18, 2021 at 12:16 AM JING ZHANG 
 wrote:

> Hi Dan,
> > I'm guessing I violate the "The second operator needs to be
> single-input (i.e. no TwoInputOp nor union() before)" part.
> I think you are right.
>
> Do you want to remove the shuffle between the two inputs in your case? If so,
> Flink has provided support for multiple-input operators since version 1.11.
> I think it might satisfy your need; you can find more in [1].
> However, at present, this feature does not provide a complete
> DataStream API interface. If users want to use it, they need to manually
> create a MultipleInputTransformation and a MultipleConnectedStreams.
>
>> MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
>>"My Operator",
>>new SumAllInputOperatorFactory(),
>>BasicTypeInfo.LONG_TYPE_INFO,
>>1);
>>
>> env.addOperator(transform
>>.addInput(source1.getTransformation())
>>.addInput(source2.getTransformation())
>>.addInput(source3.getTransformation()));
>> new MultipleConnectedStreams(env)
>>.transform(transform)
>>.addSink(resultSink);
>>
>>
> I would invite @Piotr to double-check this conclusion. He has more
> expertise on this topic.
>
> @Piotr, would you please check Dan's question? Please correct me if
> I'm wrong.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR
>
> Best,
> JING ZHANG
>
> Dan Hill wrote on Sat, Oct 16, 2021 at 6:28 AM:
>
>> Thanks Thias and JING ZHANG!
>>
>> Here's a Google drive folder link
>> 
>> with the execution plan and two screenshots from the job graph.
>>
>> I'm guessing I violate the "The second operator needs to be
>> single-input (i.e. no TwoInputOp nor union() before)" part.
>>
>> After I do a transformation on a KeyedStream (so it goes back to a
>> SingleOutputStreamOperator), even if I do a simple map, it usually
>> disallows operator chaining.  Even with reinterpretAsKeyedStream, it
>> doesn't work.
>>
>>
>> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG 
>> wrote:
>>
>>> Hi Dan,
>>> Sorry for the typos,  I meant to provide the code to reproduce the
>>> 

Re: How to configure automatic offset committing for easier monitoring when checkpointing is enabled

2021-10-24 Thread Caizhi Weng
Hi!

Is the offset here the offset of the Kafka source? There is actually no need to
read the offset from checkpoints; you can read it from metrics instead, see [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
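A minimal sketch of a job whose Kafka source exposes the offset metrics
described in [1] (broker address, topic and group id are made up); the offsets
are then visible in the web UI or via the REST metrics endpoint without
waiting for a checkpoint:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaOffsetMonitoring {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10 * 60 * 1000L); // large interval, as in the question

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")      // made-up address
                .setTopics("events")                     // made-up topic
                .setGroupId("offset-monitoring-demo")    // made-up group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Progress can be watched through the connector's offset metrics (see [1])
        // instead of waiting for offsets to be committed on each checkpoint.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();

        env.execute("kafka offset monitoring demo");
    }
}
```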

杨浩 wrote on Mon, Oct 25, 2021 at 10:20 AM:

> A quick question: with checkpointing enabled, the checkpoint interval is set
> quite long because the state is large. How can I make the offsets get
> committed more quickly, so that it is easier to monitor the job's progress?


Re: how to delete all rows one by one in batch execution mode; shutdown cluster after all tasks finished

2021-10-24 Thread Caizhi Weng
Hi!

Thanks for the clarification. Flink currently does not have the
functionality to "revert all operations till some point". What I would
suggest is still to discard the resulting tables and run the pipeline from
the point when the filtering logic is changed. If the pipeline has
processed some data from that point you can pause the pipeline and run a
batch job to catch up with all changed logic in one go, then resume the
pipeline and it will process records as normal.

vtygoss wrote on Fri, Oct 22, 2021 at 4:31 PM:

> Hi!
>
> Thanks for your reply! I think I didn't make myself clear about problem 1,
> so I drew a picture.
>
>
> 1. "tables in DB": things start at the database, and we sync the tables'
> changelogs into dynamic tables with a CDC tool. Each changelog record
> includes RowData and a RowKind such as INSERT / UPDATE / DELETE.
>
> 2. "logics": modeling layers such as ODS / DWD.
>
> 3. "Table 1": a table which has some downstream tables. Table 2 is
> produced by "count(1) from table_1" with input Table 1; Table 3 is produced
> by "udf(...) from table_1 where a>0" with input Table 1. When an insert
> event or delete event occurs in Table 1, Table 2 and Table 3 will change
> accordingly, as do the downstream tables of Table 2 and Table 3.
>
>
> - problem: The logic which generates Table 1 changes from "select * from
> table_0 where a>0" to "select * from table_0 where a<0". The old data in
> Table 1 generated by the filter "a>0" is now wrong, and all downstream
> tables of Table 1 are wrong too.  So I want to find an easy way to truncate
> the wrong data in Table 1 and in all downstream tables of Table 1, but
> truncating Table 1 does not emit a deletion event for each record in Table
> 1, so truncating doesn't trigger the deletion of the corresponding records
> in the downstream tables, which I think is the most important part. Now I
> want to read all records in Table 1 and change the RowKind of each Row from
> RowKind.INSERT to RowKind.DELETE, but I didn't find a corresponding API in
> BatchTableEnvironment or BatchExecutionEnvironment; code as below.
>
>
> ```
>
> TableEnvironment tenv;
>
> // batch read of table_1 (hint disables streaming read)
> Table t1 = tenv.from("table_1 /*+OPTIONS('read.streaming.enabled'='false')*/");
>
> // intended (pseudo-code): turn every row into a DELETE changelog record
> Table t2 = t1.map(row -> row.setRowKind(RowKind.DELETE));
>
> t2.insertInto("table_1");
>
> ```
>
>
> Regarding the suggestion of creating a new table based on the new logic
> ("new Table 1'" as shown in the picture): I think creating a new table will
> not solve this problem unless all downstream tables of Table 1 (for example
> Table 2') are also re-created, but that is too heavy.
>
>
> Thanks for your suggestion. Do you have any other suggestions?
>
>
> Best Regards!
>
>
>
>
>
>
>
>
>
> On Oct 22, 2021 at 10:55, Caizhi Weng wrote:
>
> Hi!
>
> For problem 1, Flink does not support deleting specific records. As you're
> running a batch job, I suggest creating a new table based on the new filter
> condition. Even if you can delete the old records you'll still have to
> generate the new ones, so why not generate them directly into a new place?
>
> For problem 2, yarn-cluster is the mode for a yarn session cluster, which
> means the cluster will remain even after the job is finished. If you want
> to finish the Flink job as well as the yarn job, use yarn-per-job mode
> instead.
>
> vtygoss wrote on Thu, Oct 21, 2021 at 2:46 PM:
>
>>
>> Hi, community!
>>
>>
>> I am working on building a data processing pipeline based on changelogs
>> (CDC) and I have met two problems.
>>
>>
>> --(sql_0)--> Table A --(sql_1)---> Table B --->other tables downstream
>>
>>   --(sql_2)--->Table C---> other tables downstream
>>
>>
>> Table A is generated based on sql_0; Table B is generated based on sql_1
>> with input Table A; Table C is generated based on sql_2 with input Table A.
>> Tables B and C have some downstream tables based on the modeling.
>>
>>
>> - problem 1. When the sql_0 logic is changed, e.g. from "select * from xx
>> where a>0" to "select * from xx where a<0", the data produced by the filter
>> "a>0" is wrong. I want to find a way to clear the wrong data in Table A and
>> trigger the corresponding deletions in all downstream tables, then produce
>> new data with the new filter a<0. So how can I change the RowKind of each
>> row in Table A to RowKind.DELETE in Flink batch execution mode? It would be
>> very nice if there is a use case for Flink 1.12.0.
>>
>>
>> - problem 2.  I found that Flink launches a session cluster even when the
>> runtime mode is "yarn-cluster". In batch execution mode, the cluster still
>> runs after all tasks have finished. How can I shut down the cluster?
>>
>>
>>
>> Thanks for any suggestion or reply!
>>
>>
>> Best Regards!
>>
>


How to configure automatic offset committing for easier monitoring when checkpointing is enabled

2021-10-24 Thread 杨浩
A quick question: with checkpointing enabled, the checkpoint interval is set
quite long because the state is large. How can I make the offsets get committed
more quickly, so that it is easier to monitor the job's progress?