[DISCUSS] FLIP-363: Unify the Representation of TaskManager Location in REST API and Web UI

2023-09-08 Thread Chen Zhanghao
Hi Devs,

I would like to start a discussion on FLIP-363: Unify the Representation of 
TaskManager Location in REST API and Web UI [1].

The TaskManager location of subtasks is important for identifying TM-related 
problems. There are a number of places in REST API and Web UI where TaskManager 
location is returned/displayed.

Problems:

  *   Only hostname is provided to represent TaskManager location in some 
places (e.g. SubtaskCurrentAttemptDetailsHandler). However, in a containerized 
era, it is common to have multiple TMs on the same host, and port info is 
crucial to distinguish different TMs.
  *   Inconsistent naming of the field to represent TaskManager location: 
"host" is used in most places but "location" is also used in 
JobExceptions-related places.
  *   Inconsistent semantics of the "host" field: sometimes it denotes the 
hostname only, while at other times it denotes hostname + port (which also 
contradicts the name "host").

We propose to improve the current situation by:

  *   Using a field named "location" that represents the TaskManager location 
in the form of "${hostname}:${port}" consistently across the REST API and the 
front-end.
  *   Renaming the column from "Host" to "Location" on the Web UI to reflect 
that both hostname and port are displayed.
  *   Keeping the old "host" fields untouched for compatibility; they can be 
removed in the next major version.

Looking forward to your feedback.

[1] FLIP-363: Unify the Representation of TaskManager Location in REST API and 
Web UI - Apache Flink - Apache Software 
Foundation

Best,
Zhanghao Chen


[jira] [Created] (FLINK-33068) Handle DNS errors in AWS connectors as recoverable

2023-09-08 Thread Elphas Toringepi (Jira)
Elphas Toringepi created FLINK-33068:


 Summary: Handle DNS errors in AWS connectors as recoverable
 Key: FLINK-33068
 URL: https://issues.apache.org/jira/browse/FLINK-33068
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / AWS
Affects Versions: 1.17.1, 1.16.2, 1.15.4
Reporter: Elphas Toringepi
 Fix For: 1.17.1, 1.16.2, 1.15.4


AWS connectors do not treat DNS errors as recoverable with exponential 
back-off, resulting in Flink job restarts on transient errors and small timeouts.

 

DNS errors typically manifest as UnknownHostException, which is not handled as a 
recoverable exception and is therefore potentially not retried.

 

Here is an [example of an 
exception|https://github.com/apache/flink-connector-aws/blob/21c64ce38d3a6fd4fd9b4abda416c2de1a926780/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L459]
 that is recoverable with exponential-backoff. 
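As a sketch of the intended behavior, a transient DNS failure can be retried 
with exponential back-off before failing the job. This is illustrative only 
(plain Java, not the actual AWS connector code; names and parameters are made up):

```java
import java.net.UnknownHostException;
import java.util.concurrent.Callable;

public class RetryDemo {
    // Hypothetical helper: retry a call when it fails with UnknownHostException,
    // sleeping with exponential back-off between attempts.
    static <T> T callWithBackoff(Callable<T> call, int maxAttempts, long baseDelayMs)
            throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (UnknownHostException e) {
                if (attempt >= maxAttempts) {
                    throw e; // the error did not resolve in time: give up
                }
                Thread.sleep(baseDelayMs << (attempt - 1)); // base * 2^(attempt-1)
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated endpoint: fails twice with a DNS error, then succeeds.
        String result = callWithBackoff(() -> {
            if (++calls[0] < 3) {
                throw new UnknownHostException("kinesis.us-east-1.amazonaws.com");
            }
            return "ok";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```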



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] Flink annotation strategy/consensus

2023-09-08 Thread Jing Ge
Hi devs,

While I was joining the flink-avro enhancement and cleanup discussion
driven by Becket[1], I realized that there are some issues with the current
Flink API annotation usage in the source code.

As far as I understand, Flink wants to control the access/visibility of
APIs across modules and for downstream projects. Since no OSGi is used (it
should not be used because of its complexity, IMHO), Flink decided to use a
very lightweight but manual solution: customized annotations like @Internal,
@Experimental, @PublicEvolving,
etc. This is a Flink-only concept on top of JDK annotations and is therefore
orthogonal to @Deprecated or any other annotation offered by the JDK. Since
this concept was introduced, APIs without one of these annotations sit in a
gray area: they carry no contract in the context of this concept. Without
any such metadata they could be considered @Internal or @Experimental,
because changes are allowed to be applied at any time. But there is no clear
definition, and therefore different people will understand it differently.

There are two options to improve it, as far as I could figure out:

option 1: All APIs must carry one of those annotations. We should put some
effort into going through all the source code and adding missing annotations.
There have been discussions[2] and activities going in this direction.
option 2: the community comes to a new consensus that an API without an
annotation equals one of @Internal, @Experimental, or @PublicEvolving. I
would personally choose @Internal, because it is the safest one. And if
@Internal is chosen as the default, the annotation itself could even be
deprecated, because no annotation would equal @Internal. If it makes sense,
I can create a FLIP and help the community reach this consensus.

Both options have their own pros and cons. I would choose option 2, since
we will not end up having to explicitly mark a lot of APIs as @Internal.
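For readers less familiar with the mechanism: such a stability marker is just a
plain Java annotation, so "no annotation" really does mean "no machine-checkable
contract". A simplified sketch (not the exact Flink source; retention and
targets here are illustrative):

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Simplified stand-in for a Flink-style stability annotation
// (e.g. org.apache.flink.annotation.Internal).
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@interface Internal {}

// Under option 2, a class without any marker would implicitly be treated
// as if it were annotated like this:
@Internal
class SomeHelper {}

public class AnnotationDemo {
    public static void main(String[] args) {
        // The marker is plain metadata; enforcement is left to tooling
        // (e.g. architecture tests or API-compatibility checks).
        System.out.println(SomeHelper.class.isAnnotationPresent(Internal.class));
    }
}
```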

Looking forward to hearing your thoughts.

Best regards
Jing


[1] https://lists.apache.org/thread/7zsv528swbjxo5zk0bxq33hrkvd77d6f
[2] https://lists.apache.org/thread/zl2rmodsjsdb49tt4hn6wv3gdwo0m31o


Re: [VOTE] FLIP-323: Support Attached Execution on Flink Application Completion for Batch Jobs

2023-09-08 Thread Venkatakrishnan Sowrirajan
Thanks for driving this FLIP, Allison.

+1 (non-binding)

Regards
Venkata krishnan


On Thu, Sep 7, 2023 at 1:20 PM Allison Chang 
wrote:

> Hi everyone,
>
> Would like to start the VOTE for FLIP-323<
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-323*3A*Support*Attached*Execution*on*Flink*Application*Completion*for*Batch*Jobs__;JSsrKysrKysrKys!!IKRxdwAv5BmarQ!br6f9zExsFG_B6er-BaEYOK4Bg7TAuZWQXYESve2S8Yycp5DlJ1O8CA_kD8gTlXdh7yk-BjosN7uJiVcLx3U7g8B$
> > which proposes to introduce attached execution for batch jobs. The
> discussion thread can be found here<
> https://urldefense.com/v3/__https://lists.apache.org/thread/d3toldk6qqjh2fnbmqthlfkj9rc6lwgl__;!!IKRxdwAv5BmarQ!br6f9zExsFG_B6er-BaEYOK4Bg7TAuZWQXYESve2S8Yycp5DlJ1O8CA_kD8gTlXdh7yk-BjosN7uJiVcL54ExJ3e$
> >:
>
>
> Best,
>
> Allison Chang
>
>


Re: How to test "Build snapshot binary release" nightly job against a PR

2023-09-08 Thread Gabor Somogyi
Thanks for the description. Will try to hack it around :)

G


On Thu, Sep 7, 2023 at 4:56 PM Matthias Pohl 
wrote:

> You should be able to adapt tools/azure-pipelines/build-apache-repo.yml [1]
> in your branch as a "debug" commit to add the steps/stages that you're
> planning to include in a test CI run of your PR (this debug commit should
> be removed before merging the PR). The ci stage will be executed as part of
> your PR. You can get inspired by the cron_build stage which is enabled in
> the nightly runs.
>
> [1]
>
> https://github.com/apache/flink/blob/master/tools/azure-pipelines/build-apache-repo.yml
>
> On Thu, Sep 7, 2023 at 10:54 AM Gabor Somogyi 
> wrote:
>
> > Hi All,
> >
> > I've created a PR  which
> > changes "tools/releasing/create_binary_release.sh", which is called by the
> > "Build snapshot binary release" nightly job. Is there a possibility to
> > double check that the PR does not break it?
> >
> > The question can be generalized, however: there are surrounding jobs (e.g.
> > the python wheel build) which are not executed in a PR (which is fine,
> > because that would be overkill). But the question is how to test them to
> > minimize code breakage?
> >
> > Thanks in advance!
> >
> > BR,
> > G
> >
>


Re: [DISCUSS] FLIP-360: Merging ExecutionGraphInfoStore and JobResultStore into a single component

2023-09-08 Thread Gyula Fóra
Hi Matthias!

Thank you for the detailed proposal, overall I am in favor of making this
unification to simplify the logic and make the integration for external
components more straightforward.
I will try to read through the proposal more carefully next week and
provide some detailed feedback.

+1

Thanks
Gyula

On Fri, Sep 8, 2023 at 8:36 AM Matthias Pohl 
wrote:

> Just a bit more elaboration on the question that we need to answer here: Do
> we want to expose the internal ArchivedExecutionGraph data structure
> through JSON?
>
> - The JSON approach allows the user to have (almost) full access to the
> information (that would be otherwise derived from the REST API). Therefore,
> there's no need to spin up a cluster to access this information.
> Any information that shall be exposed through the REST API needs to be
> well-defined in this JSON structure, though. Large parts of the
> ArchivedExecutionGraph data structure (essentially anything that shall be
> used to populate the REST API) would effectively become public API, which puts
> more constraints on this data structure and makes it harder to change it in
> the future.
>
> - The binary data approach allows us to keep the data structure itself
> internal. We have more control over what we want to expose by providing
> access points in the ClusterClient (e.g. just add a command to extract the
> external storage path from the file).
>
> - The compromise (i.e. keeping ExecutionGraphInfoStore and JobResultStore
> separate and just expose the checkpoint information next to the JobResult
> in the JobResultStore file) would keep us the closest to the current state,
> requires the least code changes and the least exposure of internal data
> structures. It would allow any system (like the Kubernetes Operator) to
> extract the checkpoint's external storage path. But we would still be stuck
> with kind-of redundant components.
>
> From a user's perspective, I feel like the JSON approach is the best one
> because it gives him/her the most freedom to be independent of Flink
> binaries when handling completed jobs. But I see benefits from a Flink
> developer's perspective to not expose the entire data structure but use the
> ClusterClient as an access point.
>
> The last option is my least favorite one: Moving the ExecutionGraphInfo out
> of the JobManager seems to be the right thing to do when thinking about
> Flink's vision to become cloud-native.
>
> Just my 2cts on that topic.
> Matthias
>
> On Mon, Sep 4, 2023 at 1:11 PM Matthias Pohl 
> wrote:
>
> > Hi everyone,
> > I want to open the discussion on FLIP-360 [1]. The goal of this FLIP is
> to
> > combine the two very similar components ExecutionGraphInfoStore and
> > JobResultStore into a single component.
> >
> > The benefit of this effort would be to expose the metadata of a
> > globally-terminated job even in cases where the JobManager fails shortly
> > after the job finished. This is relevant for external checkpoint
> management
> > (like it's done in the Kubernetes Operator) which relies on the
> checkpoint
> > information to be available.
> >
> > More generally, it would allow completed jobs to be listed as part of the
> > Flink cluster even after a JM failover. This would allow users to gain
> more
> > control over finished jobs.
> >
> > The current state of the FLIP doesn't come up with a final conclusion on
> > the serialization format of the data (JSON vs binary). I want to
> emphasize
> > that there's also a third option which keeps both components separate and
> > only exposes the additional checkpoint information through the
> > JobResultStore.
> >
> > I'm looking forward to feedback.
> > Best,
> > Matthias
> >
> > PS: I might be less responsive in the next 2-3 weeks but want to initiate
> > the discussion, anyway.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-360%3A+Merging+the+ExecutionGraphInfoStore+and+the+JobResultStore+into+a+single+component+CompletedJobStore
> >
>


[VOTE] Apache Flink Stateful Functions Release 3.3.0, release candidate #1

2023-09-08 Thread Martijn Visser
Hi everyone,

Please review [1] and vote on the release candidate #1 for the version
3.3.0 of Apache Flink Stateful Functions,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

**Release Overview**

As an overview, the release consists of the following:
a) Stateful Functions canonical source distribution, to be deployed to the
release repository at dist.apache.org
b) Stateful Functions Python SDK distributions to be deployed to PyPI
c) Maven artifacts to be deployed to the Maven Central Repository
d) Dockerfiles for new images to be deployed to Docker Hub

**Staging Areas to Review**

The staging areas containing the above mentioned artifacts are as follows,
for your review:
* All artifacts for a) and b) can be found in the corresponding dev
repository at dist.apache.org [2]
* All artifacts for c) can be found at the Apache Nexus Repository [3]
* PR for new Dockerfiles for this release [4]

All artifacts are signed with the key with fingerprint
A5F3BCE4CBE993573EC5966A65321B8382B219AF [5]

Other links for your review:
* JIRA release notes [6]
* source code tag "release-3.3.0-rc1" [7]
* PR to update the website Downloads page to include Stateful Functions
links [8]

**Vote Duration**

The voting time will run for at least 72 hours.
It is adopted by majority approval, with at least 3 PMC affirmative votes.

Thanks,
Release manager

[1]
https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release
[2] https://dist.apache.org/repos/dist/dev/flink/flink-statefun-3.3.0-rc1/
[3] https://repository.apache.org/content/repositories/orgapacheflink-1652/
[4] https://github.com/apache/flink-statefun-docker/pull/20
[5] https://dist.apache.org/repos/dist/release/flink/KEYS
[6]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351276
[7] https://github.com/apache/flink-statefun/tree/release-3.3.0-rc1
[8] https://github.com/apache/flink-web/pull/674


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-09-08 Thread Xuannan Su
Hi Jark and Leonard,

Thanks for the comments. Please see my reply below.

@Jark

> I think a better API doesn't compete with itself. Therefore, I'm in favor of
> supporting the watermark lag threshold for each source without introducing
> any framework API and configuration.

I don't think supporting the watermark lag threshold for each source
avoids the competition problem. In the case where a user wants to use a
CDC source and also determine the backlog status based on watermark lag,
we still need to define a rule for when the two conflict. With that said,
I think it is more intuitive to combine them with a logical OR operation,
as the strategies (FLIP-309, FLIP-328) only determine when the source's
backlog status should be true. What do you think?
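To illustrate the OR combination (a minimal sketch; the names are made up and
this is not Flink API):

```java
import java.util.List;
import java.util.function.BooleanSupplier;

public class BacklogDemo {
    // Each strategy (e.g. FLIP-309 snapshot/changelog phase, FLIP-328 watermark
    // lag) reports whether it currently considers the source backlogged;
    // the signals are combined with a logical OR.
    static boolean isProcessingBacklog(List<BooleanSupplier> strategies) {
        return strategies.stream().anyMatch(BooleanSupplier::getAsBoolean);
    }

    public static void main(String[] args) {
        long watermarkLagMs = 120_000;
        long thresholdMs = 60_000;
        boolean cdcSnapshotPhase = false; // e.g. already in the changelog stage
        List<BooleanSupplier> strategies = List.of(
                () -> cdcSnapshotPhase,
                () -> watermarkLagMs > thresholdMs);
        System.out.println(isProcessingBacklog(strategies)); // lag exceeds threshold
    }
}
```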

> Besides, this can address another concern that the watermark may be
> generated by DataStream#assignTimestampsAndWatermarks which doesn't
> work with the backlog.watermark-lag-threshold job config

The description of the configuration explicitly states that "a source
would report isProcessingBacklog=true if its watermark lag exceeds the
configured value". It should not confuse the user that
DataStream#assignTimestampsAndWatermarks doesn't work with
backlog.watermark-lag-threshold, as it is not a source.

> Does that mean the job can never go back to streaming mode once it switches
> into backlog mode? It sounds like an incomplete FLIP to me. Is it possible to
> support switching back in this FLIP?

I think the description in the FLIP actually means the other way
around, where the job can never switch back to batch mode once it has
switched into streaming mode. This is to align with the current state
of FLIP-327[1], where only switching from batch to stream mode is
supported.

@Leonard

> > The FLIP describes that: And it should report isProcessingBacklog=false at 
> > the beginning of the snapshot stage.
> This should be “changelog stage”

I think that description is in FLIP-309. Thanks for pointing it out. I have
updated the description.

> I'm not sure if it's enough to support this feature only in the FLIP-27 Source. 
> Although we are pushing to remove the SourceFunction API, these APIs will 
> survive one or two versions in the flink repo before they are actually removed.

I agree that it would be good to support the SourceFunction API. However,
given that the SourceFunction API is marked as deprecated, I will
prioritize supporting the FLIP-27 Source. We can add support for the
SourceFunction API after the FLIP-27 Source. What do you think?

Best regards,
Xuannan

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+switching+from+batch+to+stream+mode+to+improve+throughput+when+processing+backlog+data




On Fri, Sep 8, 2023 at 1:02 AM Leonard Xu  wrote:
>
> Thanks Xuannan for driving this FLIP !
>
> The proposal generally looks good to me, but I still left some comments:
>
> > One more question about the FLIP is that the FLIP says "Note that this
> > config does not support switching source's isProcessingBacklog from false
> > to true for now.” Does that mean the job can never go back to streaming mode
> > once it switches into backlog mode? It sounds like an incomplete FLIP to me.
> > Is it possible to support switching back in this FLIP?
> +1 for Jark’s concern. IIUC, the state transition of isProcessingBacklog 
> depends on whether the source is processing backlog data or not. 
> Different sources will have different backlog statuses, which may change 
> over time. From a general perspective, we should not have this restriction.
>
> > The FLIP describes that: And it should report isProcessingBacklog=false at 
> > the beginning of the snapshot stage.
> This should be “changelog stage”
>
> I'm not sure if it's enough to support this feature only in the FLIP-27 Source. 
> Although we are pushing to remove the SourceFunction API, these APIs will 
> survive one or two versions in the flink repo before they are actually removed.
>
> Best,
> Leonard
>
> >
> > Best,
> > Jark
> >
> >
> > On Thu, 7 Sept 2023 at 13:51, Xuannan Su  wrote:
> >
> >> Hi all,
> >>
> >> Thank you for all the reviews and suggestions.
> >>
> >> I believe all the comments have been addressed. If there are no
> >> further comments, I plan to open the voting thread for this FLIP early
> >> next week.
> >>
> >> Best regards,
> >> Xuannan
> >>
> >> On Thu, Sep 7, 2023 at 12:09 AM Jing Ge 
> >> wrote:
> >>>
> >>> Hi Xuannan,
> >>>
> >>> I thought FLIP-328 will compete with FLIP-309 while setting the value of
> >>> the backlog. Understood. Thanks for the hint.
> >>>
> >>> Best regards,
> >>> Jing
> >>>
> >>> On Wed, Sep 6, 2023 at 12:12 PM Xuannan Su 
> >> wrote:
> >>>
>  Hi Jing,
> 
>  Thank you for the clarification.
> 
>  For the use case you mentioned, I believe we can utilize the
>  HybridSource, as updated in FLIP-309[1], to determine the backlog
>  status. For example, if the user wants to process data before time T
>  in batch mode and 

[jira] [Created] (FLINK-33067) Expose JOSDK rate limiter config and enable by default

2023-09-08 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-33067:
--

 Summary: Expose JOSDK rate limiter config and enable by default
 Key: FLINK-33067
 URL: https://issues.apache.org/jira/browse/FLINK-33067
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
Assignee: Gyula Fora
 Fix For: kubernetes-operator-1.7.0


There are currently certain cases where the operator can get stuck in an 
infinite no-wait retry loop (see 
[https://github.com/operator-framework/java-operator-sdk/issues/2046] for 
details).

The underlying problems that cause this are, in some cases, not easily solved 
generically, and it is better practice to enable rate limiting to safeguard 
against them.





[jira] [Created] (FLINK-33066) Enable to inject environment variable from secret/configmap to operatorPod

2023-09-08 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-33066:
---

 Summary: Enable to inject environment variable from 
secret/configmap to operatorPod
 Key: FLINK-33066
 URL: https://issues.apache.org/jira/browse/FLINK-33066
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: dongwoo.kim


Hello, I've been working with the Flink Kubernetes operator and noticed that 
the {{operatorPod.env}} only allows for simple key-value pairs and doesn't 
support Kubernetes {{valueFrom}} syntax.

How about changing the template to support more of the Kubernetes syntax?

*Current template*
{code:java}
{{- range $k, $v := .Values.operatorPod.env }}
  - name: {{ $v.name | quote }}
    value: {{ $v.value | quote }}
{{- end }}{code}
 

*Proposed template*
1) 
{code:java}
{{- with .Values.operatorPod.env }} 
{{- toYaml . | nindent 12 }} 
{{- end }} 
{code}
2) create extra config, *Values.operatorPod.envFrom* and utilize this

 

 

I'd be happy to implement this update if it's approved.
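For example, with the proposed {{toYaml}}-based template (option 1), values like 
the following (names are hypothetical) would render as-is into the pod spec:

{code:yaml}
operatorPod:
  env:
    - name: LOG_LEVEL
      value: "INFO"
    - name: DB_PASSWORD
      valueFrom:
        secretKeyRef:
          name: operator-secrets
          key: db-password
{code}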





[jira] [Created] (FLINK-33065) Optimize the exception message when the program plan could not be fetched

2023-09-08 Thread Rui Fan (Jira)
Rui Fan created FLINK-33065:
---

 Summary: Optimize the exception message when the program plan 
could not be fetched
 Key: FLINK-33065
 URL: https://issues.apache.org/jira/browse/FLINK-33065
 Project: Flink
  Issue Type: Improvement
Reporter: Rui Fan
Assignee: Rui Fan


When the program plan could not be fetched, the root cause may be that the main 
method doesn't call `env.execute()`.

 

We can improve the message to help users find this root cause.





Re: [DISCUSS] FLIP-360: Merging ExecutionGraphInfoStore and JobResultStore into a single component

2023-09-08 Thread Matthias Pohl
Just a bit more elaboration on the question that we need to answer here: Do
we want to expose the internal ArchivedExecutionGraph data structure
through JSON?

- The JSON approach allows the user to have (almost) full access to the
information (that would be otherwise derived from the REST API). Therefore,
there's no need to spin up a cluster to access this information.
Any information that shall be exposed through the REST API needs to be
well-defined in this JSON structure, though. Large parts of the
ArchivedExecutionGraph data structure (essentially anything that shall be
used to populate the REST API) become public domain, though, which puts
more constraints on this data structure and makes it harder to change it in
the future.

- The binary data approach allows us to keep the data structure itself
internal. We have more control over what we want to expose by providing
access points in the ClusterClient (e.g. just add a command to extract the
external storage path from the file).

- The compromise (i.e. keeping ExecutionGraphInfoStore and JobResultStore
separate and just expose the checkpoint information next to the JobResult
in the JobResultStore file) would keep us the closest to the current state,
requires the least code changes and the least exposure of internal data
structures. It would allow any system (like the Kubernetes Operator) to
extract the checkpoint's external storage path. But we would still be stuck
with kind-of redundant components.
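
To make the trade-off a bit more concrete, a JSON-based store entry might look
roughly like this (purely illustrative field names, not a proposed schema):

  {
    "jobId": "<job id>",
    "state": "FINISHED",
    "checkpoints": {
      "latestCompleted": {
        "externalPointer": "s3://bucket/checkpoints/chk-42"
      }
    }
  }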

From a user's perspective, I feel like the JSON approach is the best one
because it gives him/her the most freedom to be independent of Flink
binaries when handling completed jobs. But I see benefits from a Flink
developer's perspective to not expose the entire data structure but use the
ClusterClient as an access point.

The last option is my least favorite one: Moving the ExecutionGraphInfo out
of the JobManager seems to be the right thing to do when thinking about
Flink's vision to become cloud-native.

Just my 2cts on that topic.
Matthias

On Mon, Sep 4, 2023 at 1:11 PM Matthias Pohl  wrote:

> Hi everyone,
> I want to open the discussion on FLIP-360 [1]. The goal of this FLIP is to
> combine the two very similar components ExecutionGraphInfoStore and
> JobResultStore into a single component.
>
> The benefit of this effort would be to expose the metadata of a
> globally-terminated job even in cases where the JobManager fails shortly
> after the job finished. This is relevant for external checkpoint management
> (like it's done in the Kubernetes Operator) which relies on the checkpoint
> information to be available.
>
> More generally, it would allow completed jobs to be listed as part of the
> Flink cluster even after a JM failover. This would allow users to gain more
> control over finished jobs.
>
> The current state of the FLIP doesn't come up with a final conclusion on
> the serialization format of the data (JSON vs binary). I want to emphasize
> that there's also a third option which keeps both components separate and
> only exposes the additional checkpoint information through the
> JobResultStore.
>
> I'm looking forward to feedback.
> Best,
> Matthias
>
> PS: I might be less responsive in the next 2-3 weeks but want to initiate
> the discussion, anyway.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-360%3A+Merging+the+ExecutionGraphInfoStore+and+the+JobResultStore+into+a+single+component+CompletedJobStore
>