Questions related to Autoscaler

2023-08-08 Thread Hou, Lijuan via user
Hi Flink team,

This is Lijuan. I am working on enabling autoscaling for our Flink job. We are 
currently using Flink 1.16.1 and Flink operator 1.5.0. I have some questions I 
would like to confirm with you.

1 - It seems that for a Flink job deployed with the Flink operator, the only 
option for autoscaling is to enable the Autoscaler feature, and KEDA won’t 
work, right?

2 - I noticed from the documentation that we need to upgrade to Flink 1.17 to 
use the Autoscaler. But I also noticed that the latest version of the Flink 
operator is 1.7 now.
Shall we upgrade from 1.5.0 to 1.7 to enable Autoscaler?

3 – I have done a lot of searching and have also read the Autoscaler Algorithm 
page, but I am still not sure about the list of metrics that are observed 
automatically.

  *   Will it include CPU load, memory, throughput, and Kafka consumer lag? 
Could you please provide the whole list of monitored metrics?

-  Is this config related to Kafka consumer lag?
kubernetes.operator.job.autoscaler.backlog-processing.lag-threshold
Thanks a lot for the help!

Best,
Lijuan




Re: Investigating use of Custom Trigger to smooth CPU usage

2023-08-08 Thread Tucker Harvey via user
Hi David and Xiangyu, 

For more context, 

We have a job running on our cluster that aggregates StatsD metrics in a 
tumbling window, which causes CPU usage spikes because the aligned windows all 
fire at the same time. Staggering the windows with WindowStagger could help, 
but shifting the window boundaries would impact the job's accuracy. Instead, we 
want to investigate offsetting only the trigger times, without touching the 
window start and end: by distributing the firings of the aligned windows over 
time, we could alleviate the resource contention without compromising the 
correctness of the computations.
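
As a concrete point of comparison, here is a minimal, self-contained sketch of 
the WindowStagger route David suggests below. The tuple input, key selector, 
and 5-second window size are made-up placeholders, and it assumes the 
TumblingEventTimeWindows.of overload that takes a WindowStagger (available in 
recent Flink releases), so treat it as a sketch rather than our actual code:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
import org.apache.flink.streaming.api.windowing.time.Time;

public class StaggeredWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Made-up stand-in for the StatsD metric stream: (metricName, timestampMillis).
        DataStream<Tuple2<String, Long>> metrics = env
                .fromElements(Tuple2.of("cpu", 1_000L), Tuple2.of("mem", 2_000L), Tuple2.of("cpu", 7_000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((metric, ts) -> metric.f1));

        metrics
                .keyBy(metric -> metric.f0)
                // RANDOM staggers the window start by a random amount within the window
                // size, so the tumbling windows no longer all fire at the same instant.
                .window(TumblingEventTimeWindows.of(
                        Time.seconds(5), Time.seconds(0), WindowStagger.RANDOM))
                .sum(1)
                .print();

        env.execute("staggered window sketch");
    }
}

The trade-off, as mentioned above, is that the stagger shifts the window 
boundaries themselves, which is why we are also looking at a custom trigger 
that only offsets the firing time.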

> On Aug 3, 2023, at 10:01 PM, David Anderson  wrote:
> 
> There's already a built-in concept of WindowStagger that provides an
> interface for accomplishing this.
> 
> It's not as well integrated (or documented) as it might be, but the
> basic mechanism exists. To use it, I believe you would do something
> like this:
> 
> assigner = new TumblingEventTimeWindows(Time.seconds(5), 0,
> WindowStagger.RANDOM);
> 
> foo.keyBy(...)
>  .window(assigner)
>  ...
> 
> The different stagger strategies are documented in [1].
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/windowing/assigners/WindowStagger.html
> 
> On Wed, Aug 2, 2023 at 7:13 PM xiangyu feng  wrote:
>> 
>> Hi Tucker,
>> 
>> Can you describe more about your running job and how the trigger timer is 
>> configured? Also it would be better if you can attach a FlameGraph to show 
>> the CPU usage when the timer is triggered.
>> 
>> Best,
>> Xiangyu
>> 
>> Tucker Harvey via user  于2023年8月1日周二 05:51写道:
>>> 
>>> Hello Flink community! My team is trying to optimize CPU usage on a running 
>>> job, and we're exploring the option of offsetting the trigger time for 
>>> smoother CPU patterns. Since adjusting the window will compromise job 
>>> correctness, we plan to pursue a custom trigger implementation. We were 
>>> curious if the community had any thoughts or insights on this issue.
>>> 
>>> 



Re: Streaming join performance

2023-08-08 Thread David Anderson
This join optimization sounds promising, but I'm wondering why Flink
SQL isn't taking advantage of the N-Ary Stream Operator introduced in
FLIP-92 [1][2] to implement a n-way join in a single operator. Is
there something that makes this impossible/impractical?

[1] https://cwiki.apache.org/confluence/x/o4uvC
[2] https://issues.apache.org/jira/browse/FLINK-15688

On Sat, Aug 5, 2023 at 3:54 AM shuai xu  wrote:
>
> Hi, we are also paying attention to this issue and have completed the 
> validation of the minibatch join optimization including the intermediate 
> message folding you mentioned. We plan to officially release it in Flink 
> 1.19. This optimization can significantly improve the performance of join 
> operations and we are looking forward to the arrival of Flink 1.19 to help 
> solve your problem.
>
> On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
> > Hello, Flink community!
> >
> > I have an important use case that shows extremely bad 
> > performance:
> >
> >   *   Streaming application
> >   *   sql table api
> >   *   10 normal joins (state should be kept forever)
> >
> > The join rules are simple: I have 10 tables that share the same primary key, 
> > and I want to build the result table by joining all 10 of them.
> >
> > But Flink joins sequentially, so I end up with a chain of 10 joins.
> >
> > What happens if I generate an update message for the first table in the chain:
> >
> >
> >   *   first join operator will produce 2 records: delete+insert
> >   *   second operator will double incoming messages. result=2*2=4 messages
> >   *   ...
> >   *   last operator will produce 2**10=1024 messages.
> >
> > Performance becomes extremely slow and resources are wasted.
> >
> > I've made a simple compaction operator, which compacts records after each 
> > join:
> >
> >
> >   *   the join operator, after receiving a delete message, generates 2 messages
> >   *   after receiving the insert message, it generates 2 more messages
> >   *   but two of the four cancel each other out and are compacted, so the 
> > operator receives 2 messages and sends 2 messages
> >
> > I wonder if this approach is right? Why is it not implemented in Flink yet?
> >
> > I also have a problem with how to deploy it on the cluster, because I have 
> > changed some Flink sources that are not pluggable.
> >
> > I have modified the StreamExecJoin class and added this code as a proof of 
> > concept:
> >
> >
> > final OneInputTransformation<RowData, RowData> compactTransform =
> > ExecNodeUtil.createOneInputTransformation(
> > transform,
> > "compact join results",
> > "description",
> > new ProcessOperator<>(new CompactStreamOperator(equaliser)),
> > InternalTypeInfo.of(returnType),
> > leftTransform.getParallelism()
> > );
> >
> > return compactTransform;
> >
> > Transform operator:
> > @Override
> > public void processElement(
> > RowData value,
> > ProcessFunction<RowData, RowData>.Context ctx,
> > Collector<RowData> collector) throws Exception {
> >
> > counter++;
> >
> > boolean compacted=false;
> > if (value.getRowKind()==RowKind.DELETE) {
> > value.setRowKind(RowKind.INSERT);
> > for (int i = buffer.size() - 1; i >= 0; i--) {
> > RowData x = buffer.get(i);
> > if (x.getRowKind() == RowKind.INSERT && 
> > recordEqualiser.equals(x, value)) {
> > buffer.remove(i);
> > compacted = true;
> > break;
> > }
> > }
> > value.setRowKind(RowKind.DELETE);
> > }
> >
> > if (!compacted) {
> > buffer.add(value);
> > }
> >
> > if (counter>=10)
> > {
> > buffer.forEach(collector::collect);
> > buffer.clear();
> > counter=0;
> > }
> > }
> >
> >
> >
> >
> >
> > Regards,
> > Artem
> >
> >
> >


Re: Easiest way to do a batch outer join

2023-08-08 Thread liu ron
Hi, Flavio

IMO, the current DataStream API is not fully aligned with the DataSet API in
terms of capabilities. I think you can try it with a GlobalWindow. Another
possible solution is to convert the DataStreams to tables [1] first and then
do the join with the Table API; a small sketch follows below.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/
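
To illustrate the second option, here is a minimal sketch. The document and
indexed-id streams and the column names are made-up placeholders, not your
actual schema: run the bounded inputs in batch execution mode, convert them to
tables, do a left outer join, and keep only the rows without a match, i.e. the
not-yet-indexed documents.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class NotIndexedDocsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // bounded inputs, batch execution

        // Made-up bounded inputs: all document ids and the ids that are already indexed.
        DataStream<String> allDocs = env.fromElements("doc-1", "doc-2", "doc-3");
        DataStream<String> indexedDocs = env.fromElements("doc-2");

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        Table all = tEnv.fromDataStream(allDocs).as("docId");
        Table indexed = tEnv.fromDataStream(indexedDocs).as("indexedId");

        // Left outer join, then keep only rows without a match: the not-yet-indexed docs.
        Table notIndexed = all
                .leftOuterJoin(indexed, $("docId").isEqual($("indexedId")))
                .where($("indexedId").isNull())
                .select($("docId"));

        tEnv.toDataStream(notIndexed).print();
        env.execute("not-indexed documents sketch");
    }
}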

Best,
Ron

Flavio Pompermaier  于2023年8月8日周二 00:23写道:

> Hello everybody,
> I have a use case where I need to exclude from a DataStream (technically a
> DataSet, since I work in batch mode) all already-indexed documents.
> My idea is to perform an outer join, but I didn't find any simple example of
> this on DataStream in batch mode. I've tried using coGroup(), but it requires
> me to specify a window strategy, which I wouldn't expect in batch mode. Can I
> use a global window?
>
> Thanks in advance,
> Flavio
>


Re: Streaming join performance

2023-08-08 Thread liu ron
Hi, David

Regarding the N-way join, this feature aims to address the issue of state
simplification, and it is on the roadmap. Technically there are no limitations,
but we'll need some time to find a sensible solution.

Best,
Ron

David Anderson  于2023年8月9日周三 10:38写道:

> This join optimization sounds promising, but I'm wondering why Flink
> SQL isn't taking advantage of the N-Ary Stream Operator introduced in
> FLIP-92 [1][2] to implement a n-way join in a single operator. Is
> there something that makes this impossible/impractical?
>
> [1] https://cwiki.apache.org/confluence/x/o4uvC
> [2] https://issues.apache.org/jira/browse/FLINK-15688
>
> On Sat, Aug 5, 2023 at 3:54 AM shuai xu  wrote:
> >
> > Hi, we are also paying attention to this issue and have completed the
> validation of the minibatch join optimization including the intermediate
> message folding you mentioned. We plan to officially release it in Flink
> 1.19. This optimization can significantly improve the performance of
> join operations and we are looking forward to the arrival of Flink 1.19 to
> help solve your problem.
> >
> > On 2023/08/04 08:21:51 Сыроватский Артем Иванович wrote:
> > > Hello, Flink community!
> > >
> > > I have an important use case that shows extremely bad
> performance:
> > >
> > >   *   Streaming application
> > >   *   sql table api
> > >   *   10 normal joins (state should be kept forever)
> > >
> > > The join rules are simple: I have 10 tables that share the same primary
> key, and I want to build the result table by joining all 10 of them.
> > >
> > > But Flink joins sequentially, so I end up with a chain of 10 joins.
> > >
> > > What happens if I generate an update message for the first table in the chain:
> > >
> > >
> > >   *   first join operator will produce 2 records: delete+insert
> > >   *   second operator will double incoming messages. result=2*2=4
> messages
> > >   *   ...
> > >   *   last operator will produce 2**10=1024 messages.
> > >
> > > Performance becomes extremely slow and resources are wasted.
> > >
> > > I've made a simple compaction operator, which compacts records
> after each join:
> > >
> > >
> > >   *   the join operator, after receiving a delete message, generates 2
> messages
> > >   *   after receiving the insert message, it generates 2 more messages
> > >   *   but two of the four cancel each other out and are compacted, so the
> operator receives 2 messages and sends 2 messages
> > >
> > > I wonder if this approach is right? Why is it not implemented in Flink
> yet?
> > >
> > > I also have a problem with how to deploy it on the cluster,
> because I have changed some Flink sources that are not pluggable.
> > >
> > > I have modified the StreamExecJoin class and added this code as a proof of
> concept:
> > >
> > >
> > > final OneInputTransformation<RowData, RowData> compactTransform =
> > > ExecNodeUtil.createOneInputTransformation(
> > > transform,
> > > "compact join results",
> > > "description",
> > > new ProcessOperator<>(new
> CompactStreamOperator(equaliser)),
> > > InternalTypeInfo.of(returnType),
> > > leftTransform.getParallelism()
> > > );
> > >
> > > return compactTransform;
> > >
> > > Transform operator:
> > > @Override
> > > public void processElement(
> > > RowData value,
> > > ProcessFunction<RowData, RowData>.Context ctx,
> > > Collector<RowData> collector) throws Exception {
> > >
> > > counter++;
> > >
> > > boolean compacted=false;
> > > if (value.getRowKind()==RowKind.DELETE) {
> > > value.setRowKind(RowKind.INSERT);
> > > for (int i = buffer.size() - 1; i >= 0; i--) {
> > > RowData x = buffer.get(i);
> > > if (x.getRowKind() == RowKind.INSERT &&
> recordEqualiser.equals(x, value)) {
> > > buffer.remove(i);
> > > compacted = true;
> > > break;
> > > }
> > > }
> > > value.setRowKind(RowKind.DELETE);
> > > }
> > >
> > > if (!compacted) {
> > > buffer.add(value);
> > > }
> > >
> > > if (counter>=10)
> > > {
> > > buffer.forEach(collector::collect);
> > > buffer.clear();
> > > counter=0;
> > > }
> > > }
> > >
> > >
> > >
> > >
> > >
> > > Regards,
> > > Artem
> > >
> > >
> > >
>


Re: Questions related to Autoscaler

2023-08-08 Thread liu ron
Hi,
Lijuan

> 1 - It seems for flink job using flink operator to realize autoscaling,
the only option to realize autoscaling is to enable the Autoscaler feature,
and KEDA won’t work, right?


What does KEDA mean?

> 2 - I noticed from the document that we need to upgrade to flink version
of 1.17 to use Autoscaler. But I also noticed that the updated version for
flink operator is 1.7 now.

Shall we upgrade from 1.5.0 to 1.7 to enable Autoscaler?


I have checked the flink-kubernetes-operator project pom on the release-1.5
branch; its Flink dependency version is 1.16.1. So I recommend you update
your flink-kubernetes-operator to 1.6, which is the latest stable release.
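
To make that concrete, here is a minimal sketch of what enabling the autoscaler
in a FlinkDeployment spec could look like. The resource name and jar URI are
placeholders, and the config keys are my recollection of the operator 1.5/1.6
autoscaler options, so please double-check them against the docs for the
version you end up on:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-autoscaled-job            # placeholder name
spec:
  flinkVersion: v1_17                # the autoscaler needs Flink 1.17+
  flinkConfiguration:
    # autoscaler keys as exposed by operator 1.5/1.6 (please verify for your version)
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
    kubernetes.operator.job.autoscaler.metrics.window: "5m"
    # the key you asked about, which relates to source backlog (e.g. Kafka consumer lag)
    kubernetes.operator.job.autoscaler.backlog-processing.lag-threshold: "5m"
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar   # placeholder jar
    parallelism: 2
    upgradeMode: savepoint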


Best,

Ron

Hou, Lijuan via user  于2023年8月9日周三 03:04写道:

> Hi Flink team,
>
>
>
> This is Lijuan. I am working on enabling autoscaling for our Flink job. We
> are currently using Flink 1.16.1 and Flink operator 1.5.0. I have some
> questions I would like to confirm with you.
>
>
>
> 1 - It seems that for a Flink job deployed with the Flink operator, the only
> option for autoscaling is to enable the Autoscaler feature, and KEDA won’t
> work, right?
>
>
>
> 2 - I noticed from the document that we need to upgrade to flink version
> of 1.17 to use Autoscaler. But I also noticed that the updated version for
> flink operator is 1.7 now.
>
> Shall we upgrade from 1.5.0 to 1.7 to enable Autoscaler?
>
>
>
> 3 – I have done a lot of searching and have also read the Autoscaler
> Algorithm page, but I am still not sure about the list of metrics that are
> observed automatically.
>
>- Will it include CPU load, memory, throughput and kafka consumer lag?
>Could you please provide the whole list of monitored metrics?
>
> -  Is this config related to kafka consumer lag?
> kubernetes.operator.job.autoscaler.backlog-processing.lag-threshold
>
> Thanks a lot for the help!
>
> Best,
> Lijuan
>
>
>
>
>
>


Re: About [jobmanager.memory.off-heap.size] not working on Flink (flink-session on yarn)

2023-08-08 Thread liu ron
Hi, wenjiang

It does seem a bit odd, since it should theoretically work either way. I think
you can check the Flink JobManager log for more information.


Best,
Ron


傅文江  于2023年7月24日周一 17:37写道:

>
> When I use ESSink, I find that I need to set Flink’s
> jobmanager.memory.off-heap.size to 256MB; the default 128MB does not meet
> the job's requirements. Modifying the configuration file works, but I want
> to adjust jobmanager.memory.off-heap.size dynamically when submitting from
> the command line. According to the official documentation
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_jobmanager/#configure-off-heap-memory,
> my submit commands are like this:
>
> ./yarn-session.sh --detached -tm 12288 -jm 2048 -d -nm Name1Process -s 12
> -yD jobmanager.memory.enable-jvm-direct-memory-limit=true -yD
> jobmanager.memory.off-heap.size=256m
> ./flink run --detached -p 1 --class com.fuwenjaing.demo1
> xx/name1-message.jar
>
> After running, I found that the JobManager's Off-Heap Memory is still 128MB.
> I would like to ask how to solve this problem. Can the command line not
> adjust jobmanager.memory.off-heap.size dynamically, so that it can only be
> modified through the configuration file?
>
> My environment:
> Flink version is 1.14.4
> Yarn version Hadoop 3.0.0-cdh6.3.2
>
> Best wishes
>
>


Integrating Apache Flink with Apache Ranger

2023-08-08 Thread arjun s
Hi,
I'm interested to know if there is an available feature for integrating
Apache Flink with Apache Ranger. If so, could you kindly share the relevant
documentation with me?

Thanks and regards,
Arjun S