RIC Incremental and GIC Incremental checkpoint use question

2023-03-25 Thread ConradJam
Hi Community. I would like to ask about some configuration questions around
RocksDB incremental checkpoints and GIC. I want to try this feature in Flink
1.17. If Generic Incremental Checkpoint (GIC) is enabled, can the RocksDB
incremental checkpoint be enabled or disabled independently? Are the two
switches in conflict, i.e. does turning on GIC mean I no longer need to
enable the RocksDB incremental checkpoint? The community seems to have no
documentation describing whether the two can be used together or whether
only one of them should be enabled.
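For reference, the two features are controlled by separate keys and are not a
single switch; as I understand it, GIC (the changelog state backend) wraps the
configured state backend, so the two are typically enabled together. A sketch
of a configuration enabling both (please verify key names against the Flink
1.17 configuration docs; the path is hypothetical):

```yaml
# flink-conf.yaml (sketch, verify against the Flink 1.17 docs)

# RocksDB incremental checkpoints (state-backend level)
state.backend: rocksdb
state.backend.incremental: true

# Generic Incremental Checkpoint (changelog state backend)
state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://my-bucket/changelog  # hypothetical path
```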


Re: Unit Testing onTimer() event function with TestHarness - onTimer() not being called

2023-03-25 Thread David Anderson
1. The timestamp passed to testHarness.processElement should be the
timestamp that would have been extracted from the element by the
timestamp extractor in your watermark strategy.

2. Your tests should call testHarness.processWatermark and pass in the
watermark(s) you want to work with.

processBroadcastWatermark is used for testing the behavior of a
(Keyed)BroadcastProcessFunction when a watermark arrives on the
broadcast channel.

Your test might look something like this:

// send in some data
testHarness.processElement(6L, 10L);

// verify that a timer was created
assertThat(testHarness.numEventTimeTimers(), is(1));

// should cause the timer to fire
testHarness.processWatermark(new Watermark(20L));
assertThat(testHarness.numEventTimeTimers(), is(0));

// verify the results
assertThat(testHarness.getOutput(), containsInExactlyThisOrder(6L));

Best,
David


On Wed, Mar 22, 2023 at 6:09 AM Gabriel Angel Amarista Rodrigues
 wrote:
>
> Hello dear Flink team
>
> We are rather new to Flink and having problems understanding how unit
> testing works overall.
> We want to test the *onTimer()* method after registering a timer in the
> *processElement()* function. However, the *onTimer()* is never called.
>
> We were investigating the documentation, blog posts, books, anything we
> could find and collected a few questions/doubts that we would like your
> input on:
>
> 1. We are using event time, and when calling *testHarness.processElement*
> through the TestHarness, the function requires a *timestamp*.
> Is this *timestamp* supposed to be set as the *processingTimestamp*?
> We are, however, interested in an *EventTimeFunction*, not a
> *ProcessingTimerFunction*. Would it make sense in our use case to call
> testHarness.processElement(event, event.getTimestamp())?
>
> 2. We have added a debug call to *ctx.currentWatermark()* in the
> original *processElement* function, but this always returns *Long.MIN_VALUE*
> even though we call *processWatermark* before the *processElement* call.
>
> Is *processWatermark* constrained in tests somehow?
>
> We noticed there is also a *processBroadcastWatermark*. Is this necessary
> when we are working with a *KeyedProcessFunction*? Is it analogous to
> *processBroadcastElement()*?
>
> We are really excited about working with Flink, it is a great tool.
> Great job!
>
> We will be waiting for your input.
> Thanks beforehand for your support.
>
> Kind regards,
> Gabriel Rodrigues


[jira] [Created] (FLINK-31614) Upgrade autoscaling example to 1.17.0 image

2023-03-25 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-31614:
--

 Summary: Upgrade autoscaling example to 1.17.0 image
 Key: FLINK-31614
 URL: https://issues.apache.org/jira/browse/FLINK-31614
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora


The autoscaling example yaml uses 1.17 snapshot images; we should upgrade to 
the released image and update any related documentation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-25 Thread Jane Chan
Hi Benchao,

Thank you for sharing the discussion thread with us; it's very
informative and helpful. Let's set aside the semantics for a moment, and
let me respond to the rest of your points.

> SQL + hints may be the easiest way for users to write and maintain their
SQL jobs. Of course, hints are not perfect because their propagation process
is not very clear to end users; IMO this is not a problem, since it's
similar to SQL itself: users usually do not find it easy to understand the
physical execution graph compiled from SQL either, especially newcomers.

I think it is only partially correct to say that users do not understand,
or do not need to care about, how SQL is compiled.

In fact, it is only the users who never need hints or other advanced
configurations, such as optimizing state usage, who do not need to care
about it.

However, once users need to use hints, it means that they are confident
that they're smarter than the optimizer, know their data distribution
better, and are able to use hints to guide the optimizer to obtain a better
plan. I don't think these users will be novice users. In this case, would
they prefer to use a hint with an unclear propagation mechanism to
configure fine-grained TTL with a guess, or would they prefer to have a
complete file to present which operators are stateful and then configure
them?

If a novice user doesn't understand anything, how could he have a need to
tune fine-grained TTL? He may not even know what a hint or TTL is. This
seems to be a paradox. If, by any chance, he is required to do such a
complex task, presenting him with a structured JSON file that clearly lists
which operators have a TTL JSON key (which implies "all you need to do is
fill in the blank, i.e. modify the corresponding JSON value") is much better
than leaving him lost in the doc pages of join/agg/row_number/etc., trying
to understand which operation may contain a stateful computation.
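To make this concrete, an operator entry in the compiled JSON plan with a TTL
field might look roughly like the following (an illustrative sketch only; the
operator and state names are invented, and the exact schema is whatever the
FLIP defines):

```json
{
  "id": 5,
  "type": "stream-exec-join_1",
  "state": [
    { "index": 0, "ttl": "86400000 ms", "name": "leftState" },
    { "index": 1, "ttl": "86400000 ms", "name": "rightState" }
  ]
}
```

The user would only fill in or edit the "ttl" values; everything else is
generated by COMPILE PLAN.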

> I think Shuo's idea is very good that hints and json plan may not
contradict each other. In the end we may have multi-level configurations,
e.g., hints -> json plan -> job-level configurations -> cluster-level
configurations, with higher-level configuration overriding lower levels.

Before rushing to discuss whether to introduce multiple levels of state TTL
configuration, I think we should first discuss how to clearly map query
blocks to the underlying stateful operators in a way users can understand.
Maybe in another FLIP.

Best,
Jane

On Sat, Mar 25, 2023 at 4:12 PM Benchao Li  wrote:

> Thanks Jane for starting this discussion. Finally we are considering
> fine-grained configurations for SQL jobs, this is very exciting!
>
> I see a lot of opinions that "SQL hints should not affect the results", and
> this indeed has been discussed extensively while we introduced
> FLIP-113[1][2]. After three years, I still think that in streaming SQL,
> there are a lot of new things that are different from traditional bounded
> SQL, and hint may be one of them.
>
> Most people have raised a valid concern about the ease of use. SQL + hints
> may be the easiest way for users to write and maintain their SQL jobs. Of
> course, hints are not perfect because their propagation process is not very
> clear to end users; IMO this is not a problem, since it's similar to SQL
> itself: users usually do not find it easy to understand the physical
> execution graph compiled from SQL either, especially newcomers.
>
> I think Shuo's idea is very good that hints and json plan may not
> contradict each other. In the end we may have multi-level configurations,
> e.g., hints -> json plan -> job-level configurations -> cluster-level
> configurations, with higher-level configuration overriding lower levels.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
> [2] https://lists.apache.org/thread/78s8bqdql65o61742yb2rtjff66m166r
>
Jing Ge wrote on Sat, Mar 25, 2023 at 07:26:
>
> > Thanks Jane for driving this FLIP.
> >
> > The FLIP is quite interesting. Since the execution plan has finer
> > granularity than the plain SQL script, Hints at SQL level might not be
> able
> > to touch specific operators, which turns out that the idea of leveraging
> > the compiled execution plan is brilliant.
> >
> > However, there are some concerns that might need to be considered.
> >
> > - One thing I didn't fully understand. I might be wrong. Could those ttl
> > configs be survived when SQL jobs are restarted? Does that mean that,
> once
> > I modified the compiled sql plan, the json file will become the sql job?
> I
> > have to always call the EXECUTE PLAN every time when the job needs to be
> > restarted? In case that the original SQL script has been changed, we need
> > to compile a version2 sql plan and copy the ttl configs from version1 sql
> > plan to version2 and drop version1. This means we have to keep the
> compiled
> > json file and create a link with the original SQL script. I am not sure
> if
> > I understood it 

Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-25 Thread Jane Chan
Hi Leonard, Jing and Shengkai,

Thanks so much for your insightful comments. Here are my thoughts

@Shengkai
> 1. How do Gateway users use this feature? As far as I know, EXECUTE
PLAN only supports local files right now. Is it possible to extend this
syntax to allow reading plan files from remote file systems?

Nice catch! Currently, the "COMPILE PLAN" and "EXECUTE PLAN" statements
only support a local file path without a scheme (see
TableEnvironmentImpl.java#L773).
It's reasonable to extend the support to Flink's FileSystem. Besides, the
JSON plan should also be added to the resource-cleanup mechanism for the
Gateway mode, just as we do with the "ADD JAR" operation, cleaning it up
when the session ends. I will take your suggestion and update the FLIP.
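As a sketch of what the extension could allow (the remote-scheme forms are
hypothetical; today only scheme-less local paths are accepted):

```sql
-- Works today: plain local path
EXECUTE PLAN '/tmp/plans/job.json';

-- Proposed: allow any registered Flink FileSystem scheme (hypothetical)
EXECUTE PLAN 'hdfs:///flink/plans/job.json';
EXECUTE PLAN 's3://my-bucket/plans/job.json';
```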

> 2. I would like to inquire if there are any limitations on this feature?
I have encountered several instances where the data did not expire in the
upstream operator, but it expired in the downstream operator, resulting in
abnormal calculation results or direct exceptions thrown by the operator
(e.g. rank operator). Can we limit that the expiration time of downstream
operator data should be greater than or equal to the expiration time of
upstream operator data?

This is an excellent point. In fact, the current state TTL is based on the
initialization time of each operator, which is inherently unaligned. The
probability of such misalignment is magnified now that fine-grained
operator-level TTL is supported. On the other hand, this FLIP is not the
root cause of the issue. To systematically solve the problem of TTL
misalignment between operators, I understand that we need a larger FLIP. I'll
mention this point in the FLIP doc. WDYT?

Back to your suggestion: in most scenarios, the TTL across multiple stateful
operators should be non-decreasing, but there may be some exceptions, such as
the SinkUpsertMaterializer introduced to solve the changelog disorder
problem. It may not be appropriate to block this at the implementation level.
But it does happen that users misconfigure the TTL, so in this case, my idea
is that, since FLIP-280 introduces an experimental feature "EXPLAIN
PLAN_ADVICE", and FLIP-190 also introduces a new syntax "EXPLAIN PLAN FOR
'/foo/bar/sql.json'", what if we add a new plan analyzer, which will analyze
the compiled plan to perform this detection? The analyzer would give a
warning, attached to the optimized physical plan, when the TTL of a
predecessor is larger than the TTL of its successor. Would that draw the
user's attention and make troubleshooting easier?

@Leonard and @Jing
You both expressed the same concern about the high cost of understanding
and the change in behavior for SQL users. IMO, as opposed to everyday
features, fine-grained TTL configuration is a feature for advanced users. I
drew a picture to illustrate this. You can use it to estimate the funnel
conversion rate: from SQL jobs that involve stateful, TTL-controlled
operators, to jobs that require only one TTL configuration to meet their
requirements, to jobs that eventually require multiple TTL configurations,
which is a decreasing distribution. The first- and second-tier users
should not feel bothered by this.
[image: image.png]
We will explain in detail in the documentation how to use this feature and
make clear that it needs to be used carefully. Also, in conjunction with
FLIP-280 and FLIP-190, we can print the optimized physical and execution
plan for the JSON file (in tree style, just like the normal EXPLAIN
statement); would this help advanced users understand what the compiled JSON
plan represents?


@Jing
> One thing I didn't fully understand. I might be wrong. Could those ttl
configs be survived when SQL jobs are restarted? I have to always call the
EXECUTE PLAN every time when the job needs to be restarted?

If it's a new SQL job that has never been submitted before, and users want
to enable fine-grained state TTL control, they will first use the COMPILE
PLAN statement to generate the JSON file, modify the stateful operators'
state metadata as needed, and then submit the job via the EXECUTE PLAN
statement. By the word "restarted", I assume there are historical instances
and users want to restore from some checkpoint or savepoint. Without SQL
changes, users can directly use the Flink CLI: $ bin/flink run -s
:savepointPath -restoreMode :mode -n [:runArgs]
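In SQL, the first-submission workflow described above might look like this (a
sketch with hypothetical table names and path):

```sql
-- 1. Compile the job into a JSON plan instead of running it
COMPILE PLAN '/tmp/plans/my_job.json' FOR
INSERT INTO sink_table
SELECT id, COUNT(*) AS cnt FROM source_table GROUP BY id;

-- 2. Edit the state TTL metadata in /tmp/plans/my_job.json as needed

-- 3. Submit the job from the (modified) plan
EXECUTE PLAN '/tmp/plans/my_job.json';
```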

[jira] [Created] (FLINK-31613) Some default operator config values are overwritten by values.yaml

2023-03-25 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-31613:


 Summary: Some default operator config values are overwritten by 
values.yaml
 Key: FLINK-31613
 URL: https://issues.apache.org/jira/browse/FLINK-31613
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Mate Czagany


It's a bit confusing that the documentation states that 
'kubernetes.operator.reconcile.interval' defaults to 1 min and 
'kubernetes.operator.observer.progress-check.interval' to 10 sec, when they are 
overwritten to 15 sec and 5 sec respectively in the default values.yaml.

 

A possible solution might be to change the defaults to 15 and 5 sec in 
the configuration and remove/comment them in values.yaml; however, this 
would introduce a change in configuration for users that have set a custom 
'defaultConfiguration.flink-conf.yaml' value.
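For context, the overriding entries in the chart's default values.yaml look
roughly like this (a sketch from memory; please check the actual chart file
for the exact keys and surrounding settings):

```yaml
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    kubernetes.operator.reconcile.interval: 15 s
    kubernetes.operator.observer.progress-check.interval: 5 s
```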

 

Please let me know what you think and if the solution sounds good feel free to 
assign me this ticket and I'll create a PR.





[jira] [Created] (FLINK-31612) ClassNotFoundException when using GCS path as HA directory.

2023-03-25 Thread Mohit Aggarwal (Jira)
Mohit Aggarwal created FLINK-31612:
--

 Summary: ClassNotFoundException when using GCS path as HA 
directory.
 Key: FLINK-31612
 URL: https://issues.apache.org/jira/browse/FLINK-31612
 Project: Flink
  Issue Type: Bug
 Environment: Flink Kuberenetes operator: 1.4

Flink version: 1.17

GKE Kubernetes cluster.

 
Reporter: Mohit Aggarwal


Hi,

When I try to run a Flink job in HA mode with a GCS path as the HA directory 
(e.g. gs://flame-poc/ha), or when starting a job from checkpoints in GCS, I 
get the following exception:
{code:java}
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300)
 ~[?:?]
at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575)
 ~[?:?]
at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226)
 ~[?:?]
at 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858)
 ~[?:?]
at 
org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170)
 ~[?:?] {code}
*Observations*:

While using a file system as the HA path and GCS as the checkpointing 
directory, the job is able to write checkpoints to the GCS checkpoint path.

After debugging, what I found was that all the *org.apache.hadoop* paths are 
shaded to *org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop*. Ideally 
the code should look for 
*org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback*
 instead of *org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback*.
I think it is not getting shaded over here:
https://github.com/apache/hadoop/blob/branch-3.3.4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java#L108

As a workaround I rebuilt flink-gs-fs-hadoop plugin removing this relocation 
and it worked for me.
{code:xml}
<relocation>
    <pattern>org.apache.hadoop</pattern>
    <shadedPattern>org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop</shadedPattern>
</relocation>
{code}




Re: [DISCUSS] FLIP-292: Support configuring state TTL at operator level for Table API & SQL programs

2023-03-25 Thread Benchao Li
Thanks Jane for starting this discussion. Finally we are considering
fine-grained configurations for SQL jobs, this is very exciting!

I see a lot of opinions that "SQL hints should not affect the results", and
this indeed has been discussed extensively while we introduced
FLIP-113[1][2]. After three years, I still think that in streaming SQL,
there are a lot of new things that are different from traditional bounded
SQL, and hint may be one of them.

Most people have raised a valid concern about the ease of use. SQL + hints
may be the easiest way for users to write and maintain their SQL jobs. Of
course, hints are not perfect because their propagation process is not very
clear to end users; IMO this is not a problem, since it's similar to SQL
itself: users usually do not find it easy to understand the physical
execution graph compiled from SQL either, especially newcomers.

I think Shuo's idea is very good that hints and json plan may not
contradict each other. In the end we may have multi-level configurations,
e.g., hints -> json plan -> job-level configurations -> cluster-level
configurations, with higher-level configuration overriding lower levels.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
[2] https://lists.apache.org/thread/78s8bqdql65o61742yb2rtjff66m166r

Jing Ge wrote on Sat, Mar 25, 2023 at 07:26:

> Thanks Jane for driving this FLIP.
>
> The FLIP is quite interesting. Since the execution plan has finer
> granularity than the plain SQL script, Hints at SQL level might not be able
> to touch specific operators, which turns out that the idea of leveraging
> the compiled execution plan is brilliant.
>
> However, there are some concerns that might need to be considered.
>
> - One thing I didn't fully understand. I might be wrong. Could those ttl
> configs be survived when SQL jobs are restarted? Does that mean that, once
> I modified the compiled sql plan, the json file will become the sql job? I
> have to always call the EXECUTE PLAN every time when the job needs to be
> restarted? In case that the original SQL script has been changed, we need
> to compile a version2 sql plan and copy the ttl configs from version1 sql
> plan to version2 and drop version1. This means we have to keep the compiled
> json file and create a link with the original SQL script. I am not sure if
> I understood it correctly, it seems like a lot of maintenance effort.
> - If I am not mistaken, the compiled sql plan introduced by FLIP-190 is
> only used for SQL job migration/update. Common stages that Flink uses to
> produce the execution plan from SQL does not contain the compiling step.
> This makes one tool do two different jobs[1], upgrade + ttl tuning.
> and tighten the dependency on compiling sql plans. Flink SQL users have to
> deal with a compiled sql plan for performance optimization that is not
> designed for it.
> - The regular working process for Flink SQL users is changed, from only
> dealing with SQL like scripts to moving between SQL like scripts and file
> modifications back and forth. This is a big change for user behaviours. One
> option could be that we upgrade/extend the COMPILE PLAN to allow users
> update ttl for operators at the script level. But I am not sure if it is
> possible to point out specific operators at this level. Another option is
> to print out the result of COMPILE PLAN and enable EXECUTE PLAN 'json plan
> as string'. Third option is to leverage a data platform to virtualize the
> compiled sql plan and provide related interactions for updating ttl and
> submit(execute) the modified compiled sql plan.
>
> On the other side, there is one additional benefit with this proposal: we
> could fine tune SQL jobs while we migrate/upgrade them. That is nice!
>
> Best regards,
> Jing
>
> [1] https://en.wikipedia.org/wiki/Single-responsibility_principle
>
> On Fri, Mar 24, 2023 at 4:02 PM Leonard Xu  wrote:
>
> > Thanks Jane for the proposal.
> >
> > TTL of state is an execution phase configuration, serialized json graph
> > file is the graph for execution phase, supporting the operator level
> state
> > TTL in the execution json file makes sense to me.
> >
> > From the user's perspective, I have two concerns:
> > 1. By modifying the execution graph node configuration, this raises the
> > cost for users to understand, especially for SQL users.
> > 2. Submitting a SQL job through `exec plan json file` is not so intuitive
> > as users cannot see the SQL detail of the job
> >
> > Best,
> > Leonard
> >
> > On Fri, Mar 24, 2023 at 5:07 PM Shengkai Fang  wrote:
> >
> > > Hi, Jane.
> > >
> > > Thanks for driving this FLIP and this feature are very useful to many
> > > users. But I have two problems about the FLIP:
> > >
> > > 1. How do Gateway users use this feature? As far as I know, EXECUTE
> > > PLAN only supports local files right now. Is it possible to extend this
> > > syntax to allow reading plan files from remote file systems?
> > >
> > > 2. I would 

Re: [ANNOUNCE] Apache Flink 1.17.0 released

2023-03-25 Thread Hang Ruan
Thanks for the great work ! Congrats all!

Best,
Hang

Panagiotis Garefalakis wrote on Sat, Mar 25, 2023 at 03:22:

> Congrats all! Well done!
>
> Cheers,
> Panagiotis
>
> On Fri, Mar 24, 2023 at 2:46 AM Qingsheng Ren  wrote:
>
> > I'd like to say thank you to all contributors of Flink 1.17. Your support
> > and great work together make this giant step forward!
> >
> > Also like Matthias mentioned, feel free to leave us any suggestions and
> > let's improve the releasing procedure together.
> >
> > Cheers,
> > Qingsheng
> >
> > On Fri, Mar 24, 2023 at 5:00 PM Etienne Chauchot 
> > wrote:
> >
> >> Congrats to all the people involved!
> >>
> >> Best
> >>
> >> Etienne
> >>
> >> On 23/03/2023 at 10:19, Leonard Xu wrote:
> >> > The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.17.0, which is the first release for the Apache Flink
> 1.17
> >> series.
> >> >
> >> > Apache Flink® is an open-source unified stream and batch data
> >> processing framework for distributed, high-performing, always-available,
> >> and accurate data applications.
> >> >
> >> > The release is available for download at:
> >> > https://flink.apache.org/downloads.html
> >> >
> >> > Please check out the release blog post for an overview of the
> >> improvements for this release:
> >> >
> >>
> https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/
> >> >
> >> > The full release notes are available in Jira:
> >> >
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351585
> >> >
> >> > We would like to thank all contributors of the Apache Flink community
> >> who made this release possible!
> >> >
> >> > Best regards,
> >> > Qingsheng, Martijn, Matthias and Leonard
> >>
> >
>