[ANNOUNCE] Apache Flink Stateful Functions 3.2.0 released

2022-02-01 Thread Till Rohrmann
The Apache Flink community is very happy to announce the release of Apache
Flink Stateful Functions 3.2.0.

Stateful Functions is an API that simplifies building distributed stateful
applications.
It's based on functions with persistent state that can interact dynamically
with strong consistency guarantees.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/01/31/release-statefun-3.2.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at: https://pypi.org/project/apache-flink-statefun/

JavaScript SDK for Stateful Functions published to the NPM registry can be
found at: https://www.npmjs.com/package/apache-flink-statefun

Official Docker image for building Stateful Functions applications can be
found at:
https://hub.docker.com/r/apache/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350540

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Till


Re: [ANNOUNCE] Apache Flink ML 2.0.0 released

2022-01-10 Thread Till Rohrmann
This is really great news. Thanks a lot for all the work Dong, Yun, Zhipeng
and others!

Cheers,
Till

On Fri, Jan 7, 2022 at 2:36 PM David Morávek  wrote:

> Great job! <3 Thanks Dong and Yun for managing the release and big thanks
> to everyone who has contributed!
>
> Best,
> D.
>
> On Fri, Jan 7, 2022 at 2:27 PM Yun Gao  wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink ML 2.0.0.
>>
>> Apache Flink ML provides API and infrastructure that simplifies
>> implementing distributed ML algorithms, and it also provides a library
>> of off-the-shelf ML algorithms.
>>
>> Please check out the release blog post for an overview of the release:
>> https://flink.apache.org/news/2022/01/07/release-ml-2.0.0.html
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Maven artifacts for Flink ML can be found at:
>> https://search.maven.org/search?q=g:org.apache.flink%20ml
>>
>> Python SDK for Flink ML published to the PyPI index can be found at:
>> https://pypi.org/project/apache-flink-ml/
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351079
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Dong and Yun
>>
>


Re: [DISCUSS] Deprecate MapR FS

2022-01-05 Thread Till Rohrmann
+1 for dropping the MapR FS.

Cheers,
Till

On Wed, Jan 5, 2022 at 10:11 AM Martijn Visser 
wrote:

> Hi everyone,
>
> Thanks for your input. I've checked the MapR implementation and it has no
> annotation at all. Given the circumstances that we thought that MapR was
> already dropped, I would propose to immediately remove MapR in Flink 1.15
> instead of first marking it as deprecated and removing it in Flink 1.16.
>
> Please let me know what you think.
>
> Best regards,
>
> Martijn
>
> On Thu, 9 Dec 2021 at 17:27, David Morávek  wrote:
>
>> +1, agreed with Seth's reasoning. There has been no real activity in MapR
>> FS module for years [1], so the eventual users should be good with using
>> the jars from the older Flink versions for quite some time
>>
>> [1]
>> https://github.com/apache/flink/commits/master/flink-filesystems/flink-mapr-fs
>>
>> Best,
>> D.
>>
>> On Thu, Dec 9, 2021 at 4:28 PM Konstantin Knauf 
>> wrote:
>>
>>> +1 (what Seth said)
>>>
>>> On Thu, Dec 9, 2021 at 4:15 PM Seth Wiesman  wrote:
>>>
>>> > +1
>>> >
>>> > I actually thought we had already dropped this FS. If anyone is still
>>> > relying on it in production, the file system abstraction in Flink has
>>> > been incredibly stable over the years. They should be able to use the
>>> > 1.14 MapR FS with later versions of Flink.
>>> >
>>> > Seth
>>> >
>>> > On Wed, Dec 8, 2021 at 10:03 AM Martijn Visser 
>>> > wrote:
>>> >
>>> >> Hi all,
>>> >>
>>> >> Flink supports multiple file systems [1], including MapR FS. MapR has
>>> >> not existed as a company since 2019; its technology and intellectual
>>> >> property have been sold to Hewlett Packard.
>>> >>
>>> >> I don't think that there's anyone who's using MapR anymore, and
>>> >> therefore I think it would be good to deprecate this for Flink 1.15
>>> >> and then remove it in Flink 1.16. Removing this from Flink will
>>> >> slightly shrink the codebase and CI runtime.
>>> >>
>>> >> I'm also cross posting this to the User mailing list, in case there's
>>> >> still anyone who's using MapR.
>>> >>
>>> >> Best regards,
>>> >>
>>> >> Martijn
>>> >>
>>> >> [1]
>>> >> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
>>> >>
>>> >
>>>
>>> --
>>>
>>> Konstantin Knauf
>>>
>>> https://twitter.com/snntrable
>>>
>>> https://github.com/knaufk
>>>
>>


Re: [DISCUSS] Drop Gelly

2022-01-03 Thread Till Rohrmann
I haven't seen any changes or requests to/for Gelly in ages. Hence, I would
assume that it is not really used and can be removed.

+1 for dropping Gelly.

Cheers,
Till

On Mon, Jan 3, 2022 at 2:20 PM Martijn Visser  wrote:

> Hi everyone,
>
> Flink is bundled with Gelly, a Graph API library [1]. This has been marked
> as approaching end-of-life for quite some time [2].
>
> Gelly is built on top of Flink's DataSet API, which is deprecated and
> slowly being phased out [3]. It only works on batch jobs. Based on the
> activity in the Dev and User mailing lists, I don't see a lot of questions
> popping up regarding the usage of Gelly. Removing Gelly would reduce CI
> time and resources because we won't need to run tests for this anymore.
>
> I'm cross-posting this to the User mailing list to see if there are any
> users of Gelly at the moment.
>
> Let me know your thoughts.
>
> Martijn Visser | Product Manager
>
> mart...@ververica.com
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/libs/gelly/overview/
>
> [2] https://flink.apache.org/roadmap.html
>
> [3] https://lists.apache.org/thread/b2y3xx3thbcbtzdphoct5wvzwogs9sqz
>


Re: [ANNOUNCE] Apache Flink Stateful Functions 3.1.1 released

2021-12-23 Thread Till Rohrmann
Thanks a lot for being our release manager and swiftly addressing the log4j
CVE Igal!

Cheers,
Till

On Wed, Dec 22, 2021 at 5:41 PM Igal Shilman  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Stateful Functions (StateFun) 3.1.1.
>
> This is a bugfix release that addresses the recent log4j vulnerabilities;
> users are encouraged to upgrade.
>
> StateFun is a cross-platform stack for building Stateful Serverless
> applications, making it radically simpler to develop scalable, consistent,
> and elastic distributed applications.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2021/12/22/log4j-statefun-release.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for StateFun can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>
> Python SDK for StateFun published to the PyPI index can be found at:
> https://pypi.org/project/apache-flink-statefun/
>
> Official Docker images for StateFun are published to Docker Hub:
> https://hub.docker.com/r/apache/flink-statefun
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351096&styleName=&projectId=12315522
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Thanks!
>


Re: [DISCUSS] Changing the minimal supported version of Hadoop

2021-12-23 Thread Till Rohrmann
If there are no users strongly objecting to dropping Hadoop support for <
2.8, then I am +1 for this since otherwise we won't gain a lot as Xintong
said.

Cheers,
Till

On Wed, Dec 22, 2021 at 10:33 AM David Morávek  wrote:

> Agreed, if we drop the CI for lower versions, there is actually no point
> of having safeguards as we can't really test for them.
>
> Maybe one more thought (it's more of a feeling), I feel that users running
> really old Hadoop versions are usually slower to adopt (they most likely
> use what the current HDP / CDH version they use offers) and they are less
> likely to use Flink 1.15 any time soon, but I don't have any strong data to
> support this.
>
> D.
>


Re: [ANNOUNCE] Apache Flink 1.14.2 / 1.13.5 / 1.12.7 / 1.11.6 released

2021-12-17 Thread Till Rohrmann
Thanks a lot for driving these releases Chesnay! This is super helpful for
the community.

For the synchronization problem, I guess we just have to wait a bit longer.

Cheers,
Till

On Fri, Dec 17, 2021 at 7:39 AM Leonard Xu  wrote:

> I guess this is because publishers everywhere are updating their
> artifacts in response to the log4shell vulnerability [1].
>
> All we can do and need to do is wait. ☕️
>
> Best,
> Leonard
> [1] https://issues.sonatype.org/browse/OSSRH-76300
>
>
>
> > On Dec 17, 2021, at 2:21 PM, Jingsong Li wrote:
> >
> > Not found in
> > https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java/
> >
> > I guess too many people sent versions, resulting in maven central
> > repository synchronization being slower.
> >
> > Best,
> > Jingsong
> >
> > On Fri, Dec 17, 2021 at 2:00 PM casel.chen  wrote:
> >>
> >> I cannot find the Flink 1.13.5 jars in the Maven Central repository.
> >> Have they been uploaded there already? Thanks!
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> At 2021-12-17 01:26:19, "Chesnay Schepler"  wrote:
> >>> The Apache Flink community has released emergency bugfix versions of
> >>> Apache Flink for the 1.11, 1.12, 1.13 and 1.14 series.
> >>>
> >>> These releases include a version upgrade for Log4j to address
> >>> [CVE-2021-44228](https://nvd.nist.gov/vuln/detail/CVE-2021-44228) and
> >>> [CVE-2021-45046](https://nvd.nist.gov/vuln/detail/CVE-2021-45046).
> >>>
> >>> We highly recommend all users to upgrade to the respective patch
> >>> release.
> >>>
> >>> The releases are available for download at:
> >>> https://flink.apache.org/downloads.html
> >>>
> >>> Please check out the release blog post for further details:
> >>> https://flink.apache.org/news/2021/12/16/log4j-patch-releases.html
> >>>
> >>>
> >>> Regards,
> >>> Chesnay
> >>
> >>
> >>
> >>
> >
> >
> >
> > --
> > Best, Jingsong Lee
>
>


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-14 Thread Till Rohrmann
As part of this FLIP, does it make sense to also extend the documentation
for the sort shuffle [1] to include a tuning guide? I am thinking of a more
in depth description of what things you might observe and how to influence
them with the configuration options.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
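
As a rough illustration of the kind of estimate such a tuning guide could contain, the sketch below relates parallelism to the network memory needed by one input gate, following the remark in the quoted reply that the exclusive buffers on the input gate side depend on parallelism. It is a back-of-the-envelope model, not Flink's actual accounting: the function name is hypothetical, an all-to-all exchange is assumed, and the values of 2 exclusive buffers per channel, 8 floating buffers per gate and 32 KiB per buffer are assumptions that should be replaced with the values from your own configuration.

```python
def input_gate_mib(parallelism, buffers_per_channel=2, floating_per_gate=8, buffer_kib=32):
    """Estimate the network memory (MiB) one input gate needs.

    Assumption: one input channel per upstream subtask (all-to-all exchange),
    each with `buffers_per_channel` exclusive buffers, plus a per-gate pool
    of floating buffers. All defaults here are assumed values.
    """
    buffers = parallelism * buffers_per_channel + floating_per_gate
    return buffers * buffer_kib / 1024


for p in (100, 500, 1000):
    print(f"parallelism {p}: ~{input_gate_mib(p):.1f} MiB per input gate")
```

Under these assumptions the requirement grows linearly with parallelism, which is why a fixed default network memory size can only cover jobs up to some parallelism.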

Cheers,
Till

On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li  wrote:

> Hi Yingjie,
>
> Thanks for your explanation. I have no more questions. +1
>
> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao 
> wrote:
> >
> > Hi Jingsong,
> >
> > Thanks for your feedback.
> >
> > >>> My question is, what is the maximum parallelism a job can have with
> > >>> the default configuration? (Does this break out of the box)
> >
> > Yes, you are right, these two options are related to network memory and
> > framework off-heap memory. Generally, these changes will not break the
> > out-of-the-box experience, but in some extreme cases, for example when
> > there are too many ResultPartitions per task, users may need to increase
> > network memory to avoid the "insufficient network buffer" error. For
> > framework off-heap memory, I believe that users do not need to change
> > the default value.
> >
> > In fact, I have a basic goal when changing these config values: when
> > running TPC-DS at medium parallelism with the default values, all queries
> > must pass without any error and, at the same time, performance should
> > improve. I think if we achieve this goal, most common use cases will be
> > covered.
> >
> > Currently, with the default configuration, the number of exclusive
> > buffers required on the input gate side still depends on parallelism
> > (since 1.14 we can decouple network buffer consumption from parallelism
> > by setting a config value, but that has a slight performance impact on
> > streaming jobs), which means that the default configuration cannot
> > support large parallelism. Roughly, I would say the default values can
> > support jobs with a parallelism of several hundred.
> >
> > >>> I do feel that this correspondence is a bit difficult to control at
> > >>> the moment, and it would be best if a rough table could be provided.
> >
> > I think this is a good suggestion; we can provide that guidance in the
> > documentation.
> >
> > Best,
> > Yingjie
> >
> > On Tue, Dec 14, 2021 at 2:39 PM Jingsong Li wrote:
> >>
> >> Hi  Yingjie,
> >>
> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
> >> of batch jobs.
> >>
> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
> >> network memory and framework.off-heap.size.
> >>
> >> My question is, what is the maximum parallelism a job can have with
> >> the default configuration? (Does this break out of the box)
> >>
> >> How much network memory and framework.off-heap.size are required for
> >> how much parallelism in the default configuration?
> >>
> >> I do feel that this correspondence is a bit difficult to control at
> >> the moment, and it would be best if a rough table could be provided.
> >>
> >> Best,
> >> Jingsong
> >>
> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao 
> wrote:
> >> >
> >> > Hi Jiangang,
> >> >
> >> > Thanks for your suggestion.
> >> >
> >> > >>> The config can affect the memory usage. Will the related memory
> >> > >>> configs be changed?
> >> >
> >> > I think we will not change the default network memory settings. My
> >> > best expectation is that the default values work for most cases (though
> >> > they may not be the best), and for other cases users may need to tune
> >> > the memory settings.
> >> >
> >> > >>> Can you share the TPC-DS results for different configs? Although
> >> > >>> we change the default values, it is helpful to change them for
> >> > >>> different users. In this case, the experience can help a lot.
> >> >
> >> > I did not keep all previous TPC-DS results, but from the results I can
> >> > tell that on HDD, always using sort-shuffle is a good choice. For
> >> > small jobs, using sort-shuffle may not bring much performance gain.
> >> > This may be because all shuffle data can be cached in memory (page
> >> > cache), which is the case if the cluster has enough resources. However,
> >> > if the whole cluster is under heavy load or you are running large-scale
> >> > jobs, the performance of those small jobs can also be affected. For
> >> > large-scale jobs, the configurations suggested for tuning are
> >> > taskmanager.network.sort-shuffle.min-buffers and
> >> > taskmanager.memory.framework.off-heap.batch-shuffle.size; you can
> >> > increase these values for large-scale batch jobs.
> >> >
> >> > BTW, I am still running TPC-DS tests these days and I can share these
> >> > results soon.
> >> >
> >> > Best,
> >> > Yingjie
> >> >
> >> > On Fri, Dec 10, 2021 at 6:30 PM 刘建刚 wrote:
> >> >>
> >> >> Glad to see the suggestion. In our tests, we found that for small
> >> >> jobs the changed configs do not improve performance much, just as in
> >> >> your tests. I have some suggestions:
> >> >>
> >> >> The config can affect the memory usage. Will the relat

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-03 Thread Till Rohrmann
Thanks for starting this discussion Yingjie,

How will our tests be affected by these changes? Will Flink require more
resources and, thus, will it risk destabilizing our testing infrastructure?

I would propose to create a FLIP for these changes since you propose to
change the default behaviour. It can be a very short one, though.

Cheers,
Till

On Fri, Dec 3, 2021 at 10:02 AM Yingjie Cao  wrote:

> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the out-of-the-box user experience (streaming is not affected). The default
> values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'.  Usually, data compression can reduce both disk
> and network IO which is good for performance. At the same time, it can save
> storage space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX_VALUE', which means by default, Flink jobs will always
> use hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
> both stability and performance. So we propose to reduce the default value
> to a suitably smaller one, for example, 128. (We tested 128, 256, 512 and
> 1024 with TPC-DS and 128 was the best one.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. Previously, when choosing the default value, both
> '32M' and '64M' were OK in tests and we cautiously chose the smaller one.
> However, it was recently reported on the mailing list that the default
> value is not enough, which caused a buffer request timeout issue. We already
> created a ticket to improve the behavior. At the same time, we propose to
> increase this default value to '64M', which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64' which means '64' network buffers (32k per buffer by default).
> This default value is quite modest and the performance can be influenced.
> We propose to increase this value to a larger one, for example, 512 (the
> default TM and network buffer configuration can serve more than 10
> result partitions concurrently).
>
> We already tested these default values together with tpc-ds benchmark in a
> cluster and both the performance and stability improved a lot. These
> changes can help to improve the out-of-box experience of blocking shuffle.
> What do you think about these changes? Is there any concern? If there are
> no objections, I will make these changes soon.
>
> Best,
> Yingjie
>
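
The settings in the proposal above reduce to one threshold check and some simple arithmetic. The sketch below is only illustrative and does not call any Flink API: the function names are made up for this example, the 32 KiB buffer size is the default quoted in the proposal, and two points are assumptions to verify against the Flink documentation, namely that jobs at or above taskmanager.network.sort-shuffle.min-parallelism use sort-shuffle and that taskmanager.network.sort-shuffle.min-buffers applies per result partition.

```python
BUFFER_SIZE_KIB = 32  # "32k per buffer by default", as quoted in the proposal


def sort_buffer_mib(min_buffers, buffer_size_kib=BUFFER_SIZE_KIB):
    """Memory (MiB) implied by taskmanager.network.sort-shuffle.min-buffers."""
    return min_buffers * buffer_size_kib / 1024


def shuffle_kind(parallelism, min_parallelism):
    """Assumed rule: sort-shuffle at or above the threshold, hash-shuffle below."""
    return "sort" if parallelism >= min_parallelism else "hash"


current = sort_buffer_mib(64)    # current default of 64 buffers
proposed = sort_buffer_mib(512)  # proposed default of 512 buffers
print(f"sort buffer (assumed per result partition): {current} MiB -> {proposed} MiB")

for p in (64, 128, 1024):
    print(f"parallelism {p}: {shuffle_kind(p, min_parallelism=128)}-shuffle")
```

With the proposed threshold of 128, only jobs with a parallelism below 128 would keep using hash-shuffle, whereas a threshold of the maximum integer value never selects sort-shuffle.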


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Till Rohrmann
Great news, Yingjie. Thanks a lot for sharing this information with the
community and kudos to all the contributors of the external shuffle service
:-)

Cheers,
Till

On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao  wrote:

> Hi dev & users,
>
> We are happy to announce that the remote shuffle project [1] for Flink is
> now open source. The project originated at Alibaba and the main motivation
> is to improve both the performance and stability of batch data processing
> and to further embrace cloud native. For more features of the project,
> please refer to [1].
>
> Before going open source, the project has been used widely in production
> and it behaves well in terms of both stability and performance. We hope
> you enjoy it. Collaboration and feedback are highly appreciated.
>
> Best,
> Yingjie on behalf of all contributors
>
> [1] https://github.com/flink-extended/flink-remote-shuffle
>


Re: [DISCUSS] Drop Zookeeper 3.4

2021-11-26 Thread Till Rohrmann
According to this SO question [1], it seems that Zk 3.5 clients cannot talk
to 3.4 servers. I also tried it out with a local deployment and Flink was
not able to start.

Newer Zk versions can talk to older Zk servers if no new APIs are used [2].

[1] https://stackoverflow.com/a/61630617/4815083
[2] https://zookeeper.apache.org/releases.html

Cheers,
Till

On Thu, Nov 25, 2021 at 10:39 PM Chesnay Schepler 
wrote:

> I included the user ML in the thread.
>
> @users Are you still using Zookeeper 3.4? If so, were you planning to
> upgrade Zookeeper in the near future?
>
> I'm not sure about ZK compatibility, but we'd also upgrade Curator to
> 5.x, which doesn't support ZooKeeper 3.4 anymore.
>
> On 25/11/2021 21:56, Till Rohrmann wrote:
> > Should we ask on the user mailing list whether anybody is still using
> > ZooKeeper 3.4 and thus needs support for this version? Or can a ZooKeeper
> > 3.5/3.6 client talk to a ZooKeeper 3.4 cluster? I would expect that not a
> > lot of users depend on it, but I would like to make sure that we aren't
> > annoying a lot of our users with this change. Apart from that, +1 for
> > removing it if not a lot of users depend on it.
> >
> > Cheers,
> > Till
> >
> > On Wed, Nov 24, 2021 at 11:03 AM Matthias Pohl 
> > wrote:
> >
> >> Thanks for starting this discussion, Chesnay. +1 from my side. It's
> >> time to move forward with the ZK support considering the EOL of 3.4 you
> >> already mentioned. The benefits we gain from upgrading Curator to 5.x as
> >> a consequence is another plus point. Just for reference on the
> >> inconsistent state issue you mentioned: FLINK-24543 [1].
> >>
> >> Matthias
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-24543
> >>
> >> On Wed, Nov 24, 2021 at 10:19 AM Chesnay Schepler 
> >> wrote:
> >>
> >>> Hello,
> >>>
> >>> I'd like to drop support for Zookeeper 3.4 in 1.15, upgrading the
> >>> default to 3.5 with an opt-in for 3.6.
> >>>
> >>> Supporting Zookeeper 3.4 (which is already EOL) prevents us from
> >>> upgrading Curator to 5.x, which would allow us to properly fix an issue
> >>> with inconsistent state. It is also required to eventually support ZK
> >>> 3.6.
>
>
>


Re: Reactive mode in 1.13

2021-11-03 Thread Till Rohrmann
Hi Ravi,

I think you can pass the arguments to the job via `./bin/standalone-job.sh
start  -Dscheduler-mode=reactive -Dexecution.checkpointing.interval="3000s"
lib/tornado.jar myArguments`.
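
To see what the launcher script actually receives from such a command line, the following sketch tokenizes it the way a POSIX shell would. It does not run Flink and makes no claim about how Flink's entry point interprets the trailing tokens; the variable names are only illustrative.

```python
import shlex

command = (
    './bin/standalone-job.sh start  -Dscheduler-mode=reactive '
    '-Dexecution.checkpointing.interval="3000s" lib/tornado.jar myArguments'
)

# POSIX-style splitting: quotes are consumed and runs of spaces collapse
tokens = shlex.split(command)

# tokens[0] is the script and tokens[1] is the "start" action
dynamic_props = [t for t in tokens[2:] if t.startswith("-D")]
trailing = [t for t in tokens[2:] if not t.startswith("-D")]

print("dynamic properties:", dynamic_props)
print("trailing tokens:   ", trailing)
```

Note that the double quotes around `3000s` are removed by the shell, so the script sees `-Dexecution.checkpointing.interval=3000s`, and everything after the `-D` options arrives as plain positional tokens.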

Cheers,
Till

On Wed, Nov 3, 2021 at 5:20 AM Ravi Sankar Reddy Sangana 
wrote:

> Thanks a lot, it is working fine now. Could you also explain how to pass
> parameters to the job? In the session cluster I am passing arguments using
> the API.
>
>
>
> Here how can I pass the arguments to the job?
>
>
>
>
>
> Regards,
>
> Ravi Sankar Reddy.
>
>
>
> *From:* Till Rohrmann 
> *Sent:* 02 November 2021 07:33 PM
> *To:* Ravi Sankar Reddy Sangana 
> *Cc:* user 
> *Subject:* Re: Reactive mode in 1.13
>
>
>
> Hi Ravi,
>
>
>
> I think you also need to make the tornado.jar available to the
> TaskExecutor processes (e.g. putting them into the usrlib or lib directory
> where you started the process). When using the application mode, then Flink
> assumes that all processes have access to the user code jar. That's why
> Flink won't ship the user code jars to the other processes unlike when
> using the session cluster mode, for example. The idea is that the user code
> is bundled together with the application cluster deployment.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Tue, Nov 2, 2021 at 1:01 PM Ravi Sankar Reddy Sangana <
> ra...@radware.com> wrote:
>
> Set up:
>
>
>
> 1 job manager in 2 core 6 GB
>
> 2 task managers in 4 core 12 GB
>
>
>
> Created fat jar and copied the jar to jobmanager lib folder.
>
>
>
> ./bin/standalone-job.sh start  -Dscheduler-mode=reactive
> -Dexecution.checkpointing.interval="3000s" lib/tornado.jar
>
>
>
> *Build logs in job manager:*
>
>
>
> [INFO] --- maven-shade-plugin:3.0.0:shade (default) @ clive ---
>
> [INFO] Including org.apache.flink:flink-java:jar:1.13.1 in the shaded jar.
>
> [INFO] Including org.apache.flink:flink-core:jar:1.13.1 in the shaded jar.
>
> [INFO] Including org.apache.flink:flink-annotations:jar:1.13.1 in the
> shaded jar.
>
> [INFO] Including org.apache.flink:flink-metrics-core:jar:1.13.1 in the
> shaded jar.
>
> [INFO] Including org.apache.flink:flink-shaded-asm-7:jar:7.1-13.0 in the
> shaded jar.
>
> [INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded
> jar.
>
> [INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded
> jar.
>
> [INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
>
> [INFO] Including commons-collections:commons-collections:jar:3.2.2 in the
> shaded jar.
>
> [INFO] Including org.apache.commons:commons-compress:jar:1.20 in the
> shaded jar.
>
> [INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-13.0 in the
> shaded jar.
>
> [INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded
> jar.
>
> [INFO] Including org.apache.commons:commons-math3:jar:3.5 in the shaded
> jar.
>
> [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
>
> [INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded
> jar.
>
> [INFO] Excluding org.apache.flink:force-shading:jar:1.13.1 from the shaded
> jar.
>
> [INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.13.1 in
> the shaded jar.
>
> [INFO] Including org.apache.kafka:kafka-clients:jar:2.4.1 in the shaded
> jar.
>
> [INFO] Including com.github.luben:zstd-jni:jar:1.4.3-1 in the shaded jar.
>
> [INFO] Including org.lz4:lz4-java:jar:1.6.0 in the shaded jar.
>
> [INFO] Including org.xerial.snappy:snappy-java:jar:1.1.7.3 in the shaded
> jar.
>
> [INFO] Including org.apache.flink:flink-connector-base:jar:1.13.1 in the
> shaded jar.
>
> [INFO] Replacing original artifact with shaded artifact.
>
>
>
> *LOGS:*
>
>
>
> 2021-11-02 11:02:36,224 INFO
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] -
> Restarting job.
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
> user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>
> ClassLoader info: URL ClassLoader:
>
> Class not resolvable through given classloader.
>
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:154)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:548)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>

Re: Reactive mode in 1.13

2021-11-02 Thread Till Rohrmann
dClass(ClassLoader.java:418)
> ~[?:1.8.0_312]
>
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
> ~[clive-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
> ~[clive-1.0-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> ~[clive-1.0-SNAPSHOT.jar:?]
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ~[?:1.8.0_312]
>
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>
> at java.lang.Class.forName0(Native Method) ~[?:1.8.0_312]
>
> at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_312]
>
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
> ~[clive-1.0-SNAPSHOT.jar:?]
>
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1985)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1849)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2159)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
> ~[?:1.8.0_312]
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1666)
> ~[?:1.8.0_312]
>
>
>
> *From:* Till Rohrmann 
> *Sent:* 02 November 2021 03:16 PM
> *To:* user 
> *Cc:* Ravi Sankar Reddy Sangana 
> *Subject:* Re: Reactive mode in 1.13
>
>
>
> Hi Ravi,
>
>
>
> The reactive mode shouldn't do things differently compared to a normal
> application cluster deployment. Maybe you can show us exactly how you
> submit a job, the contents of the bundled jar, how you build the fat jar
> and the logs of the failed Flink run.
>
>
>
> Moving this discussion to the user ML because it better fits there.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Tue, Nov 2, 2021 at 10:22 AM Ravi Sankar Reddy Sangana <
> ra...@radware.com> wrote:
>
> Hi team,
>
> We are planning to move to reactive mode with standalone job for scaling
> options.
>
> While submitting jobs, we get errors saying that the Kafka consumer and
> client related jars are missing, even though they are packed in the fat jar
> and the same jar runs fine on a normal session cluster.
>
> Can anyone help on how to add the jars while using standalone ?? Thanks in
> advance
>
>
> Regards,
> Ravi Sankar Reddy
>
>
>


Re: Reactive mode in 1.13

2021-11-02 Thread Till Rohrmann
Hi Ravi,

The reactive mode shouldn't do things differently compared to a normal
application cluster deployment. Maybe you can show us exactly how you
submit a job, the contents of the bundled jar, how you build the fat jar
and the logs of the failed Flink run.
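
One quick way to inspect the contents of the bundled jar is to treat it as the zip archive it is and look for the class the job fails to load. The helper names below are hypothetical and this is only a minimal sketch; it checks a single jar file and says nothing about which class loader would see that jar at runtime.

```python
import zipfile


def class_in_jar(jar_path, class_name):
    """Return True if the jar (a zip archive) contains the given class."""
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


def missing_classes(jar_path, class_names):
    """Return the subset of class_names that the jar does not contain."""
    return [name for name in class_names if not class_in_jar(jar_path, name)]
```

For instance, `missing_classes("lib/tornado.jar", ["org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer"])` would return an empty list if the connector class is packed into the fat jar; the jar path here is simply the one used in this thread.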

Moving this discussion to the user ML because it better fits there.

Cheers,
Till

On Tue, Nov 2, 2021 at 10:22 AM Ravi Sankar Reddy Sangana 
wrote:

> Hi team,
>
> We are planning to move to reactive mode with standalone job for scaling
> options.
>
> While submitting jobs, we get errors saying that the Kafka consumer and
> client related jars are missing, even though they are packed in the fat jar
> and the same jar runs fine on a normal session cluster.
>
> Can anyone help on how to add the jars while using standalone ?? Thanks in
> advance
>
>
> Regards,
> Ravi Sankar Reddy
>
>
>
>


Re: Issue in deploying Flink application in AKS 1.21

2021-11-01 Thread Till Rohrmann
Hi folks,

the exception says that
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
cannot be loaded. The ProgramAbortException has been moved in Flink 1.11.0
[1] to a different place. Not sure what cloudflow.flink.Flinkstreamlet does
but it seems to rely on some older Flink 1.11.0 binaries. I'd recommend
taking a look at this component.

[1] https://issues.apache.org/jira/browse/FLINK-15090

Cheers,
Till

On Mon, Nov 1, 2021 at 12:34 PM Varatharajan, Esakkisundar <
esakkisundar.varathara...@westpac.com.au> wrote:

> Hi Till,
>
> We are getting the attached exception when running the
> application written in Flink version 1.13.2.  AKS is 1.21. Please let us
> know if you need additional information.
>
>
>
> Regards,
>
> Sundar.
>
> *From:* Till Rohrmann 
> *Sent:* Monday, 1 November 2021 4:02 PM
> *To:* user 
> *Cc:* Varatharajan, Esakkisundar ;
> Padmanabhuni, Manojkumar ; Gupta,
> Rahul 1 ; Naidu, Alagumurugan <
> alagumurugan.na...@westpac.com.au>; Shravan, Sai <
> sai.shra...@westpac.com.au>
> *Subject:* Re: Issue in deploying Flink application in AKS 1.21
>
>
>
> Hi Deepa,
>
>
>
> I cannot see the attached image. Maybe you can post the description of the
> problem.
>
>
>
> I am also moving this discussion to the user ML.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Mon, Nov 1, 2021 at 9:05 AM Sekaran, Sreedeepa <
> sreedeepa.seka...@westpac.com.au> wrote:
>
> Hi Team,
>
>
>
> We are working with a Flink application (1.13.2) deployed in AKS (1.19) and
> it is working fine. We have now upgraded AKS to version 1.21 and
> deployed the same Flink application, but we are facing the issue below during
> startup, and the pods go into CrashLoopBackOff because of this error.
>
>
>
> Could you please let us know whether this is a compatibility issue?
>
>
>
>
>
> Thanks,
>
> Deepa
>
> +61470256508
>
>
> --
>
> Confidential communication
> Westpac Banking Corporation (ABN 33 007 457 141, AFSL 233714)
> Westpac Institutional Bank is a division of Westpac Banking Corporation
> --
>
>


Re: Issue in deploying Flink application in AKS 1.21

2021-11-01 Thread Till Rohrmann
Hi Deepa,

I cannot see the attached image. Maybe you can post the description of the
problem.

I am also moving this discussion to the user ML.

Cheers,
Till

On Mon, Nov 1, 2021 at 9:05 AM Sekaran, Sreedeepa <
sreedeepa.seka...@westpac.com.au> wrote:

> Hi Team,
>
>
>
> We are working with a Flink application (1.13.2) deployed in AKS (1.19) and
> it is working fine. We have now upgraded AKS to version 1.21 and
> deployed the same Flink application, but we are facing the issue below during
> startup, and the pods go into CrashLoopBackOff because of this error.
>
>
>
> Could you please let us know whether this is a compatibility issue?
>
>
>
>
>
> Thanks,
>
> Deepa
>
> +61470256508
>
>
> --
> Confidential communication
> Westpac Banking Corporation (ABN 33 007 457 141, AFSL 233714)
> Westpac Institutional Bank is a division of Westpac Banking Corporation
> --
>


Re: [ANNOUNCE] Apache Flink 1.13.3 released

2021-10-22 Thread Till Rohrmann
Thanks Chesnay and Martijn for managing this release and to everyone who
contributed to it.

Cheers,
Till

On Fri, Oct 22, 2021 at 11:04 AM Yangze Guo  wrote:

> Thanks Chesnay, Martijn, and everyone involved!
>
> Best,
> Yangze Guo
>
> On Fri, Oct 22, 2021 at 4:25 PM Yun Tang  wrote:
> >
> > Thanks to Chesnay & Martijn and everyone who made this release happen.
> >
> > Best
> > Yun Tang
> > 
> > From: JING ZHANG 
> > Sent: Friday, October 22, 2021 10:17
> > To: dev 
> > Cc: Martijn Visser; Jingsong Li <jingsongl...@gmail.com>; Chesnay Schepler;
> > user <user@flink.apache.org>
> > Subject: Re: [ANNOUNCE] Apache Flink 1.13.3 released
> >
> > Thanks Chesnay, Martijn and every contributor for making this happen!
> >
> >
> > On Fri, Oct 22, 2021 at 12:15 AM Thomas Weise wrote:
> >
> > Thanks for making the release happen!
> >
> > On Thu, Oct 21, 2021 at 5:54 AM Leonard Xu  wrote:
> > >
> > > Thanks to Chesnay & Martijn and everyone who made this release happen.
> > >
> > >
> > > > On Oct 21, 2021, at 20:08, Martijn Visser wrote:
> > > > >
> > > > > Thank you Chesnay, Leonard and all contributors!
> > > > >
> > > > > On Thu, 21 Oct 2021 at 13:40, Jingsong Li wrote:
> > > > Thanks, Chesnay & Martijn
> > > >
> > > > 1.13.3 really solves many problems.
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > > On Thu, Oct 21, 2021 at 6:46 PM Konstantin Knauf wrote:
> > > > >
> > > > > Thank you, Chesnay & Martijn, for managing this release!
> > > > >
> > > > > > On Thu, Oct 21, 2021 at 10:29 AM Chesnay Schepler <ches...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > The Apache Flink community is very happy to announce the release of
> > > > > > > Apache Flink 1.13.3, which is the third bugfix release for the
> > > > > > > Apache Flink 1.13 series.
> > > > > > >
> > > > > > > Apache Flink® is an open-source stream processing framework for
> > > > > > > distributed, high-performing, always-available, and accurate data
> > > > > > > streaming applications.
> > > > > > >
> > > > > > > The release is available for download at:
> > > > > > > https://flink.apache.org/downloads.html
> > > > > > >
> > > > > > > Please check out the release blog post for an overview of the
> > > > > > > improvements for this bugfix release:
> > > > > > > https://flink.apache.org/news/2021/10/19/release-1.13.3.html
> > > > > > >
> > > > > > > The full release notes are available in Jira:
> > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350329
> > > > > > >
> > > > > > > We would like to thank all contributors of the Apache Flink
> > > > > > > community who made this release possible!
> > > > > > >
> > > > > > > Regards,
> > > > > > > Chesnay
> > > > > > >
> > > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Konstantin Knauf
> > > > >
> > > > > https://twitter.com/snntrable 
> > > > >
> > > > > https://github.com/knaufk 
> > > >
> > > >
> > > >
> > > > --
> > > > Best, Jingsong Lee
> > >
>


Re: [ANNOUNCE] Flink mailing lists archive service has migrated to Apache Archive service

2021-09-30 Thread Till Rohrmann
Thanks for the hint with the managed search engines Matthias. I think this
is quite helpful.

Cheers,
Till

On Wed, Sep 15, 2021 at 4:27 PM Matthias Pohl 
wrote:

> Thanks Leonard for the announcement. I guess that is helpful.
>
> @Robert is there any way we can change the default setting to something
> else (e.g. greater than 0 days)? Only having the last month available as a
> default is kind of annoying considering that the time setting is quite
> hidden.
>
> Matthias
>
> PS: As a workaround, one could use the gte=0d parameter which is encoded in
> the URL (e.g. if you use managed search engines in Chrome or Firefox's
> bookmark keywords:
> https://lists.apache.org/x/list.html?user@flink.apache.org:gte=0d:%s).
> That will make all posts available right away.
>
> On Mon, Sep 6, 2021 at 3:16 PM JING ZHANG  wrote:
>
> > Thanks Leonard for driving this.
> > The information is helpful.
> >
> > Best,
> > JING ZHANG
> >
> > On Mon, Sep 6, 2021 at 4:59 PM Jark Wu wrote:
> >
> >> Thanks Leonard,
> >>
> >> I have seen many users complaining that the Flink mailing list doesn't
> >> work (they were using Nabble).
> >> I think this information would be very helpful.
> >>
> >> Best,
> >> Jark
> >>
> >> On Mon, 6 Sept 2021 at 16:39, Leonard Xu  wrote:
> >>
> >>> Hi, all
> >>>
> >>> The mailing list archive service Nabble Archive broke at the end of
> >>> June. The Flink community has migrated the mailing list archives [1] to
> >>> the Apache Archive service in commit [2]; see [3] for more information
> >>> about Flink's mailing list archives.
> >>>
> >>> The Apache Archive service is maintained by the ASF, so its stability is
> >>> guaranteed. It is a web-based mail archive service that allows you to
> >>> browse, search, interact with, subscribe to, and unsubscribe from mailing
> >>> lists.
> >>>
> >>> The Apache Archive service shows mails of the last month by default; you
> >>> can specify a date range to browse and search older mails.
> >>>
> >>>
> >>> Hope it would be helpful.
> >>>
> >>> Best,
> >>> Leonard
> >>>
> >>> [1] The Flink mailing lists in Apache archive service
> >>> dev mailing list archives:
> >>> https://lists.apache.org/list.html?d...@flink.apache.org
> >>> user mailing list archives :
> >>> https://lists.apache.org/list.html?user@flink.apache.org
> >>> user-zh mailing list archives :
> >>> https://lists.apache.org/list.html?user...@flink.apache.org
> >>> [2] https://github.com/apache/flink-web/commit/9194dda862da00d93f627fd315056471657655d1
> >>> [3] https://flink.apache.org/community.html#mailing-lists
> >>
> >>
>


Re: Job leader ... lost leadership with version 1.13.2

2021-09-02 Thread Till Rohrmann
Forwarding the discussion back to the user mailing list.

On Thu, Sep 2, 2021 at 12:25 PM Till Rohrmann  wrote:

> The stack trace looks ok. This happens whenever the leader loses
> leadership and this can have different reasons. What's more interesting is
> what happens before and after and what's happening on the system you use
> for HA (probably ZooKeeper). Maybe the connection to ZooKeeper is unstable
> or there is some other problem.
>
> Cheers,
> Till
>
> On Thu, Sep 2, 2021 at 12:20 PM Xiangyu Su  wrote:
>
>> Hi Till,
>> thank you very much for this fast reply!
>> This issue happens very randomly. I made some attempts to reproduce it, but
>> it is not easy.
>> Here is the exception stack trace from the JM and TM logs:
>>
>> java.lang.Exception: Job leader for job id 6fd38dedbca7bf65bfa57cb306930fa9 lost leadership.
>> at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2189)
>> at java.util.Optional.ifPresent(Optional.java:159)
>> at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2187)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> On Thu, 2 Sept 2021 at 12:14, Till Rohrmann  wrote:
>>
>>> Hi Xiangyu,
>>>
>>> Do you have the logs of the problematic test run available? Ideally, we
>>> can enable the DEBUG log level to get some more information. I think this
>>> information would be needed to figure out the problem.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Sep 2, 2021 at 11:47 AM Xiangyu Su  wrote:
>>>
>>>> Hello Everyone,
>>>> Hello Till,
>>>> We upgraded Flink to 1.13.2 and are randomly facing the "Job leader
>>>> ... lost leadership" error; the job keeps restarting and failing.
>>>> It behaves like this ticket:
>>>> https://issues.apache.org/jira/browse/FLINK-14316
>>>>
>>>> Has anybody had the same issue, or does anyone have suggestions?
>>>>
>>>> Best Regards,
>>>>
>>>> --
>>>> Xiangyu Su
>>>> Java Developer
>>>> xian...@smaato.com
>>>>
>>>> Smaato Inc.
>>>> San Francisco - New York - Hamburg - Singapore
>>>> www.smaato.com
>>>>
>>>> Germany:
>>>>
>>>> Barcastraße 5
>>>>
>>>> 22087 Hamburg
>>>>
>>>> Germany
>>>> M 0049(176)43330282
>>>>
>>>> The information contained in this communication may be CONFIDENTIAL and
>>>> is intended only for the use of the recipient(s) named above. If you are
>>>> not the intended recipient, you are hereby notified that any dissemination,
>>>> distribution, or copying of this communication, or any of its contents, is
>>>> strictly prohibited. If you have received this communication in error,
>>>> please notify the sender and delete/destroy the original message and any
>>>> copy of it from your computer or paper files.
>>>>
>>>
>>
>> --
>> Xiangyu Su
>> Java Developer
>> xian...@smaato.com
>>
>> Smaato Inc.
>> San Francisco - New York - Hamburg - Singapore
>> www.smaato.com
>>
>> Germany:
>>
>> Barcastraße 5
>>
>> 22087 Hamburg
>>
>> Germany
>> M 0049(176)43330282
>>
>> The information contained in this communication may be CONFIDENTIAL and
>> is intended only for the use of the recipient(s) named above. If you are
>> not the intended recipient, you are hereby notified that any dissemination,
>> distribution, or copying of this communication, or any of its contents, is
>> strictly prohibited. If you have received this communication in error,
>> please notify the sender and delete/destroy the original message and any
>> copy of it from your computer or paper files.
>>
>


Re: Checkpointing failure, subtasks get stuck

2021-09-02 Thread Till Rohrmann
Hi Xiangyu,

Can you provide us with more information about your job, which state
backend you are using and how you've configured the checkpointing? Can you
also provide some information about the problematic checkpoints (e.g.
alignment time, async/sync duration) that you find on the checkpoint
details page? If you have access to the logs, then this could also help
better understand what is going on.

In general, such a problem can be caused by backpressure and long alignment
times. Backpressure can come from skewed data or if the user code is
performing very lengthy operations. What you could try is to enable
unaligned checkpoints if the problem is long alignment times caused by
backpressure.
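
As a minimal sketch, assuming checkpointing is configured through
flink-conf.yaml rather than in the job code, enabling unaligned checkpoints
could look like the fragment below. The interval value is purely illustrative;
the timeout simply mirrors the 10 minutes you mentioned.

```yaml
# flink-conf.yaml (sketch; the interval is an illustrative assumption)
# How often a checkpoint is triggered.
execution.checkpointing.interval: 1 min
# The checkpoint timeout described in your mail.
execution.checkpointing.timeout: 10 min
# Let checkpoint barriers overtake buffered records under backpressure.
execution.checkpointing.unaligned: true
```

Settings made in the job code (for example via
env.getCheckpointConfig().enableUnalignedCheckpoints()) take precedence over
this file, so it is worth checking both places.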

Cheers,
Till

On Thu, Sep 2, 2021 at 11:48 AM Xiangyu Su  wrote:

> Hello Everyone,
> Hello Till,
> We have been facing a checkpointing failure issue since version 1.9;
> currently we are using version 1.13.2.
>
> We are using filesystem (S3) as the state backend with a 10-minute checkpoint
> timeout; usually a checkpoint takes 10-30 seconds.
> But sometimes I have seen the job fail and restart due to a checkpoint
> timeout without a large increase in incoming data, and I have also seen the
> checkpointing progress of some subtasks get stuck at e.g. 7% for 10 minutes.
> My guess is that the thread doing the checkpointing somehow gets blocked.
>
> Any suggestions or ideas would be helpful, thanks.
>
>
> Best Regards,
>
> --
> Xiangyu Su
> Java Developer
> xian...@smaato.com
>
> Smaato Inc.
> San Francisco - New York - Hamburg - Singapore
> www.smaato.com
>
> Germany:
>
> Barcastraße 5
>
> 22087 Hamburg
>
> Germany
> M 0049(176)43330282
>
> The information contained in this communication may be CONFIDENTIAL and is
> intended only for the use of the recipient(s) named above. If you are not
> the intended recipient, you are hereby notified that any dissemination,
> distribution, or copying of this communication, or any of its contents, is
> strictly prohibited. If you have received this communication in error,
> please notify the sender and delete/destroy the original message and any
> copy of it from your computer or paper files.
>


Re: Job leader ... lost leadership with version 1.13.2

2021-09-02 Thread Till Rohrmann
Hi Xiangyu,

Do you have the logs of the problematic test run available? Ideally, we can
enable the DEBUG log level to get some more information. I think this
information would be needed to figure out the problem.

Cheers,
Till

On Thu, Sep 2, 2021 at 11:47 AM Xiangyu Su  wrote:

> Hello Everyone,
> Hello Till,
> We upgraded Flink to 1.13.2 and are randomly facing the "Job leader
> ... lost leadership" error; the job keeps restarting and failing.
> It behaves like this ticket:
> https://issues.apache.org/jira/browse/FLINK-14316
>
> Has anybody had the same issue, or does anyone have suggestions?
>
> Best Regards,
>
> --
> Xiangyu Su
> Java Developer
> xian...@smaato.com
>
> Smaato Inc.
> San Francisco - New York - Hamburg - Singapore
> www.smaato.com
>
> Germany:
>
> Barcastraße 5
>
> 22087 Hamburg
>
> Germany
> M 0049(176)43330282
>
> The information contained in this communication may be CONFIDENTIAL and is
> intended only for the use of the recipient(s) named above. If you are not
> the intended recipient, you are hereby notified that any dissemination,
> distribution, or copying of this communication, or any of its contents, is
> strictly prohibited. If you have received this communication in error,
> please notify the sender and delete/destroy the original message and any
> copy of it from your computer or paper files.
>


Re: [ANNOUNCE] Apache Flink Stateful Functions 3.1.0 released

2021-09-01 Thread Till Rohrmann
Great news! Thanks a lot for all your work on the new release :-)

Cheers,
Till

On Wed, Sep 1, 2021 at 9:07 AM Johannes Moser  wrote:

> Congratulations, great job. 🎉
>
> On 31.08.2021, at 17:09, Igal Shilman  wrote:
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink Stateful Functions (StateFun) 3.1.0.
>
> StateFun is a cross-platform stack for building Stateful Serverless
> applications, making it radically simpler to develop scalable, consistent,
> and elastic distributed applications.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2021/08/31/release-statefun-3.1.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for StateFun can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>
> Python SDK for StateFun published to the PyPI index can be found at:
> https://pypi.org/project/apache-flink-statefun/
>
> Official Docker images for StateFun are published to Docker Hub:
> https://hub.docker.com/r/apache/flink-statefun
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350038&projectId=12315522
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Thanks,
> Igal
>
>
>


Re: 【Announce】Zeppelin 0.10.0 is released, Flink on Zeppelin Improved

2021-08-26 Thread Till Rohrmann
Cool, thanks for letting us know Jeff. Hopefully, many users use Zeppelin
together with Flink.

Cheers,
Till

On Thu, Aug 26, 2021 at 4:47 AM Leonard Xu  wrote:

> Thanks Jeff for the great work !
>
> Best,
> Leonard
>
> On Aug 25, 2021, at 22:48, Jeff Zhang wrote:
>
> Hi Flink users,
>
> We (the Zeppelin community) are very excited to announce that Zeppelin 0.10.0
> is officially released. In this version, we made several improvements to the
> Flink interpreter. Here are the main features of Flink on Zeppelin:
>
>- Support multiple versions of Flink
>- Support multiple versions of Scala
>- Support multiple languages
>- Support multiple execution modes
>- Support Hive
>- Interactive development
>- Enhancement on Flink SQL
>- Multi-tenancy
>- Rest API Support
>
> Take a look at this document for more details:
> https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html
> The quickest way to try Flink on Zeppelin is via its docker image
> https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html#play-flink-in-zeppelin-docker
>
> Besides these, here is a blog post about how to run the Flink SQL cookbook on
> Zeppelin, the easy way to learn Flink SQL:
> https://medium.com/analytics-vidhya/learn-flink-sql-the-easy-way-d9d48a95ae57
>
> We hope this is helpful, and you are welcome to join our community to
> discuss with others: http://zeppelin.apache.org/community.html
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>


Re: AdaptiveScheduler stopped without exception

2021-08-24 Thread Till Rohrmann
Hi Yanjie,

The observed exception in the logs is just a side effect of the shut down
procedure. It is a bug that shutting down the Dispatcher will result in a
fatal exception coming from the ApplicationDispatcherBootstrap. I've
created a ticket in order to fix it [1].

The true reason for stopping the SessionDispatcherLeaderProcess is that the
DefaultDispatcherRunner lost its leadership. Unfortunately, we don't log
this event at INFO level. If you enable the DEBUG log level, you should see it.
What happens when the Dispatcher loses leadership is that the Dispatcher
component will be stopped. I will improve the logging of the
DefaultDispatcherRunner to better state when it gains and loses leadership
[2]. I hope this will make the logs easier to understand.

In the second job manager log, it is effectively the same. Just with the
difference that first the ResourceManager loses its leadership. It seems as
if the cause for the leadership loss could be that 172.18.0.1:443 is no
longer reachable (probably the K8s API server).
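
If the API server is indeed temporarily unreachable, one option is to give the
leader election more slack. A minimal sketch, assuming the Kubernetes HA
services are configured through flink-conf.yaml; the durations are
illustrative assumptions, not recommendations:

```yaml
# flink-conf.yaml (sketch; the durations are illustrative assumptions)
# How long a leader's lease stays valid before another contender may take over.
high-availability.kubernetes.leader-election.lease-duration: 60 s
# How long the current leader keeps trying to renew before giving up leadership.
high-availability.kubernetes.leader-election.renew-deadline: 60 s
# Pause between two consecutive renew/acquire attempts.
high-availability.kubernetes.leader-election.retry-period: 10 s
```

Larger values make the cluster more tolerant of short API server outages, at
the cost of a slower failover when a JobManager really dies.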

[1] https://issues.apache.org/jira/browse/FLINK-23946
[2] https://issues.apache.org/jira/browse/FLINK-23947

Cheers,
Till

On Tue, Aug 24, 2021 at 9:56 AM yanjie  wrote:

> Hi all,
>
> I run an Application Cluster on Azure K8s. The job works fine for a
> while, then the jobmanager catches an exception:
>
> org.apache.flink.util.FlinkException: AdaptiveScheduler is being stopped.
>
> at
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.closeAsync(AdaptiveScheduler.java:415)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:962)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:926)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:398)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> .. omit
>
>
> without any other exception before it. The jobmanager then executes its
> stopping steps and shuts down.
> Because there is no other exception before it, I don't know why
> 'AdaptiveScheduler is being stopped'.
>
> *My question:*
> What causes this issue (flink-jobmanager-1593852-jgwjt.log)?
> Did a network issue cause this exception (as encountered in
> flink-jobmanager-1593852-kr22z.log)?
> Why doesn't the first jobmanager (flink-jobmanager-1593852-jgwjt) throw any
> exception before it?
>
> *Logs:*
> The attached log files contain the jobmanager's and taskmanager's logs. I
> configured k8s HA with jobmanager parallelism=1 (the issue recurs whether the
> jobmanager's parallelism is set to 1 or 2).
> *flink-jobmanager-1593852-jgwjt.log*:
> works fine until '2021-08-23 05:08:25'
>
> *flink-jobmanager-1593852-kr22z.log*:
> starts at '2021-08-23 05:08:35' and restores my job, works fine for a
> while, then at '2021-08-23 14:24:15'
> the jobmanager appears to hit a network issue (maybe an Azure K8s network
> issue, which means Flink can't operate on the configmap and loses leadership
> after the k8s HA lease duration).
> At '2021-08-23 14:24:32', this jobmanager catches the exception
> 'AdaptiveScheduler is being stopped' again and then shuts down.
>
> *flink--taskexecutor-0-flink-taskmanager-1593852-56dfcd95bc-hvnps.log*:
> Contains the taskmanager's logs from the beginning to '2021-08-23 09:15:24'.
> This covers the lifecycle of the first jobmanager (flink-jobmanager-1593852-jgwjt).
>
>
> *Background*:
> *Deployment&Configuration*
> I followed this page:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster
> to deploy an Application Cluster to run my job, and added configuration for
> high availability on Kubernetes and for reactive scheduler mode.
> The attached yaml files contain the 'flink-config', 'flink-jobmanager' and
> 'flink-taskmanager' configurations.
>
> *Other experiences*
> In a previous test, when deploying my Flink job on an Azure K8s cluster, I
> encountered a 'network issue' once. This issue meant the master jobmanager
> couldn't renew the configmap for a while,
> so the standby jobmanager was elected as leader. When the
> previous leader's network recovered, it learned it was no longer the leader
> and shut down. Because of K8s's default
> configuration '*backoffLimit=6*', my Flink job was eventually removed.
> I'm fixing this issue by increasing the K8s HA configuration values, as this
> official document says:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#advanced-high-availability-kubernetes-options
>
>
> *My analysis*:
> Both jobmanagers' log files contain the same exception: 'AdaptiveScheduler is
> being stopped'. The first jobmanager doesn't print any exception before it.
> The second jobmanager prints a network exception, which suggests that this
> is caused by a network issue.
> I did encounter a 'network issue' in the previous test and the fix
> is ongoing. Maybe this exception is also caused by a 'network issue'.
>
> The reason why I raised this i

Re: [ANNOUNCE] Apache Flink 1.11.4 released

2021-08-10 Thread Till Rohrmann
This is great news. Thanks a lot for being our release manager Godfrey and
also to everyone who made this release possible.

Cheers,
Till

On Tue, Aug 10, 2021 at 11:09 AM godfrey he  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.11.4, which is the fourth bugfix release for the Apache Flink 1.11
> series.
>
>
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
>
>
> The release is available for download at:
>
> https://flink.apache.org/downloads.html
>
>
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
>
> https://flink.apache.org/news/2021/08/09/release-1.11.4.html
>
>
>
> The full release notes are available in Jira:
>
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349404
>
>
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
>
>
> Regards,
>
> Godfrey
>


Re: [ANNOUNCE] Apache Flink 1.12.5 released

2021-08-10 Thread Till Rohrmann
This is great news. Thanks a lot for being our release manager Jingsong and
also to everyone who made this release possible.

Cheers,
Till

On Tue, Aug 10, 2021 at 10:57 AM Jingsong Lee 
wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.12.5, which is the fifth bugfix release for the Apache Flink 1.12
> series.
>
>
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
>
>
> The release is available for download at:
>
> https://flink.apache.org/downloads.html
>
>
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
>
> https://flink.apache.org/news/2021/08/06/release-1.12.5.html
>
>
>
> The full release notes are available in Jira:
>
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350166
>
>
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
>
>
> Regards,
>
> Jingsong Lee
>


Re: [ANNOUNCE] Apache Flink 1.13.2 released

2021-08-09 Thread Till Rohrmann
Thanks Yun Tang for being our release manager and the great work! Also
thanks a lot to everyone who contributed to this release.

Cheers,
Till

On Mon, Aug 9, 2021 at 9:48 AM Yu Li  wrote:

> Thanks Yun Tang for being our release manager and everyone else who made
> the release possible!
>
> Best Regards,
> Yu
>
>
> On Fri, 6 Aug 2021 at 13:52, Yun Tang  wrote:
>
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.13.2, which is the second bugfix release for the Apache
>> Flink 1.13 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2021/08/06/release-1.13.2.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218&styleName=&projectId=12315522
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Yun Tang
>>
>


Re: Flink job failure during yarn node termination

2021-08-03 Thread Till Rohrmann
Hi Rainie,

It looks to me as if Yarn is causing this problem. Which Yarn node are you
terminating? Have you configured your Yarn cluster to be highly available
in case you are terminating the ResourceManager?

Flink should retry the operation of starting a new container in case it
fails. If this is not the case, then please upgrade to one of the actively
maintained Flink versions (1.12 or 1.13) and try whether it works with this
version.

Cheers,
Till

On Tue, Aug 3, 2021 at 9:56 AM Rainie Li  wrote:

> Hi Flink Community,
>
> My Flink application is running version 1.9 and it failed to recover
> (the application was running, but checkpoints failed and the job stopped
> processing data) during Hadoop Yarn node termination.
>
> Here is the job manager log error:
> 2021-07-26 18:02:58,605 INFO org.apache.hadoop.io.retry.RetryInvocationHandler - Exception
> while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over
> xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020. Trying to fail over immediately.
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
> at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
> at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
> at org.apache.hadoop.ipc.Client.call(Client.java:1407)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
> at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
> at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
> at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
> at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
> at org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
> at org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(Pa

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread Till Rohrmann
Thanks for your input, Gen and Arnaud.

I do agree with you, Gen, that we need better guidance for our users on
when to change the heartbeat configuration. I think this should happen in
any case. I am, however, not so sure whether we can give a hard threshold
like 5000 tasks, for example, because as Arnaud said it strongly depends on
the workload. Maybe we can explain it based on symptoms a user might
experience and what to do then.

Concerning your workloads, Arnaud, I'd be interested to learn a bit more.
The user code runs in its own thread. This means that its operation won't
block the main thread/heartbeat. The only thing that can happen is that the
user code starves the heartbeat in terms of CPU cycles or causes a lot of
GC pauses. If you are observing the former problem, then we might think
about changing the priorities of the respective threads. This should then
improve Flink's stability for these workloads and a shorter heartbeat
timeout should be possible.
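
To make the priority idea a bit more concrete, here is a minimal sketch in
plain Java. It is not Flink code: the class, method and thread names are
made up for this example, and thread priorities are only a hint to the OS
scheduler, so this shows the idea rather than a guarantee that the heartbeat
always gets CPU time.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: none of these names exist in Flink.
final class HeartbeatPrioritySketch {

    // Thread that answers heartbeats; it gets the highest priority hint.
    static Thread newHeartbeatThread(Runnable heartbeatLoop) {
        Thread t = new Thread(heartbeatLoop, "heartbeat");
        t.setDaemon(true);
        t.setPriority(Thread.MAX_PRIORITY);
        return t;
    }

    // Thread that runs user code; it gets the lowest priority hint.
    static Thread newUserCodeThread(Runnable userCode) {
        Thread t = new Thread(userCode, "user-code");
        t.setDaemon(true);
        t.setPriority(Thread.MIN_PRIORITY);
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger beats = new AtomicInteger();
        Thread heartbeat = newHeartbeatThread(beats::incrementAndGet);
        Thread user = newUserCodeThread(() -> { /* lengthy user operation */ });
        user.start();
        heartbeat.start();
        heartbeat.join();
        user.join();
        System.out.println("heartbeats answered: " + beats.get());
    }
}
```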

Also for the RAM-cached repositories, what exactly is causing the heartbeat
to time out? Is it because you have a lot of GC or that the heartbeat
thread does not get enough CPU cycles?

Cheers,
Till

On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud 
wrote:

> Hello,
>
>
>
> From a user perspective: we have some (rare) use cases where we use
> “coarse grain” datasets, with big beans and tasks that do lengthy operations
> (such as ML training). In these cases we had to increase the timeout to
> huge values (heartbeat.timeout: 50) so that our app is not killed.
>
> I’m aware this is not the way Flink was meant to be used, but it’s a
> convenient way to distribute our workload on datanodes without having to
> use another concurrency framework (such as M/R) that would require the
> recoding of sources and sinks.
>
>
>
> In some other (most common) cases, our tasks do some R/W accesses to
> RAM-cached repositories backed by a key-value storage such as Kudu (or
> HBase). Although most of those calls are very fast, when the system is
> under heavy load they may sometimes block for more than a few seconds, and
> having our app killed because of a short timeout is not an option.
>
>
>
> That’s why I’m not in favor of very short timeouts… Because in my
> experience it really depends on what user code does in the tasks. (I
> understand that normally, as user code is not a JVM-blocking activity such
> as a GC, it should have no impact on heartbeats, but from experience, it
> really does)
>
>
>
> Cheers,
>
> Arnaud
>
>
>
>
>
> *From:* Gen Luo 
> *Sent:* Thursday, July 22, 2021 05:46
> *To:* Till Rohrmann 
> *Cc:* Yang Wang ; dev ;
> user 
> *Subject:* Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval
> default values
>
>
>
> Hi,
>
> Thanks for driving this, @Till Rohrmann. I would give +1 on reducing the
> heartbeat timeout and interval, though I'm not sure if 15s and 3s would be
> enough either.
>
>
>
> IMO, besides the standalone cluster, which relies entirely on Flink's
> heartbeat mechanism, reducing the heartbeat can also help the JM detect
> more quickly TaskExecutors in abnormal conditions that cannot respond to
> heartbeat requests, e.g., because of continuous Full GCs, even though the
> TaskExecutor process is still alive and the deployment system may not
> notice the problem. Since there are cases that can benefit from this
> change, I think it could be done if it won't break the experience in
> other scenarios.
>
>
>
> If we can identify what blocks the main threads from processing
> heartbeats, or what increases the GC costs, we can try to get rid of those
> causes to get a more predictable heartbeat response time, or give some
> advice to users whose jobs may encounter these issues. For example, as far
> as I know the JM of a large-scale job is busier and may not be able to
> process heartbeats in time, so we could advise users running jobs with
> more than 5000 tasks to increase their heartbeat interval to 10s and the
> timeout to 50s. These numbers are only illustrative.
>
>
>
> As for the issue in FLINK-23216, I think it should be fixed and may not be
> a main concern for this case.
>
>
>
> On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann 
> wrote:
>
> Thanks for sharing these insights.
>
>
>
> I think it is no longer true that the ResourceManager notifies the
> JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.
>
>
>
> Given the GC pauses, would you then be ok with decreasing the heartbeat
> timeout to 20 seconds? This should give enough time to do the GC and then
> still send/receive a heartbeat request.
>
>
>
> I also wanted to add that we are about to get rid of one big cause of
> blocking I/O operat

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-21 Thread Till Rohrmann
Thanks for sharing these insights.

I think it is no longer true that the ResourceManager notifies the
JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.

Given the GC pauses, would you then be ok with decreasing the heartbeat
timeout to 20 seconds? This should give enough time to do the GC and then
still send/receive a heartbeat request.

I also wanted to add that we are about to get rid of one big cause of
blocking I/O operations from the main thread. With FLINK-22483 [2] we will
get rid of file system accesses to retrieve completed checkpoints. This
leaves us with one additional file system access from the main thread which
is the one completing a pending checkpoint. I think it should be possible
to get rid of this access because as Stephan said it only writes
information to disk that is already written before. Maybe solving these two
issues could ease concerns about long pauses of unresponsiveness of Flink.

[1] https://issues.apache.org/jira/browse/FLINK-23216
[2] https://issues.apache.org/jira/browse/FLINK-22483

Cheers,
Till

On Wed, Jul 21, 2021 at 4:58 AM Yang Wang  wrote:

> Thanks @Till Rohrmann for starting this discussion.
>
> Firstly, I am trying to understand the benefit of a shorter heartbeat
> timeout. IIUC, it will make the JobManager aware of a lost TaskManager
> faster. However, it seems that only the standalone cluster could benefit
> from this. For Yarn and native Kubernetes deployments, the Flink
> ResourceManager should get the TaskManager lost event in a very short time:
>
> * Yarn: about 8 seconds (3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM)
> * Native Kubernetes: less than 1 second, since the Flink RM has a watch for
>   all the TaskManager pods
>
> Secondly, I am not very confident about decreasing the timeout to 15s. I
> have quickly checked the TaskManager GC logs of our internal Flink workloads
> for the past week and found more than 100 10-second Full GC entries, but
> none longer than 15s. We are using CMS GC for the old generation.
>
>
> Best,
> Yang
>
> On Sat, Jul 17, 2021 at 1:05 AM Till Rohrmann wrote:
>
>> Hi everyone,
>>
>> Since Flink 1.5 we have the same heartbeat timeout and interval default
>> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
>> 10s. These values were mainly chosen to compensate for lengthy GC pauses
>> and blocking operations that were executed in the main threads of Flink's
>> components. Since then, there were quite some advancements wrt the JVM's
>> GCs and we also got rid of a lot of blocking calls that were executed in
>> the main thread. Moreover, a long heartbeat.timeout causes long recovery
>> times in case of a TaskManager loss because the system can only properly
>> recover after the dead TaskManager has been removed from the scheduler.
>> Hence, I wanted to propose to change the timeout and interval to:
>>
>> heartbeat.timeout: 15s
>> heartbeat.interval: 3s
>>
>> Since there is no perfect solution that fits all use cases, I would really
>> like to hear from you what you think about it and how you configure these
>> heartbeat options. Based on your experience we might actually come up with
>> better default values that allow us to be resilient but also to detect
>> failed components fast. FLIP-185 can be found here [1].
>>
>> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>>
>> Cheers,
>> Till
>>
>


[DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-16 Thread Till Rohrmann
Hi everyone,

Since Flink 1.5 we have had the same default values for the heartbeat
timeout and interval, namely heartbeat.timeout: 50s and heartbeat.interval:
10s. These values were mainly chosen to compensate for lengthy GC pauses
and blocking operations that were executed in the main threads of Flink's
components. Since then, there have been quite a few advancements in the
JVM's GCs and we also got rid of a lot of blocking calls that were executed
in the main thread. Moreover, a long heartbeat.timeout causes long recovery
times in case of a TaskManager loss because the system can only properly
recover after the dead TaskManager has been removed from the scheduler.
Hence, I would like to propose changing the timeout and interval to:

heartbeat.timeout: 15s
heartbeat.interval: 3s
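
As a rough illustration of what these values mean, here is a small sketch in
plain Java. It is not Flink code, and the class and method names are made up
for this example. Both the current and the proposed defaults let the same
number of heartbeat intervals pass before the timeout fires, while the
timeout itself shrinks from 50s to 15s.

```java
import java.time.Duration;

// Hypothetical helper, for illustration only.
final class HeartbeatDefaultsSketch {

    // How many full heartbeat intervals fit into the timeout.
    static long intervalsPerTimeout(Duration timeout, Duration interval) {
        return timeout.toMillis() / interval.toMillis();
    }

    public static void main(String[] args) {
        Duration oldTimeout = Duration.ofSeconds(50);
        Duration oldInterval = Duration.ofSeconds(10);
        Duration newTimeout = Duration.ofSeconds(15);
        Duration newInterval = Duration.ofSeconds(3);

        System.out.println("current:  " + intervalsPerTimeout(oldTimeout, oldInterval));
        System.out.println("proposed: " + intervalsPerTimeout(newTimeout, newInterval));
        System.out.println("timeout reduced by "
                + oldTimeout.minus(newTimeout).getSeconds() + "s");
    }
}
```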

Since there is no perfect solution that fits all use cases, I would really
like to hear from you what you think about it and how you configure these
heartbeat options. Based on your experience we might actually come up with
better default values that allow us to be resilient but also to detect
failed components fast. FLIP-185 can be found here [1].

[1] https://cwiki.apache.org/confluence/x/GAoBCw

Cheers,
Till


Re: savepoint failure

2021-07-14 Thread Till Rohrmann
Hi Dan,

Can you provide us with more information about your job (maybe even the job
code or a minimally working example), the Flink configuration, the exact
workflow you are doing and the corresponding logs and error messages?

Cheers,
Till

On Tue, Jul 13, 2021 at 9:39 PM Dan Hill  wrote:

> Could this be caused by mixing configuration settings between runs, e.g.
> running a job with one parallelism, stopping with a savepoint and then
> recovering with a different parallelism?  I'd assume that's fine and
> wouldn't create bad state.
>
> On Tue, Jul 13, 2021 at 12:34 PM Dan Hill  wrote:
>
>> I checked my code.  Our keys for streams and map state only use either
>> (1) string, (2) long IDs that don't change or (3) Tuple of 1 and 2.
>>
>> I don't know why my current case is breaking.  Our job partitions and
>> parallelism settings have not changed.
>>
>>
>>
>> On Tue, Jul 13, 2021 at 12:11 PM Dan Hill  wrote:
>>
>>> Hey.  I just hit a similar error in production when trying to
>>> savepoint.  We also use protobufs.
>>>
>>> Has anyone found a better fix to this?
>>>
>>> On Fri, Oct 23, 2020 at 5:21 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Glad to hear that you solved your problem. Afaik Flink should not read
>>>> the fields of messages and call hashCode on them.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
>>>> radoslav.smilya...@smule.com> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> I found my problem. It was indeed related to a mutable hashcode.
>>>>>
>>>>> I was using a protobuf message in the key selector function and one of
>>>>> the protobuf fields was an enum. I checked the implementation of the
>>>>> hashcode of the generated message and it uses the int value field of the
>>>>> protobuf message, so I assumed that it was ok and immutable.
>>>>>
>>>>> I replaced the key selector function to use Tuple[Long, Int] (since my
>>>>> protobuf message has only these two fields where the int parameter stands
>>>>> for the enum value field). After changing my code to use the Tuple it
>>>>> worked.
>>>>>
>>>>> I am not sure if Flink somehow reads the protobuf message fields and
>>>>> uses the hashcode of the fields directly since the generated protobuf enum
>>>>> indeed has a mutable hashcode (Enum.hashcode).
>>>>>
>>>>> Nevertheless it's ok with the Tuple key.
>>>>>
>>>>> Thanks for your response!
>>>>>
>>>>> Best Regards,
>>>>> Rado
>>>>>
>>>>>
>>>>> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann 
>>>>> wrote:
>>>>>
>>>>>> Hi Rado,
>>>>>>
>>>>>> it is hard to tell the reason w/o a bit more details. Could you share
>>>>>> with us the complete logs of the problematic run? Also the job you are
>>>>>> running and the types of the state you are storing in RocksDB and use as
>>>>>> events in your job are very important. In the linked SO question, the
>>>>>> problem was a type whose hashcode was not immutable.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>>>>>> radoslav.smilya...@smule.com> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> I am running a Flink job that performs data enrichment. My job has 7
>>>>>>> kafka consumers that receive messages for dml statements performed for
>>>>>>> 7 db tables.
>>>>>>>
>>>>>>> Job setup:
>>>>>>>
>>>>>>>    - Flink is run in k8s in a similar way as it is described here
>>>>>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>.
>>>>>>>    - 1 job manager and 2 task managers
>>>>>>>    - parallelism is set to 4 and 2 task slots
>>>>>>>    - rocksdb as state backend
>
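
The fix Rado describes in the quoted message (keying by the long ID and the
enum's int value instead of by the protobuf message) can be sketched in plain
Java. This is only an illustration under assumptions: `Status`, `StableKey`
and the field names are hypothetical, and the original job used a Scala
Tuple[Long, Int] rather than a hand-written class. The point is that in the
JDK `java.lang.Enum.hashCode()` delegates to the identity hash, which is not
guaranteed to be the same in different JVM processes, whereas a hash computed
from a long and an int is deterministic.

```java
// Hypothetical enum standing in for a generated protobuf enum.
enum Status {
    ACTIVE(1),
    DELETED(2);

    private final int number;

    Status(int number) {
        this.number = number;
    }

    int getNumber() {
        return number;
    }
}

// Hypothetical key whose hashCode depends only on primitive values,
// never on Enum.hashCode().
final class StableKey {
    final long id;
    final int statusNumber;

    StableKey(long id, Status status) {
        this.id = id;
        this.statusNumber = status.getNumber();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StableKey)) {
            return false;
        }
        StableKey other = (StableKey) o;
        return id == other.id && statusNumber == other.statusNumber;
    }

    @Override
    public int hashCode() {
        return 31 * Long.hashCode(id) + statusNumber;
    }

    public static void main(String[] args) {
        StableKey key = new StableKey(42L, Status.ACTIVE);
        // Same value in every JVM run, unlike Status.ACTIVE.hashCode().
        System.out.println("stable key hash:  " + key.hashCode());
        System.out.println("enum identity hash: " + Status.ACTIVE.hashCode());
    }
}
```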

Re: Job Recovery Time on TM Lost

2021-07-09 Thread Till Rohrmann
text.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:331)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> at java.lang.Thread.run(Thread.java:748)
>> ```
>> 1. It's a bit inconvenient to debug such an exception because it doesn't
>> report the exact container id. Right now we have to look for
>> `xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539`
>> in the JobManager log to find that.
>> 2. The task manager log doesn't show anything suspicious. Also, no major
>> GC. So it might imply a flaky connection in this case.
>> 3. Is there any short term workaround we can try? Any config tuning?
>> Also, what's the long term solution?
>>
>> Best
>> Lu
>>
>>
>>
>>
>> On Tue, Jul 6, 2021 at 11:45 PM 刘建刚  wrote:
>>
>>> It is really helpful to find the lost container quickly. In our internal
>>> Flink version, we optimize this with task reports and jobmaster probes.
>>> When a task fails because of a connection problem, it reports this to the
>>> jobmaster. The jobmaster then tries to confirm the liveness of the
>>> unconnected taskmanager a configurable number of times. If the jobmaster
>>> finds the taskmanager unconnected or dead, it releases the taskmanager.
>>> This works for most cases. For an unstable environment, the config needs
>>> adjustment.
>>>
>>> On Tue, Jul 6, 2021 at 8:41 PM Gen Luo wrote:
>>>
>>>> Yes, I have noticed the PR and commented there with some consideration
>>>> about the new option. We can discuss further there.
>>>>
>>>> On Tue, Jul 6, 2021 at 6:04 PM Till Rohrmann 
>>>> wrote:
>>>>
>>>> > This is actually a very good point Gen. There might not be a lot to
>>>> > gain for us by implementing a fancy algorithm for figuring out whether
>>>> > a TM is dead or not based on failed heartbeat RPCs from the JM if the
>>>> > TM <> TM communication does not tolerate failures and directly fails
>>>> > the affected tasks. This assumes that the JM and TM run in the same
>>>> > environment.
>>>> >
>>>> > One simple approach could be to make the number of failed heartbeat
>>>> > RPCs until a target is marked as unreachable configurable because what
>>>> > represents a good enough criterion in one user's environment might
>>>> > produce too many false-positives in somebody else's environment. Or
>>>> > even simpler, one could say that one can disable reacting to a failed
>>>> > heartbeat RPC as it is currently the case.
>>>> >
>>>> > We currently have a discussion about this on this PR [1]. Maybe you
>>>> > wanna join the discussion there and share your insights.
>>>> >
>>>> > [1] https://github.com/apache/flink/pull/16357
>>>> >
>>>> > Cheers,
>>>> > Till
>>>> > On Tu

Re: Job Recovery Time on TM Lost

2021-07-06 Thread Till Rohrmann
This is actually a very good point Gen. There might not be a lot to gain
for us by implementing a fancy algorithm for figuring out whether a TM is
dead or not based on failed heartbeat RPCs from the JM if the TM <> TM
communication does not tolerate failures and directly fails the affected
tasks. This assumes that the JM and TM run in the same environment.

One simple approach could be to make the number of failed heartbeat RPCs
until a target is marked as unreachable configurable because what
represents a good enough criterion in one user's environment might produce
too many false-positives in somebody else's environment. Or, even simpler,
one could allow disabling the reaction to a failed heartbeat RPC, as is
currently the case.
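
A minimal sketch of such a configurable threshold, in plain Java, could look
like the following. This is not the code from the PR: the class name, the
choice to count consecutive failures, and the convention that a threshold of
0 or less disables the reaction are all assumptions made for this example.

```java
// Hypothetical sketch: tracks failed heartbeat RPCs for one target.
final class HeartbeatFailureCounter {

    // Number of consecutive failed RPCs after which the target is
    // considered unreachable; a value <= 0 disables the reaction.
    private final int failureThreshold;
    private int consecutiveFailures;

    HeartbeatFailureCounter(int failureThreshold) {
        this.failureThreshold = failureThreshold;
    }

    // Records a failed heartbeat RPC and returns true if the target
    // should now be marked as unreachable.
    boolean onRpcFailure() {
        if (failureThreshold <= 0) {
            return false;
        }
        consecutiveFailures++;
        return consecutiveFailures >= failureThreshold;
    }

    // A successful heartbeat RPC resets the counter.
    void onRpcSuccess() {
        consecutiveFailures = 0;
    }

    public static void main(String[] args) {
        HeartbeatFailureCounter counter = new HeartbeatFailureCounter(2);
        System.out.println("after 1st failure: " + counter.onRpcFailure());
        System.out.println("after 2nd failure: " + counter.onRpcFailure());
    }
}
```

With a threshold of 1 this behaves like "mark the target dead on the first
connection loss"; larger values trade detection speed for fewer false
positives in flaky environments.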

We currently have a discussion about this on this PR [1]. Maybe you wanna
join the discussion there and share your insights.

[1] https://github.com/apache/flink/pull/16357

Cheers,
Till

On Tue, Jul 6, 2021 at 4:37 AM Gen Luo  wrote:

> I know that there are retry strategies for akka rpc frameworks. I was just
> considering that, since the environment is shared by JM and TMs, and the
> connections among TMs (using netty) are flaky in unstable environments,
> which will also cause job failures, is it necessary to build a
> strongly guaranteed connection between JM and TMs, or could it be as flaky
> as the connections among TMs?
>
> As far as I know, connections among TMs will just fail on their first
> connection loss, so behaving like this in JM just means "as flaky as
> connections among TMs". In a stable environment it's good enough, but in an
> unstable environment, it indeed increases the instability. IMO, though a
> single connection loss is not reliable, a double check should be good
> enough. But since I'm not experienced with an unstable environment, I can't
> tell whether that's also enough for it.
>
> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann  wrote:
>
>> I think for RPC communication there are retry strategies used by the
>> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
>> ActorSystem and resume communication. Moreover, there are also
>> reconciliation protocols in place which reconcile the states between the
>> components because of potentially lost RPC messages. So the main question
>> would be whether a single connection loss is good enough for triggering the
>> timeout or whether we want a more elaborate mechanism to reason about the
>> availability of the remote system (e.g. a couple of lost heartbeat
>> messages).
>>
>> Cheers,
>> Till
>>
>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo  wrote:
>>
>>> As far as I know, a TM will report connection failure once its connected
>>> TM is lost. I suppose JM can believe the report and fail the tasks in the
>>> lost TM if it also encounters a connection failure.
>>>
>>> Of course, it won't work if the lost TM is standalone. But I suppose we
>>> can use the same strategy as the connected scenario. That is, consider it
>>> possibly lost on the first connection loss, and fail it if double check
>>> also fails. The major difference is the senders of the probes are the same
>>> one rather than two different roles, so the results may tend to be the same.
>>>
>>> On the other hand, this also means that jobs can be fragile in an
>>> unstable environment, no matter whether the failover is triggered by the
>>> TM or the JM. So maybe it's not worth introducing extra configuration for
>>> heartbeat fault tolerance, unless we also introduce some retry
>>> strategies for netty connections.
>>>
>>>
>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann 
>>> wrote:
>>>
>>>> Could you share the full logs with us for the second experiment, Lu? I
>>>> cannot tell from the top of my head why it should take 30s unless you have
>>>> configured a restart delay of 30s.
>>>>
>>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>>>
>>>> I've now implemented FLINK-23209 [1] but it somehow has the problem
>>>> that in a flakey environment you might not want to mark a TaskExecutor dead
>>>> on the first connection loss. Maybe this is something we need to make
>>>> configurable (e.g. introducing a threshold which admittedly is similar to
>>>> the heartbeat timeout) so that the user can configure it for her
>>>> environment. On the upside, if you mark the TaskExecutor dead on the first
>>>> connection loss (assuming you have a stable network environment), then it
>>&g

Re: Job Recovery Time on TM Lost

2021-07-05 Thread Till Rohrmann
I think for RPC communication there are retry strategies used by the
underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
ActorSystem and resume communication. Moreover, there are also
reconciliation protocols in place which reconcile the states between the
components because of potentially lost RPC messages. So the main question
would be whether a single connection loss is good enough for triggering the
timeout or whether we want a more elaborate mechanism to reason about the
availability of the remote system (e.g. a couple of lost heartbeat
messages).

Cheers,
Till

On Mon, Jul 5, 2021 at 10:00 AM Gen Luo  wrote:

> As far as I know, a TM will report connection failure once its connected
> TM is lost. I suppose JM can believe the report and fail the tasks in the
> lost TM if it also encounters a connection failure.
>
> Of course, it won't work if the lost TM is standalone. But I suppose we
> can use the same strategy as the connected scenario. That is, consider it
> possibly lost on the first connection loss, and fail it if double check
> also fails. The major difference is the senders of the probes are the same
> one rather than two different roles, so the results may tend to be the same.
>
> On the other hand, this also means that jobs can be fragile in an
> unstable environment, no matter whether the failover is triggered by the
> TM or the JM. So maybe it's not worth introducing extra configuration for
> heartbeat fault tolerance, unless we also introduce some retry
> strategies for netty connections.
>
>
> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann  wrote:
>
>> Could you share the full logs with us for the second experiment, Lu? I
>> cannot tell from the top of my head why it should take 30s unless you have
>> configured a restart delay of 30s.
>>
>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>
>> I've now implemented FLINK-23209 [1] but it somehow has the problem that
>> in a flakey environment you might not want to mark a TaskExecutor dead on
>> the first connection loss. Maybe this is something we need to make
>> configurable (e.g. introducing a threshold which admittedly is similar to
>> the heartbeat timeout) so that the user can configure it for her
>> environment. On the upside, if you mark the TaskExecutor dead on the first
>> connection loss (assuming you have a stable network environment), then it
>> can now detect lost TaskExecutors as fast as the heartbeat interval.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>
>> Cheers,
>> Till
>>
>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:
>>
>>> Thanks for sharing, Till and Yang.
>>>
>>> @Lu
>>> Sorry but I don't know how to explain the new test with the log. Let's
>>> wait for others' reply.
>>>
>>> @Till
>>> It would be nice if JIRAs could be fixed. Thanks again for proposing
>>> them.
>>>
>>> In addition, I was tracking an issue that RM keeps allocating and
>>> freeing slots after a TM lost until its heartbeat timeout, when I found the
>>> recovery costing as long as heartbeat timeout. That should be a minor bug
>>> introduced by declarative resource management. I have created a JIRA about
>>> the problem [1] and  we can discuss it there if necessary.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>>>
>>> On Fri, Jul 2, 2021 at 3:13 AM Lu Niu wrote:
>>>
>>>> Another side question: shall we add a metric to cover the complete
>>>> restarting time (phase 1 + phase 2)? The current metric
>>>> jm.restartingTime only covers phase 1. Thanks!
>>>>
>>>> Best
>>>> Lu
>>>>
>>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>>>>
>>>>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>>>>
>>>>> I did another test yesterday. In this test, I intentionally throw
>>>>> exception from the source operator:
>>>>> ```
>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>>> && errorFrenquecyInMin > 0
>>>>> && System.currentTimeMillis() - lastStartTime >=
>>>>> errorFrenquecyInMin * 60 * 1000) {
>>>>>   lastStartTime = System.currentTimeMillis();
>>>>>   throw new RuntimeException(
>>>>>   "Trigger expected exception at: " + lastStartTime);
>>>>> }
>>>>> ```
>>>>> In this case, I found phase 1 st

Re: Job Recovery Time on TM Lost

2021-07-02 Thread Till Rohrmann
Could you share the full logs with us for the second experiment, Lu? I
cannot tell off the top of my head why it should take 30s unless you have
configured a restart delay of 30s.

Let's discuss FLINK-23216 on the JIRA ticket, Gen.

I've now implemented FLINK-23209 [1] but it has the problem that in a flaky
environment you might not want to mark a TaskExecutor dead on the first
connection loss. Maybe this is something we need to make configurable
(e.g. by introducing a threshold, which admittedly is similar to the
heartbeat timeout) so that the user can configure it for her environment.
On the upside, if you mark the TaskExecutor dead on the first connection
loss (assuming you have a stable network environment), then Flink can now
detect lost TaskExecutors as fast as the heartbeat interval.

[1] https://issues.apache.org/jira/browse/FLINK-23209

Cheers,
Till

On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:

> Thanks for sharing, Till and Yang.
>
> @Lu
> Sorry but I don't know how to explain the new test with the log. Let's
> wait for others' reply.
>
> @Till
> It would be nice if JIRAs could be fixed. Thanks again for proposing them.
>
> In addition, I was tracking an issue where the RM keeps allocating and
> freeing slots after a TM is lost until its heartbeat times out, when I
> found that the recovery takes as long as the heartbeat timeout. That should
> be a minor bug introduced by declarative resource management. I have created
> a JIRA about the problem [1] and we can discuss it there if necessary.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23216
>
> On Fri, Jul 2, 2021 at 3:13 AM Lu Niu wrote:
>
>> Another side question: shall we add a metric to cover the complete
>> restarting time (phase 1 + phase 2)? The current metric jm.restartingTime
>> only covers phase 1. Thanks!
>>
>> Best
>> Lu
>>
>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>>
>>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally throw
>>> exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>     && errorFrenquecyInMin > 0
>>>     && System.currentTimeMillis() - lastStartTime
>>>         >= errorFrenquecyInMin * 60 * 1000) {
>>>   lastStartTime = System.currentTimeMillis();
>>>   throw new RuntimeException(
>>>       "Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found phase 1 still takes about 30s and phase 2 dropped
>>> to 1s (because there is no need for container allocation). Why does phase 1
>>> still take 30s even though no TM is lost?
>>>
>>> Related logs:
>>> ```
>>> 2021-06-30 00:55:07,463 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>> 2021-06-30 00:55:07,509 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
>>> RESTARTING.
>>> 2021-06-30 00:55:37,596 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
>>> RUNNING.
>>> 2021-06-30 00:55:38,678 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
>>> all tasks switch from CREATED to RUNNING)
>>> ```
>>> Best
>>> Lu
>>>
>>>
>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:
>>>
>>>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>>>>
>>>> I did another test yesterday. In this test, I intentionally throw
>>>> exception from the source operator:
>>>> ```
>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>> && errorFrenquecyInMin > 0
>>>> && System.currentTimeMillis() - lastStartTime >=
>>>> errorFrenquecyInMin * 60 * 1000) {
>>>>   lastStartTime = System.currentTimeMillis();
>>>>   throw new RuntimeException(
>>>>   "Trigger expected exception at: " + lastStartTime);
>>>> }
>>>> ```
>>>

Re: Job Recovery Time on TM Lost

2021-07-01 Thread Till Rohrmann
A quick addition: I think with FLINK-23202 it should now also be possible
to improve the heartbeat mechanism in the general case. We can leverage the
unreachability exception thrown if a remote target is no longer reachable
to mark a heartbeat target as no longer reachable [1]. This can then be
treated as if the heartbeat timeout had been triggered. That way we
should detect lost TaskExecutors as fast as our heartbeat interval.

[1] https://issues.apache.org/jira/browse/FLINK-23209
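
To illustrate the idea, here is a minimal sketch in plain Java. It is not
Flink's actual heartbeat code; the class HeartbeatMonitorSketch and all of
its methods are hypothetical names made up for this example. It only shows
that an "unreachable" signal from the RPC layer can take the same path as
an expired heartbeat timeout:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch (not Flink's implementation) of a heartbeat monitor. */
class HeartbeatMonitorSketch {
    private final long timeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();
    private final Map<String, String> deadTargets = new HashMap<>();

    HeartbeatMonitorSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Starts monitoring a target or records a heartbeat response from it. */
    void reportHeartbeat(String target, long nowMs) {
        if (!deadTargets.containsKey(target)) {
            lastHeartbeatMs.put(target, nowMs);
        }
    }

    /** Classic path: a target is declared dead only after timeoutMs of silence. */
    void checkTimeouts(long nowMs) {
        // Iterate over a copy because markDead modifies the underlying map.
        for (Map.Entry<String, Long> e : new HashMap<>(lastHeartbeatMs).entrySet()) {
            if (nowMs - e.getValue() >= timeoutMs) {
                markDead(e.getKey(), "heartbeat timeout");
            }
        }
    }

    /** Proposed path: the RPC layer says the target cannot be reached at all. */
    void reportUnreachable(String target) {
        if (lastHeartbeatMs.containsKey(target)) {
            markDead(target, "unreachable");
        }
    }

    private void markDead(String target, String reason) {
        lastHeartbeatMs.remove(target);
        deadTargets.put(target, reason);
    }

    boolean isDead(String target) {
        return deadTargets.containsKey(target);
    }

    String deathReason(String target) {
        return deadTargets.get(target);
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(50_000L);
        monitor.reportHeartbeat("tm-1", 0L);
        monitor.reportUnreachable("tm-1"); // no need to wait for the timeout
        System.out.println("tm-1 dead: " + monitor.isDead("tm-1"));
    }
}
```

In this sketch a target reported as unreachable is released right away,
while a silent but not provably unreachable target still has to wait for
the full timeout.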

Cheers,
Till

On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:

> Since you are deploying Flink workloads on Yarn, the Flink ResourceManager
> should get the container completion event after the heartbeat of
> Yarn NM->Yarn RM->Flink RM, which is 8 seconds by default.
> The Flink ResourceManager will release the dead TaskManager container once
> it receives the completion event.
> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>
>
> I think most of the time spent in Phase 1 might go into cancelling the
> tasks on the dead TaskManagers.
>
>
> Best,
> Yang
>
>
> On Thu, Jul 1, 2021 at 4:49 PM Till Rohrmann  wrote:
>
>> The analysis of Gen is correct. Flink currently uses its heartbeat as the
>> primary means to detect dead TaskManagers. This means that Flink will take
>> at least `heartbeat.timeout` time before the system recovers. Even if the
>> cancellation happens fast (e.g. by having configured a low
>> akka.ask.timeout), Flink will still try to deploy tasks onto the dead
>> TaskManager until it is marked as dead and its slots are released (unless
>> the ResourceManager gets a signal from the underlying resource
>> management system that a container/pod has died). One way to improve the
>> situation is to introduce logic which can react to a ConnectionException
>> and then blacklists or releases a TaskManager, for example. This is
>> currently not implemented in Flink, though.
>>
>> Concerning the cancellation operation: Flink currently does not listen to
>> the dead letters of Akka. This means that the `akka.ask.timeout` is the
>> primary means to fail the future result of an RPC which could not be sent.
>> This is also an improvement we should add to Flink's RpcService. I've
>> created a JIRA issue for this problem [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:
>>
>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>
>>> Best
>>> Lu
>>>
>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:
>>>
>>>> I'm also wondering here.
>>>>
>>>> In my opinion, it's because the JM can not confirm whether the TM is
>>>> lost or it's a temporary network trouble and will recover soon, since I can
>>>> see in the log that akka has got a Connection refused but JM still sends a
>>>> heartbeat request to the lost TM until it reaches heartbeat timeout. But
>>>> I'm not sure if it's indeed designed like this.
>>>>
>>>> I would really appreciate it if anyone who knows more details could
>>>> answer. Thanks.
>>>>
>>>


Re: Job Recovery Time on TM Lost

2021-07-01 Thread Till Rohrmann
The analysis of Gen is correct. Flink currently uses its heartbeat as the
primary means to detect dead TaskManagers. This means that Flink will take
at least `heartbeat.timeout` time before the system recovers. Even if the
cancellation happens fast (e.g. by having configured a low
akka.ask.timeout), Flink will still try to deploy tasks onto the dead
TaskManager until it is marked as dead and its slots are released (unless
the ResourceManager gets a signal from the underlying resource
management system that a container/pod has died). One way to improve the
situation is to introduce logic which can react to a ConnectionException
and then blacklists or releases a TaskManager, for example. This is
currently not implemented in Flink, though.

Concerning the cancellation operation: Flink currently does not listen to
the dead letters of Akka. This means that the `akka.ask.timeout` is the
primary means to fail the future result of an RPC which could not be sent.
This is also an improvement we should add to Flink's RpcService. I've
created a JIRA issue for this problem [1].

[1] https://issues.apache.org/jira/browse/FLINK-23202
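
The difference between the two ways of failing an RPC result can be
sketched in plain Java. This is not Flink's RpcService; AskTimeoutSketch,
askWithTimeout and onDeadLetter are hypothetical names, and the dead-letter
callback is an assumption about what such a hook could look like:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch (not Flink's RpcService): two ways a pending RPC result can fail. */
class AskTimeoutSketch {

    /** Current behavior: an undeliverable message fails only when the ask timeout expires. */
    static CompletableFuture<String> askWithTimeout(long askTimeoutMs) {
        CompletableFuture<String> result = new CompletableFuture<>();
        // Nothing ever completes 'result' because the receiver is gone.
        return result.orTimeout(askTimeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Possible improvement: a dead-letter notification fails the same future immediately. */
    static void onDeadLetter(CompletableFuture<String> pending, String recipient) {
        pending.completeExceptionally(
                new IllegalStateException("Message to " + recipient + " could not be delivered"));
    }

    /** Waits for the future and returns the exception it failed with, or null on success. */
    static Throwable failureOf(CompletableFuture<String> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = askWithTimeout(10_000L);
        onDeadLetter(pending, "tm-1"); // fails right away instead of after 10 s
        System.out.println(failureOf(pending));
    }
}
```

With only the timeout in place, the caller always waits for the full ask
timeout before it learns that the message was never delivered.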

Cheers,
Till

On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:

> Thanks Gen! cc flink-dev to collect more inputs.
>
> Best
> Lu
>
> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:
>
>> I'm also wondering here.
>>
>> In my opinion, it's because the JM can not confirm whether the TM is lost
>> or it's a temporary network trouble and will recover soon, since I can see
>> in the log that akka has got a Connection refused but JM still sends a
>> heartbeat request to the lost TM until it reaches heartbeat timeout. But
>> I'm not sure if it's indeed designed like this.
>>
>> I would really appreciate it if anyone who knows more details could
>> answer. Thanks.
>>
>


Re: Add control mode for flink

2021-06-11 Thread Till Rohrmann
Thanks for starting this discussion. I do see the benefit of dynamically
configuring your Flink job and the cluster running it. Some of the use
cases which were mentioned here are already possible. E.g. adjusting the
log level dynamically can be done by configuring an appropriate logging
backend and then changing the logging properties (log4j 2 supports this for
example). The remaining use cases fall into two categories:

1) changing the job
2) changing the cluster configuration

1) would benefit from general control flow events which will be processed
by all operators. 2) would require some component sending some control
events to the other Flink processes.

Implementing the control flow events can already be done to a good extent
on the user level by using a connected stream and a user-level record type
which can distinguish between control events and normal records.
Admittedly, it is a bit of work, though.
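
A rough sketch of what such a user-level record type could look like, in
plain Java without any Flink APIs. Envelope and ThresholdFilterSketch are
hypothetical names; in a real job the `process` logic would sit in an
operator on the connected stream and the threshold would have to be kept
in checkpointed state:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical user-level record type: either a normal record or a control event. */
final class Envelope {
    final boolean isControl;
    final int value; // payload for normal records, new threshold for control events

    private Envelope(boolean isControl, int value) {
        this.isControl = isControl;
        this.value = value;
    }

    static Envelope data(int value) {
        return new Envelope(false, value);
    }

    static Envelope setThreshold(int threshold) {
        return new Envelope(true, threshold);
    }
}

/** Stand-in for an operator that consumes the merged data and control stream. */
class ThresholdFilterSketch {
    private int threshold; // the dynamically configurable part of the job
    private final List<Integer> output = new ArrayList<>();

    ThresholdFilterSketch(int initialThreshold) {
        this.threshold = initialThreshold;
    }

    void process(Envelope e) {
        if (e.isControl) {
            threshold = e.value; // control event: change behavior, emit nothing
        } else if (e.value >= threshold) {
            output.add(e.value); // normal record: apply the current configuration
        }
    }

    List<Integer> output() {
        return output;
    }

    public static void main(String[] args) {
        ThresholdFilterSketch filter = new ThresholdFilterSketch(5);
        filter.process(Envelope.data(3));         // dropped, below the threshold of 5
        filter.process(Envelope.setThreshold(2)); // lowers the threshold at runtime
        filter.process(Envelope.data(3));         // now passes
        System.out.println(filter.output());
    }
}
```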

I think persisting all of these changes would be very important because
otherwise, you might end up easily in an inconsistent state. For example,
assume you have changed the log level and now a subset of the TaskManagers
needs to be restarted. Now, all of a sudden some TaskManagers log on level
X and the others on level Y. The same applies to job changes. A regional
failover would have to restore the latest dynamically configured state. All
in all, this looks like a very complex task.

On the other hand, most of the described use cases should be realizable
with a restart of a job. So if Flink were able to quickly resume a job,
then we would probably not need this feature. Applying the changes to the
Flink and the job configuration and resubmitting the job would do the
trick. Hence, improving Flink's recovery speed could be an alternative
approach to this problem.

Cheers,
Till

On Fri, Jun 11, 2021 at 9:51 AM Jary Zhen  wrote:

>  big +1 for this feature,
>
>1. Reset kafka offset in certain cases.
>2. Stop checkpoint in certain cases.
>3. Change log level for debug.
>
>
> On Fri, Jun 11, 2021 at 12:17 PM 刘建刚  wrote:
>
>> Thanks for all the discussions and suggestions. Since the topic has
>> been discussed for about a week, it is time to draw a conclusion; new
>> ideas are still welcome.
>> First, the topic started with use cases for a RESTful interface. A
>> RESTful interface supports many useful interactions with users, for
>> example the following. It is an easy way to control the job compared with
>> the broadcast API.
>>
>>1. Change the data processing logic via dynamic configs, such as a
>>filter condition.
>>2. Define some tools to control the job, such as QPS limits, sampling,
>>changing the log level and so on.
>>
>> Second, we broadened the topic to control flow in order to support all
>> kinds of control events besides the above use cases. There is a strong
>> demand to support custom (broadcast) events for iteration, SQL control
>> events and so on. As Xintong Song said, the key questions for the control
>> flow are as follows:
>>
>>1. Who (which component) is responsible for generating the control
>>messages? It may be the jobmaster in some way, an inner operator and so
>>on.
>>2. Who (which component) is responsible for reacting to the messages?
>>3. How do the messages propagate? Flink should support sending
>>control messages through channels.
>>4. When it comes to affecting the computation logic, how should the
>>control flow work together with exactly-once consistency? To use the
>>checkpoint mechanism, letting control messages flow from the sources to
>>the downstream tasks may be a good idea.
>>
>> Third, a common and flexible control flow requires a good design
>> and implementation as a base. Future features and existing features should
>> both be considered. For future features, a common RESTful interface is
>> needed first to support dynamic configs. For existing features, there are
>> checkpoint barriers, watermarks and latency markers. They have some special
>> behaviors but also share a lot in common. The common logic should be
>> considered, but maybe they should remain unchanged until the control flow is
>> stable.
>> Some other open problems are as follows:
>>
>>1. How to persist the control signals when the jobmaster fails? An
>>idea is to persist control signals in HighAvailabilityServices and replay
>>them after failover. The RESTful request should be non-blocking.
>>2. Should all the operators receive the control messages? All
>>operators should have the ability to receive upstream operators' control
>>messages but maybe not process them. If we want to persist the control
>>message state, all the subtasks belonging to one operator should have the
>>same control events in order to rescale easily.
>>
>> As the next step, I will draft a FLIP with the scope of a common
>> control flow framework. More discussions, ideas and problems are still
>> welcome.
>>
>> Thank you~
>>
>

Re: How to gracefully handle job recovery failures

2021-06-11 Thread Till Rohrmann
Hi Li,

Roman is right about Flink's behavior and what you can do about it. The
idea behind the current behavior is the following: If Flink cannot recover
a job, it is very hard for it to tell whether this is due to an intermittent
problem or a permanent one. No matter how often you retry, you can always
run into the situation that you give up too early. Giving up would be very
surprising behavior because it effectively means that Flink can forget
about jobs during a recovery, so we decided that this situation requires
the user to intervene. By forcing the user to make a decision, we make the
problem very explicit and require her to think about the situation. I hope
this makes sense.

So in your case, what you have to do is remove the relevant ZooKeeper
znode which contains the pointer to the submitted job graph file. That way,
Flink will no longer try to recover this job. I do agree that this is a bit
cumbersome and it would definitely help to offer a small tool for this
kind of cleanup task.

Cheers,
Till

On Fri, Jun 11, 2021 at 8:24 AM Roman Khachatryan  wrote:

> Hi Li,
>
> If I understand correctly, you want the cluster to proceed with recovery,
> skipping some non-recoverable jobs (but still recovering others).
> The only way I can think of is to remove the corresponding nodes in
> ZooKeeper, which is not very safe.
>
> I'm pulling in Robert and Till who might know better.
>
> Regards,
> Roman
>
>
> On Thu, Jun 10, 2021 at 8:56 PM Li Peng  wrote:
> >
> > Hi Roman,
> >
> > Is there a way to abandon job recovery after a few tries? By that I mean
> that this problem was fixed by me restarting the cluster and not trying to
> recover the job. Is there some setting that emulates what I did, so I don't
> need to intervene manually if this happens again?
> >
> > Thanks,
> > Li
> >
> > On Thu, Jun 10, 2021 at 9:50 AM Roman Khachatryan 
> wrote:
> >>
> >> Hi Li,
> >>
> >> The missing file is a serialized job graph and the job recovery can't
> >> proceed without it.
> >> Unfortunately, the cluster can't proceed if one of the jobs can't
> recover.
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, Jun 10, 2021 at 6:02 AM Li Peng  wrote:
> >> >
> >> > Hey folks, we have a cluster with HA mode enabled, and recently after
> doing a zookeeper restart, our Kafka cluster (Flink v. 1.11.3, Scala v.
> 2.12) crashed and was stuck in a crash loop, with the following error:
> >> >
> >> > 2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error
> occurred in the cluster entrypoint.
> >> > java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not recover job with job
> id .
> >> > at
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
> >> > at
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
> >> > at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
> >> > at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >> > at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >> > at java.base/java.lang.Thread.run(Thread.java:834)
> >> > Caused by: org.apache.flink.util.FlinkRuntimeException: Could not
> recover job with job id .
> >> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
> >> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
> >> > at
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
> >> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
> >> > at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
> >> > ... 3 common frames omitted
> >> > Caused by: org.apache.flink.util.FlinkException: Could not retrieve
> submitted JobGraph from state handle under
> /. This indicates that the retrieved state
> handle is broken. Try cleaning the state handle store.
> >> > at
> org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:192)
> >> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
> >> > ... 7 common frames omitted
> >> > Caused by: java.io.FileNotFoundException: No such file or directory:
> s3a://myfolder/recovery/myservice/2021-05-25T04:36:33Z/submittedJobGraph06ea8512c493
> >> > at
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileSta

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Till Rohrmann
Great :-)

On Tue, Jun 8, 2021 at 1:11 PM Yingjie Cao  wrote:

> Hi Till,
>
> Thanks for the suggestion. The blog post is already on the way.
>
> Best,
> Yingjie
>
> On Tue, Jun 8, 2021 at 5:30 PM Till Rohrmann  wrote:
>
>> Thanks for the update Yingjie. Would it make sense to write a short blog
>> post about this feature including some performance improvement numbers? I
>> think this could be interesting to our users.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li 
>> wrote:
>>
>>> Thanks Yingjie for the great effort!
>>>
>>> This is really helpful to Flink Batch users!
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao 
>>> wrote:
>>>
>>> > Hi devs & users,
>>> >
>>> > The FLIP-148[1] has been released with Flink 1.13 and the final
>>> > implementation has some differences compared with the initial proposal
>>> > in the FLIP document. To avoid potential misunderstandings, I have
>>> > updated the FLIP document[1] accordingly and I also drafted another
>>> > document[2] which contains more implementation details. FYI.
>>> >
>>> > [1]
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
>>> > [2]
>>> >
>>> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>>> >
>>> > Best,
>>> > Yingjie
>>> >
>>> > On Thu, Oct 15, 2020 at 11:02 AM Yingjie Cao  wrote:
>>> >
>>> >> Hi devs,
>>> >>
>>> >> Currently, Flink adopts a hash-style blocking shuffle implementation
>>> >> which writes data sent to different reducer tasks into separate files
>>> >> concurrently. Compared to the sort-merge based approach, which writes
>>> >> the data together into a single file and merges small files into
>>> >> bigger ones, the hash-based approach has several weak points when it
>>> >> comes to running large scale batch jobs:
>>> >>
>>> >>1. *Stability*: For high parallelism (tens of thousands) batch jobs,
>>> >>the current hash-based blocking shuffle implementation writes too
>>> >>many files concurrently, which puts high pressure on the file system,
>>> >>for example, maintenance of too many file metas and exhaustion of
>>> >>inodes or file descriptors. All of these can be potential stability
>>> >>issues. Sort-merge based blocking shuffle doesn't have the problem
>>> >>because for one result partition, only one file is written at a time.
>>> >>2. *Performance*: Large amounts of small shuffle files and random IO
>>> >>can influence shuffle performance a lot, especially for HDD (for SSD,
>>> >>sequential read is also important because of read ahead and cache).
>>> >>For batch jobs processing massive data, a small amount of data per
>>> >>subpartition is common because of high parallelism. Besides, data skew
>>> >>is another cause of small subpartition files. By merging the data of
>>> >>all subpartitions together in one file, more sequential reads can be
>>> >>achieved.
>>> >>3. *Resource*: For the current hash-based implementation, each
>>> >>subpartition needs at least one buffer. For large scale batch
>>> >>shuffles, the memory consumption can be huge. For example, we need at
>>> >>least 320M network memory per result partition if parallelism is set
>>> >>to 10000. Because of the huge network memory consumption, it is hard
>>> >>to configure the network memory for large scale batch jobs, and
>>> >>sometimes parallelism cannot be increased just because of insufficient
>>> >>network memory, which leads to a bad user experience.
>>> >>
>>> >> To improve Flink’s capability of running large scale batch jobs, we
>>> would
>>> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>>> >> feedback is appreciated.
>>> >>
>>> >> [1]
>>> >>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>> >>
>>> >> Best,
>>> >> Yingjie
>>> >>
>>> >
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Till Rohrmann
Thanks for the update Yingjie. Would it make sense to write a short blog
post about this feature including some performance improvement numbers? I
think this could be interesting to our users.

Cheers,
Till

On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li  wrote:

> Thanks Yingjie for the great effort!
>
> This is really helpful to Flink Batch users!
>
> Best,
> Jingsong
>
> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao 
> wrote:
>
> > Hi devs & users,
> >
> > The FLIP-148[1] has been released with Flink 1.13 and the final
> > implementation has some differences compared with the initial proposal in
> > the FLIP document. To avoid potential misunderstandings, I have updated
> > the FLIP document[1] accordingly and I also drafted another document[2]
> > which contains more implementation details. FYI.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> > [2]
> >
> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
> >
> > Best,
> > Yingjie
> >
> > On Thu, Oct 15, 2020 at 11:02 AM Yingjie Cao  wrote:
> >
> >> Hi devs,
> >>
> >> Currently, Flink adopts a hash-style blocking shuffle implementation
> >> which writes data sent to different reducer tasks into separate files
> >> concurrently. Compared to the sort-merge based approach, which writes
> >> the data together into a single file and merges small files into
> >> bigger ones, the hash-based approach has several weak points when it
> >> comes to running large scale batch jobs:
> >>
> >>1. *Stability*: For high parallelism (tens of thousands) batch jobs,
> >>the current hash-based blocking shuffle implementation writes too
> >>many files concurrently, which puts high pressure on the file system,
> >>for example, maintenance of too many file metas and exhaustion of
> >>inodes or file descriptors. All of these can be potential stability
> >>issues. Sort-merge based blocking shuffle doesn't have the problem
> >>because for one result partition, only one file is written at a time.
> >>2. *Performance*: Large amounts of small shuffle files and random IO
> >>can influence shuffle performance a lot, especially for HDD (for SSD,
> >>sequential read is also important because of read ahead and cache).
> >>For batch jobs processing massive data, a small amount of data per
> >>subpartition is common because of high parallelism. Besides, data skew
> >>is another cause of small subpartition files. By merging the data of
> >>all subpartitions together in one file, more sequential reads can be
> >>achieved.
> >>3. *Resource*: For the current hash-based implementation, each
> >>subpartition needs at least one buffer. For large scale batch
> >>shuffles, the memory consumption can be huge. For example, we need at
> >>least 320M network memory per result partition if parallelism is set
> >>to 10000. Because of the huge network memory consumption, it is hard
> >>to configure the network memory for large scale batch jobs, and
> >>sometimes parallelism cannot be increased just because of insufficient
> >>network memory, which leads to a bad user experience.
> >>
> >> To improve Flink’s capability of running large scale batch jobs, we
> would
> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> >> feedback is appreciated.
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> >>
> >> Best,
> >> Yingjie
> >>
> >
>
> --
> Best, Jingsong Lee
>


Re: recover from savepoint

2021-06-03 Thread Till Rohrmann
Thanks for this insight. So the problem might be that Flink uses an internal
Kafka API (the connector uses reflection to get hold of the
TransactionManager) which changed between versions 2.4.1 and 2.5. I think
this is a serious problem because it breaks our end-to-end exactly-once
story when using newer Kafka versions.
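
To make the suspected mechanism concrete, here is a minimal sketch in
plain Java. It contains neither Kafka nor Flink code; IdAndEpoch,
TxnManagerSketch and StaticPollutionSketch are hypothetical stand-ins, and
the fields are non-final only to keep the reflection simple. It shows why
overwriting fields in place is harmless when the manager starts with a
fresh object, but pollutes every user of a shared constant when the
manager starts with that constant:

```java
import java.lang.reflect.Field;

/** Hypothetical stand-in for a client-internal value class with a shared constant. */
class IdAndEpoch {
    static final IdAndEpoch NONE = new IdAndEpoch(-1L, (short) -1);

    private long producerId;
    private short epoch;

    IdAndEpoch(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    long producerId() {
        return producerId;
    }

    short epoch() {
        return epoch;
    }
}

/** Hypothetical stand-in for a transaction manager holding the current id and epoch. */
class TxnManagerSketch {
    private IdAndEpoch idAndEpoch;

    TxnManagerSketch(IdAndEpoch initial) {
        this.idAndEpoch = initial;
    }

    IdAndEpoch idAndEpoch() {
        return idAndEpoch;
    }
}

class StaticPollutionSketch {
    /** Mimics a reflective "resume": overwrite the fields of whatever object the manager holds. */
    static void resume(TxnManagerSketch manager, long producerId, short epoch) {
        try {
            Field holder = TxnManagerSketch.class.getDeclaredField("idAndEpoch");
            holder.setAccessible(true);
            Object target = holder.get(manager);
            setField(target, "producerId", producerId);
            setField(target, "epoch", epoch);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    private static void setField(Object target, String name, Object value)
            throws ReflectiveOperationException {
        Field field = target.getClass().getDeclaredField(name);
        field.setAccessible(true);
        field.set(target, value);
    }

    public static void main(String[] args) {
        // Old style: the manager owns a fresh object, so the constant stays untouched.
        resume(new TxnManagerSketch(new IdAndEpoch(-1L, (short) -1)), 42L, (short) 7);
        System.out.println("NONE after fresh-object resume: " + IdAndEpoch.NONE.producerId());

        // New style: the manager starts with the shared constant, which gets overwritten.
        resume(new TxnManagerSketch(IdAndEpoch.NONE), 42L, (short) 7);
        System.out.println("NONE after shared-constant resume: " + IdAndEpoch.NONE.producerId());
    }
}
```

In the second case every later manager that is initialized with the
constant would start from the overwritten id and epoch. Whether this is
what actually triggers the reported exception still needs to be verified.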

Cheers,
Till

On Thu, Jun 3, 2021 at 10:17 AM Tianxin Zhao  wrote:

> I encountered the exact same issue before when experimenting in a testing
> environment. I was not able to spot the bug mentioned in this thread;
> my workaround was to downgrade my own kafka-client version from 2.5 to
> 2.4.1, matching the version used by flink-connector-kafka.
> In Kafka 2.4.1, TransactionManager initializes producerIdAndEpoch using
>
> this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
>
> instead of
>
> this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
>
>
> On Thu, Jun 3, 2021 at 12:11 AM Till Rohrmann 
> wrote:
>
>> Thanks for the update. Skimming over the code, it indeed looks like we are
>> overwriting the values of the static value ProducerIdAndEpoch.NONE. I am
>> not 100% sure how this will cause the observed problem, though. I am also
>> not a Flink Kafka connector or Kafka expert, so I would appreciate it if
>> someone more familiar could double check this part of the code.
>>
>> Concerning the required change of an operator's UID: Piotr, is this
>> a known issue and documented somewhere? I find this rather surprising from
>> a user's point of view.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann 
>> wrote:
>>
>>> Forwarding 周瑞's message to a duplicate thread:
>>>
>>> After our analysis, we found a bug in the
>>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
>>> method
>>> The analysis process is as follows:
>>>
>>>
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>>     state = context.getOperatorStateStore().getListState(stateDescriptor);
>>>     boolean recoveredUserContext = false;
>>>     if (context.isRestored()) {
>>>         LOG.info("{} - restoring state", name());
>>>         for (State<TXN, CONTEXT> operatorState : state.get()) {
>>>             userContext = operatorState.getContext();
>>>             List<TransactionHolder<TXN>> recoveredTransactions =
>>>                     operatorState.getPendingCommitTransactions();
>>>             List<TXN> handledTransactions =
>>>                     new ArrayList<>(recoveredTransactions.size() + 1);
>>>             for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
>>>                 // If this fails to succeed eventually, there is actually data loss
>>>                 recoverAndCommitInternal(recoveredTransaction);
>>>                 handledTransactions.add(recoveredTransaction.handle);
>>>                 LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
>>>             }
>>>
>>>             {
>>>                 TXN transaction = operatorState.getPendingTransaction().handle;
>>>                 recoverAndAbort(transaction);
>>>                 handledTransactions.add(transaction);
>>>                 LOG.info(
>>>                         "{} aborted recovered transaction {}",
>>>                         name(),
>>>                         operatorState.getPendingTransaction());
>>>             }
>>>
>>>             if (userContext.isPresent()) {
>>>                 finishRecoveringContext(handledTransactions);
>>>                 recoveredUserContext = true;
>>>             }
>>>         }
>>>     }
>>>
>>> (1) recoverAndCommitInternal(recoveredTransaction);
>>> The previous transactionalId, producerId and epoch stored in the state are
>>> used to commit the transaction. However, we found that the
>>> producerIdAndEpoch of the transactionManager is a static constant
>>> (ProducerIdAndEpoch.NONE), so writing these values into it pollutes the
>>> static constant ProducerIdAndEpoch.NONE.
>>>
>>> @Override
>>> protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState
>>> transaction) {
>>> if (transaction.isTransactional()) {
>>> FlinkKafkaInternalProducer pro

Re: recover from savepoint

2021-06-03 Thread Till Rohrmann
Thanks for the update. Skimming over the code, it indeed looks like we are
overwriting the values of the static value ProducerIdAndEpoch.NONE. I am
not 100% sure how this will cause the observed problem, though. I am also
not a Flink Kafka connector or Kafka expert, so I would appreciate it if
someone more familiar could double check this part of the code.

Concerning the required change of an operator's UID: Piotr, is this a
known issue and documented somewhere? I find this rather surprising from a
user's point of view.

Cheers,
Till

On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann  wrote:

> Forwarding 周瑞's message to a duplicate thread:
>
> After our analysis, we found a bug in the
> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
> method
> The analysis process is as follows:
>
>
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
> public void initializeState(FunctionInitializationContext context) throws Exception {
>     state = context.getOperatorStateStore().getListState(stateDescriptor);
>     boolean recoveredUserContext = false;
>     if (context.isRestored()) {
>         LOG.info("{} - restoring state", name());
>         for (State<TXN, CONTEXT> operatorState : state.get()) {
>             userContext = operatorState.getContext();
>             List<TransactionHolder<TXN>> recoveredTransactions =
>                     operatorState.getPendingCommitTransactions();
>             List<TXN> handledTransactions =
>                     new ArrayList<>(recoveredTransactions.size() + 1);
>             for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
>                 // If this fails to succeed eventually, there is actually data loss
>                 recoverAndCommitInternal(recoveredTransaction);
>                 handledTransactions.add(recoveredTransaction.handle);
>                 LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
>             }
>
>             {
>                 TXN transaction = operatorState.getPendingTransaction().handle;
>                 recoverAndAbort(transaction);
>                 handledTransactions.add(transaction);
>                 LOG.info(
>                         "{} aborted recovered transaction {}",
>                         name(),
>                         operatorState.getPendingTransaction());
>             }
>
>             if (userContext.isPresent()) {
>                 finishRecoveringContext(handledTransactions);
>                 recoveredUserContext = true;
>             }
>         }
>     }
>
> (1) recoverAndCommitInternal(recoveredTransaction);
> The previous transactionalId, producerId and epoch stored in the state are
> used to commit the transaction. However, we found that the producerIdAndEpoch
> of the transactionManager is a static constant (ProducerIdAndEpoch.NONE), so
> writing these values into it pollutes the static constant
> ProducerIdAndEpoch.NONE.
>
> @Override
> protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
>     if (transaction.isTransactional()) {
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         try {
>             producer = initTransactionalProducer(transaction.transactionalId, false);
>             producer.resumeTransaction(transaction.producerId, transaction.epoch);
>             producer.commitTransaction();
>         } catch (InvalidTxnStateException | ProducerFencedException ex) {
>             // That means we have committed this transaction before.
>             LOG.warn(
>                     "Encountered error {} while recovering transaction {}. "
>                             + "Presumably this transaction has been already committed before",
>                     ex,
>                     transaction);
>         } finally {
>             if (producer != null) {
>                 producer.close(0, TimeUnit.SECONDS);
>             }
>         }
>     }
> }
>
> public void resumeTransaction(long producerId, short epoch) {
>     synchronized (producerClosingLock) {
>         ensureNotClosed();
>         Preconditions.checkState(
>                 producerId >= 0 && epoch >= 0,
>                 "Incorrect values for producerId %s and epoch %s",
>                 producerId,
>                 epoch);
>         LOG.info(
>                 "Attempting to resume transaction {} with producerId {} and epoch {}",
>                 transactionalId,
>                 producerId,
>                 epoch);
>
>         Object transactionManager = getField(kafkaProducer, "transactionManager");
> synchronized (transac

Re: recover from savepoint

2021-06-02 Thread Till Rohrmann
        if (!isEpochBump) {
            transitionTo(State.INITIALIZING);
            log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
        } else {
            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
        }
        InitProducerIdRequestData requestData = new InitProducerIdRequestData()
                .setTransactionalId(transactionalId)
                .setTransactionTimeoutMs(transactionTimeoutMs)
                .setProducerId(producerIdAndEpoch.producerId)
                .setProducerEpoch(producerIdAndEpoch.epoch);
        InitProducerIdHandler handler = new InitProducerIdHandler(
                new InitProducerIdRequest.Builder(requestData), isEpochBump);
        enqueueRequest(handler);
        return handler.result;
    }, State.INITIALIZING);
}

On Wed, Jun 2, 2021 at 3:55 PM Piotr Nowojski  wrote:

> Hi,
>
> I think there is no generic way. If this error indeed happened after
> starting a second job from the same savepoint, or something like that, the
> user can change the sink operator's UID.
>
> If this is a case of intentional recovery from an earlier
> checkpoint/savepoint, maybe `FlinkKafkaProducer#setLogFailuresOnly` will be
> helpful.
> Best, Piotrek
>
> On Tue, Jun 1, 2021 at 3:16 PM Till Rohrmann  wrote:
>
>> The error message says that we are trying to reuse a transaction id that
>> is currently being used or has expired.
>>
>> I am not 100% sure how this can happen. My suspicion is that you have
>> resumed a job multiple times from the same savepoint. Have you checked
>> that there is no other job which has been resumed from the same savepoint
>> and which is currently running or has run and completed checkpoints?
>>
>> @pnowojski  @Becket Qin  how
>> does the transaction id generation ensure that we don't have a clash of
>> transaction ids if we resume the same job multiple times from the same
>> savepoint? From the code, I do see that we have a
>> TransactionalIdsGenerator which is initialized with the taskName and the
>> operator UID.
>>
>> fyi: @Arvid Heise 
>>
>> Cheers,
>> Till
>>
>>
>> On Mon, May 31, 2021 at 11:10 AM 周瑞  wrote:
>>
>> > HI:
>> >   When "sink.semantic = exactly-once", the following exception is
>> > thrown when recovering from a savepoint
>> >
>> >public static final String KAFKA_TABLE_FORMAT =
>> > "CREATE TABLE "+TABLE_NAME+" (\n" +
>> > "  "+COLUMN_NAME+" STRING\n" +
>> > ") WITH (\n" +
>> > "   'connector' = 'kafka',\n" +
>> > "   'topic' = '%s',\n" +
>> > "   'properties.bootstrap.servers' = '%s',\n" +
>> > "   'sink.semantic' = 'exactly-once',\n" +
>> > "   'properties.transaction.timeout.ms' =
>> > '90',\n" +
>> > "   'sink.partitioner' =
>> > 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>> > "   'format' = 'dbz-json'\n" +
>> > ")\n";
>> >   [] - Source: TableSourceScan(table=[[default_catalog,
>> default_database,
>> > debezium_source]], fields=[data]) -> Sink: Sink
>> > (table=[default_catalog.default_database.KafkaTable], fields=[data])
>> (1/1
>> > )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
>> > FAILED with failure cause: org.apache.kafka.common.KafkaException:
>> > Unexpected error in InitProducerIdResponse; Producer attempted an
>> > operation with an old epoch. Either there is a newer producer with the
>> > same transactionalId, or the producer's transaction has been expired by
>> > the broker.
>> > at org.apache.kafka.clients.producer.internals.
>> >
>> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
>> > .java:1352)
>> > at org.apache.kafka.clients.producer.internals.
>> > TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
>> > 1260)
>> > at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
>> > .java:109)
>> > at org.apache.kafka.clients.NetworkClient.completeResponses(
>> > NetworkClient.java:572)
>> > at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
>> > at org.apache.kafka.clients.producer.internals.Sender
>> > .maybeSendAndPollTransactionalRequest(Sender.java:414)
>> > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
>> > .java:312)
>> > at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
>> > 239)
>> > at java.lang.Thread.run(Thread.java:748)
>> >
>>
>


Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

2021-06-02 Thread Till Rohrmann
 Hi Alexey,

I think the current idleness detection works based on timeouts. You need a
special watermark generator that periodically emits watermarks. If no event
has been emitted for a configured period of time, the input is marked as idle.
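As an illustration of the timeout rule, here is a minimal, self-contained Java sketch. It is not Flink's implementation: the class IdlenessTracker and its methods are hypothetical names that only model "no event for longer than the timeout means idle". In Flink itself the comparable setting is, as far as I know, WatermarkStrategy#withIdleness(Duration).

```java
import java.time.Duration;

/** Toy model of timeout-based idleness detection (hypothetical, not Flink code). */
final class IdlenessTracker {
    private final long idleTimeoutMillis;
    private long lastEventMillis;

    IdlenessTracker(Duration idleTimeout, long nowMillis) {
        this.idleTimeoutMillis = idleTimeout.toMillis();
        this.lastEventMillis = nowMillis;
    }

    /** Called for every record; resets the idle timer. */
    void onEvent(long nowMillis) {
        lastEventMillis = nowMillis;
    }

    /** Called periodically; true once no record has arrived for the full timeout. */
    boolean isIdle(long nowMillis) {
        return nowMillis - lastEventMillis >= idleTimeoutMillis;
    }

    public static void main(String[] args) {
        IdlenessTracker tracker = new IdlenessTracker(Duration.ofSeconds(5), 0L);
        System.out.println("idle after 1s: " + tracker.isIdle(1_000L));
        System.out.println("idle after 5s: " + tracker.isIdle(5_000L));
    }
}
```

The clock is passed in explicitly so that the periodic check can be exercised without waiting for real time to pass.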

Yes, I was referring to FLINK-18450. At the moment nobody is actively
working on it, but it is on the roadmap for improvements for the new source
APIs (FLIP-27).

Cheers,
Till

On Tue, Jun 1, 2021 at 8:55 PM Alexey Trenikhun  wrote:

> Hi Till,
>
> >However, this will stall the whole reading process if there is a
> partition which has no more data. Hence, you will probably also need a
> mechanism to advance the watermark if the partition becomes idle.
> This is why I need to find out whether a partition is idle. It looks like
> the Flink Kafka connector does have this information: the derived class
> KafkaTopicPartitionStateWithWatermarkGenerator has immediateOutput and
> deferredOutput, each with a field state that carries an *idle* flag.
>
> Thank you for the information about the new KafkaConnector. I assume you
> are referring to [1], but that also seems stalled. Or are you talking about
> a different task?
>
> [1]-https://issues.apache.org/jira/browse/FLINK-18450
> [FLINK-18450] Add watermark alignment logic to SourceReaderBase. - ASF JIRA
>
> Thanks,
> Alexey
> --
> *From:* Till Rohrmann 
> *Sent:* Tuesday, June 1, 2021 6:24 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: How to check is Kafka partition "idle" in
> emitRecordsWithTimestamps
>
> Hi Alexey,
>
> looking at KafkaTopicPartitionState, it looks like it does not contain
> this information. In a nutshell, what you probably have to do is to
> aggregate the watermarks across all partitions and then pause the
> consumption of a partition if its watermark advances too far ahead of the
> minimum watermark. However, this will stall the whole reading process if
> there is a partition which has no more data. Hence, you will probably also
> need a mechanism to advance the watermark if the partition becomes idle.
>
> Note that the community is currently working on a new KafkaConnector based
> on Flink's new source API (FLIP-27). If I am not mistaken, then these new
> interfaces should eventually also support event time alignment.
>
> Cheers,
> Till
>
> On Fri, May 28, 2021 at 7:17 PM Alexey Trenikhun  wrote:
>
> Hello,
> I'm thinking about implementing a custom Kafka connector which provides
> event alignment (similar to FLINK-10921, which seems abandoned). How can I
> determine whether a partition is idle from an override
> of AbstractFetcher.emitRecordsWithTimestamps()?
> Does KafkaTopicPartitionState have this information?
>
> Thanks,
> Alexey
>
>


Re: S3 + Parquet credentials issue

2021-06-01 Thread Till Rohrmann
Hi Angelo,

what Svend has written is very good advice. Additionally, you could give us
a bit more context by posting the exact stack trace and the exact
configuration you use to deploy the Flink cluster. To me this looks like a
configuration/setup problem in combination with AWS.

Cheers,
Till

On Mon, May 31, 2021 at 10:49 PM Svend  wrote:

> Hi Angelo,
>
> I've not worked exactly in the situation you described, although I've had
> to configure S3 access from a Flink application recently and here are a
> couple of things I learnt along the way:
>
> * You should normally not need to include flink-s3-fs-hadoop nor
> hadoop-mapreduce-client-core in your classpath but should rather make
> flink-s3-fs-hadoop available to Flink by putting it into the plugins
> folder. The motivation for that is that this jar is a fat jar containing a
> lot of hadoop and aws classes, so including it in your classpath quickly
> leads to conflicts. The plugins folder is associated with a separate
> classpath, which helps avoid those conflicts.
>
> * Under the hood, Flink is using the hadoop-aws library to connect to s3 =>
> the documentation regarding how to configure it, and especially security
> accesses, is available in [1]
>
> * Ideally, when running on AWS, your code should not be using
> BasicAWSCredentialsProvider, but instead the application should assume a
> role, which you associate with some IAM permission.  If that's your case,
> the specific documentation for that situation is in [2]. If you're running
> some test locally on your laptop, BasicAWSCredentialsProvider with some
> key id and secret pointing to a dev account may be appropriate.
>
> * As I understand it, any configuration entry in flink-conf.yaml that starts
> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in
> [3]) => by reading documentation in [1] and [2] you might be able to figure
> out which parameters are relevant to your case, which you can then set with
> the mechanism just mentioned. For example, in my case, I simply add this to
> flink-conf.yaml:
>
> fs.s3a.aws.credentials.provider:
> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
>
> * You can debug the various operations that are attempted on S3 by setting
> this logger to DEBUG level:  org.apache.hadoop.fs.s3a
>
>
> Good luck :)
>
> Svend
>
>
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html
> [2]
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/assumed_roles.html
> [3]
> https://ververica.zendesk.com/hc/en-us/articles/360002452379-How-to-set-arbitrary-S3-configuration-options-Hadoop-S3A-Presto-S3-in-Flink-
>
>
> On Mon, 31 May 2021, at 3:52 PM, Angelo G. wrote:
>
> Hello,
>
> Trying to read a parquet file located in S3 leads to an AWS credentials
> exception. Switching to another format (raw, for example) works fine as far
> as file access is concerned.
>
> This is a snippet of code to reproduce the issue:
>
> static void parquetS3Error() {
>
>     EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();
>
>     TableEnvironment t_env = TableEnvironment.create(settings);
>
>     // parquet format gives error:
>     // Caused by: java.net.SocketTimeoutException: doesBucketExist on
>     // bucket-prueba-medusa: com.amazonaws.AmazonClientException:
>     // No AWS Credentials provided by BasicAWSCredentialsProvider
>     // EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>     // com.amazonaws.SdkClientException: Failed to connect to service
>     // endpoint:
>     t_env.executeSql("CREATE TABLE backup (  `date` STRING,  `value` INT) "
>         + "WITH ( 'connector' = 'filesystem', 'path' = 's3a://.../', 'format' = "
>         + "'parquet')");
>
>     // other formats (e.g. raw) work properly when used instead of the
>     // statement above:
>     // Job has been submitted with JobID 6ecd31d322aba759f9b8b591e9f4fed5
>     //++
>     //|url |
>     //++
>     //| [80, 65, 82, 49, 21, 0, 21,... |
>     //| [0, 0, 0, 50, 48, 50, 49, 4... |
>     t_env.executeSql("CREATE TABLE backup (  `url` BINARY) WITH ( 'connector' "
>         + "= 'filesystem', 'path' = 's3a://.../', 'format' = 'raw')");
>
>     Table t1 = t_env.from("backup");
>
>     t1.execute().print();
>
> }
>
> Flink version is 1.12.2.
>
> Please find attached the pom with dependencies and version numbers.
>
> What would be a suitable workaround for this?
>
> Thank you very much.
>
> Angelo.
>
>
>
>
> *Attachments:*
>
>- pom.xml
>
>
>


Re: Reading Flink states from savepoint using State Processor API

2021-06-01 Thread Till Rohrmann
Hi Min,

Usually, you should be able to provide type information and thereby a
serializer via the StateDescriptors which you create to access the state.
If this is not working, then you need to give us a bit more context to
understand what's not working.

I am also pulling in Seth who is the original author of the state processor
API.

Cheers,
Till

On Mon, May 31, 2021 at 4:00 PM Tan, Min  wrote:

> Hi,
>
> I am using Flink 1.10.1 and am trying to read Flink state from a savepoint
> using the Flink State Processor API.
>
> It works well when the state types are normal Java types or Java POJOs.
>
> When Avro-generated Java classes are used as the state type, it no longer
> reads any state.
>
> Are any additional custom serializers required in this situation?
>
> Regards,
>
> Min
>


Re: savepoint fail

2021-06-01 Thread Till Rohrmann
Responded as part of the following discussion
https://lists.apache.org/x/thread.html/re85a1fb3f17b5ef1af2844fc1a07b6d2f5e6237bf4c33059e13890ee@%3Cuser.flink.apache.org%3E.
Let's continue the discussion there.

Cheers,
Till

On Mon, May 31, 2021 at 11:02 AM 周瑞  wrote:

> HI:
>   When "sink.semantic = exactly-once", the following exception is
> thrown when recovering from a savepoint
>
>public static final String KAFKA_TABLE_FORMAT =
> "CREATE TABLE "+TABLE_NAME+" (\n" +
> "  "+COLUMN_NAME+" STRING\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'topic' = '%s',\n" +
> "   'properties.bootstrap.servers' = '%s',\n" +
> "   'sink.semantic' = 'exactly-once',\n" +
> "   'properties.transaction.timeout.ms' =
> '90',\n" +
> "   'sink.partitioner' =
> 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
> "   'format' = 'dbz-json'\n" +
> ")\n";
>   [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) -> Sink: Sink
> (table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1
> )#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to
> FAILED with failure cause: org.apache.kafka.common.KafkaException:
> Unexpected error in InitProducerIdResponse; Producer attempted an
> operation with an old epoch. Either there is a newer producer with the
> same transactionalId, or the producer's transaction has been expired by
> the broker.
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
> .java:1352)
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
> 1260)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
> .java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(
> NetworkClient.java:572)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at org.apache.kafka.clients.producer.internals.Sender
> .maybeSendAndPollTransactionalRequest(Sender.java:414)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
> .java:312)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
> 239)
> at java.lang.Thread.run(Thread.java:748)
>


Re: Got exception when running the localhost cluster

2021-06-01 Thread Till Rohrmann
Hi Lingfeng,

Youngwoo is right. Flink currently officially supports Java 8 and Java 11.

Cheers,
Till

On Mon, May 31, 2021 at 9:00 AM Youngwoo Kim (김영우)  wrote:

> Hi Lingfeng,
>
> I believe Java 8 or 11 is appropriate for the Flink cluster at this point.
> I'm not sure that Flink 1.13 supports Java 16 officially.
>
> Thanks,
> Youngwoo
>
> On Mon, May 31, 2021 at 2:49 PM Lingfeng Pu  wrote:
>
>> Hi,
>>
>> I'm new to Flink. I got a problem when running the local cluster on my
>> computer. Some key software information as follows:
>>
>> 1. Flink version: 1.13.0 for Scala 2.11;
>> 2. OS: Fedora 34;
>> 3. Java version: 16;
>> 4. Scala version: 2.11.12.
>>
>> When I started up the local cluster from the command line, everything
>> seemed fine there, BUT localhost:8081 failed to open. Furthermore, an
>> exception is thrown when I run the Flink example. Please see all the
>> details below:
>>
>> [root@localhost flink-1.13.0]# ./bin/start-cluster.sh
>> Starting cluster.
>> Starting standalonesession daemon on host fedora.
>> Starting taskexecutor daemon on host fedora.
>> [root@localhost flink-1.13.0]# ./bin/flink run
>> examples/streaming/WordCount.jar
>> Executing WordCount example with default input data set.
>> Use --input to specify file input.
>> Printing result to stdout. Use --output to specify output path.
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Unable to make field private final byte[]
>> java.lang.String.value accessible: module java.base does not "opens
>> java.lang" to unnamed module @2baf3d81
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>> Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make
>> field private final byte[] java.lang.String.value accessible: module
>> java.base does not "opens java.lang" to unnamed module @2baf3d81
>> at
>> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
>> at
>> java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
>> at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)
>> at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:106)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2053)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:203)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1243)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:937)
>> at
>> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>> ... 8 more
>>
>> I tried searching for solutions online, but have found nothing useful so
>> far. I urgently need some specific advice on how to solve this issue! I'll
>> be grateful for that :)
>>
>>


Re: Flink kafka

2021-06-01 Thread Till Rohrmann
Responded as part of the following discussion
https://lists.apache.org/x/thread.html/re85a1fb3f17b5ef1af2844fc1a07b6d2f5e6237bf4c33059e13890ee@%3Cuser.flink.apache.org%3E.
Let's continue the discussion there.

Cheers,
Till

On Sun, May 30, 2021 at 2:32 PM 周瑞  wrote:

>
> The program is used to test Flink Kafka exactly-once. A normal submission
> runs fine, but restoring from a savepoint fails with the error below.
> On the Kafka server side, transaction.timeout.ms = 90 is configured.
>
> public static final String KAFKA_TABLE_FORMAT =
> "CREATE TABLE "+TABLE_NAME+" (\n" +
> "  "+COLUMN_NAME+" STRING\n" +
> ") WITH (\n" +
> "   'connector' = 'kafka',\n" +
> "   'topic' = '%s',\n" +
> "   'properties.bootstrap.servers' = '%s',\n" +
> "   'sink.semantic' = 'exactly-once',\n" +
> "   'properties.transaction.timeout.ms' = '90',\n" +
> "   'sink.partitioner' = 
> 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
> "   'properties.max.in.flight.requests.per.connection' = 
> '1',\n" +
> "   'properties.enable.idempotence' = 'true',\n" +
> "   'properties.transactional.id' = '%s',\n" +
> "   'format' = 'dbz-json'\n" +
> ")\n";
>
>
> 2021-05-30 19:38:57,513 WARN org.apache.flink.runtime.taskmanager.Task []
> - Source: TableSourceScan(table=[[default_catalog, default_database,
> debezium_source]], fields=[data]) -> Sink: Sink
> (table=[default_catalog.default_database.KafkaTable2], fields=[data]) (1/1
> )#144518 (4739d37a5f82268901f8fb51b39735e9) switched from INITIALIZING to
> FAILED with failure cause: org.apache.kafka.common.KafkaException:
> Unexpected error in InitProducerIdResponse; Producer attempted an
> operation with an old epoch. Either there is a newer producer with the
> same transactionalId, or the producer's transaction has been expired by
> the broker.
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager
> .java:1352)
> at org.apache.kafka.clients.producer.internals.
> TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:
> 1260)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse
> .java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(
> NetworkClient.java:572)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at org.apache.kafka.clients.producer.internals.Sender
> .maybeSendAndPollTransactionalRequest(Sender.java:414)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender
> .java:312)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
> 239)
> at java.lang.Thread.run(Thread.java:748)
>
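> I searched Google for related material but still could not solve it. Has
> anyone run into a similar problem, or can anyone suggest how to
> troubleshoot it?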
>


Re: classloader.resolve-order is not honored when submitting job to a remote cluster

2021-06-01 Thread Till Rohrmann
Hi Tao,

I think this looks like a bug to me. Could it be that this problem is
covered by [1, 2]? Maybe you can review this PR and check whether it solves
the problem. If yes, then let's quickly get it in.

[1] https://issues.apache.org/jira/browse/FLINK-21445
[2] https://github.com/apache/flink/pull/15020

Cheers,
Till

On Sun, May 30, 2021 at 9:41 AM tao xiao  wrote:

> Hi team,
>
> I discovered that the child-first class loader is always used to initialize
> the main program when submitting the job to a yarn cluster using
> application mode, regardless of what value classloader.resolve-order is set
> to in flink-conf.yaml. But this is not the case if I submit the same job with
> the same config to the local cluster, which honors the config and uses the
> correct class loader to load the main program. Here is the log from local
> cluster
>
> 2021-05-30 15:01:16,372 INFO  org.apache.flink.client.cli.CliFrontend
>  [] -
> 
> 2021-05-30 15:01:16,375 INFO  org.apache.flink.client.cli.CliFrontend
>  [] -  Starting Command Line Client (Version: 1.12.1,
> Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
> [trim down the log]
> *2021-05-30 15:01:16,616 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: classloader.resolve-order, parent-first*
> 2021-05-30 15:01:16,763 WARN  org.apache.flink.runtime.util.HadoopUtils
>  [] - Could not find Hadoop configuration via any of the
> supported methods (Flink configuration, environment variables).
> [trim down the log]
> 2021-05-30 15:01:16,830 INFO  org.apache.flink.client.ClientUtils
>  [] - Starting program (detached: false)
> *2021-05-30 15:01:16,871 INFO  io.demo.flink.WordCount
>  [] - Loaded by
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6*
>
> Here is the log from yarn cluster
> 2021-05-30 07:20:14,434 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> 
> 2021-05-30 07:20:14,438 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>  Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11,
> Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
> [trim down the log]
> 2021-05-30 07:20:15,205 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.memory.process.size, 2048m
> *2021-05-30 07:20:15,205 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: classloader.resolve-order, parent-first*
> 2021-05-30 07:20:15,205 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: metrics.scope.jm, flink.jobmanager
> [trim down the log]
> *2021-05-30 07:20:21,383 INFO  io.demo.flink.WordCount
>  [] - Loaded by
> org.apache.flink.util.ChildFirstClassLoader@3da30852*
>
> Here is the job to reproduce the problem
>
> public static void main(String[] args) throws Exception {
>
>     // set up the execution environment
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
>     LOG.info("Loaded by {}", WordCount.class.getClassLoader());
>     // get input data
>     DataStreamSource<String> text = env.fromElements(
>         "To be, or not to be,--that is the question:--",
>         "Whether 'tis nobler in the mind to suffer",
>         "The slings and arrows of outrageous fortune",
>         "Or to take arms against a sea of troubles,"
>     );
>
>     text.print();
>     env.execute("demo job");
>
> }
>
>
> Flink version 1.12.1
>
> I believe the inconsistency is the result of the user-defined flink-conf not
> being passed to PackagedProgram, which uses the default config instead
>
>
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java#L109
>
> Not sure if this is expected behavior, i.e. that we can never assume the
> main program is loaded with the configured class loader
> --
> Regards,
> Tao
>


Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

2021-06-01 Thread Till Rohrmann
Hi Kai,

The rejection you are seeing should not be serious. The way this can happen
is the following: If Yarn restarts the application master, Flink will try
to recover previously started containers. If this is not possible or Yarn
only reports a subset of the previously allocated containers, then a
container that has not been reported to the new ResourceManager will be
rejected when it tries to register, because it is not known. The idea
behind this behaviour is to only accept those resources which one has
knowingly requested in order to free other resources which might belong to
another Yarn application.

In any case, the newly started Flink ResourceManager should request new
containers so that there are enough TaskManagers available to run your job
(assuming that the Yarn cluster has enough resources). Hence, the cluster
should recover from this situation and there should not be a lot to worry
about.
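The registration rule can be modelled in a few lines. This is only a sketch of the idea described above, not Flink's ResourceManager: the class ToyResourceManager and its method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Toy model of "only accept containers we know about" (hypothetical, not Flink code). */
final class ToyResourceManager {
    private final Set<String> knownContainers = new HashSet<>();

    /** After an application master restart, Yarn may report only a subset of old containers. */
    void onContainersRecovered(Set<String> reportedByYarn) {
        knownContainers.addAll(reportedByYarn);
    }

    /** Containers that this ResourceManager requested itself are always known. */
    void onContainerAllocated(String containerId) {
        knownContainers.add(containerId);
    }

    /** A TaskExecutor is accepted only if its container is known; otherwise it is rejected. */
    boolean register(String containerId) {
        return knownContainers.contains(containerId);
    }

    public static void main(String[] args) {
        ToyResourceManager rm = new ToyResourceManager();
        rm.onContainersRecovered(new HashSet<>(Arrays.asList("c1", "c2")));
        System.out.println("c1 accepted: " + rm.register("c1"));
        System.out.println("c3 accepted: " + rm.register("c3")); // c3 was never reported
        rm.onContainerAllocated("c4"); // replacement requested by the new ResourceManager
        System.out.println("c4 accepted: " + rm.register("c4"));
    }
}
```

In this model the rejected container c3 is simply replaced by a newly requested one, which mirrors why the cluster is expected to recover on its own.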

Cheers,
Till

On Sun, May 30, 2021 at 7:36 AM Kai Fu  wrote:

> Hi team,
>
> We encountered an issue during recovery from a checkpoint. The job is
> recovering because the downstream Kafka sink was full for a while (about 4
> hours), so the job failed and keeps trying to recover. The job cannot
> recover from the checkpoint successfully even after we scaled up the Kafka
> cluster, and it shows the following exception. Is there any guidance on how
> to locate and avoid this kind of issue?
>
>
> *2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Connecting to ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_*().
> 2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
> 2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor akka.tcp://flink@ip-10-61-98-204.ec2.internal:36631/user/rpc/taskmanager_0.
> org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://flink@ip-10-61-98-18.ec2.internal:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
> at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
> at akka.actor.Actor$class.aroundReceive

Re: Flink Metrics Naming

2021-06-01 Thread Till Rohrmann
Hi Mason,

The idea is that a metric is not uniquely identified by its name alone but
instead by its path. The groups in which it is defined specify this path
(similar to directories). That's why it is valid to specify two metrics
with the same name if they reside in different groups.

I think Prometheus does not support such a tree structure and that's why
the path is exposed via labels if I am not mistaken. So long story short,
what you are seeing is a combination of how Flink organizes metrics and
what can be reported to Prometheus.
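To make the "path versus name" point concrete, here is a small model. It is not Flink's metric registry or the Prometheus reporter: MetricPathDemo, identifier and flat are hypothetical names, and the two output formats are assumptions chosen only to show the contrast between a tree-style identifier and a flat name with labels.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Toy model of path-based metric identity (hypothetical, not Flink code). */
final class MetricPathDemo {

    /** Tree view: every group key and value plus the metric name, like a file path. */
    static String identifier(Map<String, String> groups, String name) {
        StringJoiner path = new StringJoiner(".");
        for (Map.Entry<String, String> group : groups.entrySet()) {
            path.add(group.getKey());
            path.add(group.getValue());
        }
        path.add(name);
        return path.toString();
    }

    /** Flat view: the bare name, with the groups carried along as labels. */
    static String flat(Map<String, String> groups, String name) {
        StringJoiner labels = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> group : groups.entrySet()) {
            labels.add(group.getKey() + "=\"" + group.getValue() + "\"");
        }
        return name + labels.toString();
    }

    public static void main(String[] args) {
        Map<String, String> first = new LinkedHashMap<>();
        first.put("group", "group1");
        Map<String, String> second = new LinkedHashMap<>();
        second.put("other_group", "other_group1");

        // Same metric name, different paths, so the two metrics do not collide.
        System.out.println(identifier(first, "myMetricName"));
        System.out.println(identifier(second, "myMetricName"));
        System.out.println(flat(first, "myMetricName"));
    }
}
```

The two identifiers differ even though the metric name is the same, which is why registering both is valid.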

I am also pulling in Chesnay who is more familiar with this part of the
code.

Cheers,
Till

On Fri, May 28, 2021 at 7:33 PM Mason Chen  wrote:

> Can anyone give insight as to why Flink allows 2 metrics with the same
> "name"?
>
> For example,
>
> getRuntimeContext().getMetricGroup().addGroup("group", "group1").counter("myMetricName");
>
> And
>
> getRuntimeContext().getMetricGroup().addGroup("other_group",
> "other_group1").counter("myMetricName");
>
> Are totally valid.
>
>
> It seems that it has led to some not-so-great implementations, such as the
> Prometheus reporter attaching the labels to the metric name, making the
> name quite verbose.
>
>
>


Re: How to check is Kafka partition "idle" in emitRecordsWithTimestamps

2021-06-01 Thread Till Rohrmann
Hi Alexey,

looking at KafkaTopicPartitionState, it looks like it does not contain
this information. In a nutshell, what you probably have to do is to
aggregate the watermarks across all partitions and then pause the
consumption of a partition if its watermark advances too far ahead of the
minimum watermark. However, this will stall the whole reading process if
there is a partition which has no more data. Hence, you will probably also
need a mechanism to advance the watermark if the partition becomes idle.
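The alignment idea above can be sketched as follows. This is a standalone model, not connector code: WatermarkAligner, its methods and the maxDriftMillis parameter are hypothetical, and watermarks are assumed to be ordinary epoch-millisecond values. Idle partitions are excluded from the minimum so that they cannot stall the others.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of event-time alignment across partitions (hypothetical, not Flink code). */
final class WatermarkAligner {
    private final long maxDriftMillis;
    private final Map<String, Long> watermarks = new TreeMap<>();
    private final Set<String> idlePartitions = new HashSet<>();

    WatermarkAligner(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    /** Records the latest watermark of a partition and whether it is currently idle. */
    void update(String partition, long watermark, boolean idle) {
        watermarks.put(partition, watermark);
        if (idle) {
            idlePartitions.add(partition);
        } else {
            idlePartitions.remove(partition);
        }
    }

    /** Minimum watermark over non-idle partitions; Long.MAX_VALUE if there are none. */
    long minActiveWatermark() {
        long min = Long.MAX_VALUE;
        for (Map.Entry<String, Long> entry : watermarks.entrySet()) {
            if (!idlePartitions.contains(entry.getKey())) {
                min = Math.min(min, entry.getValue());
            }
        }
        return min;
    }

    /** Active partitions that are ahead of the minimum by more than the allowed drift. */
    List<String> partitionsToPause() {
        long min = minActiveWatermark();
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : watermarks.entrySet()) {
            boolean active = !idlePartitions.contains(entry.getKey());
            if (active && entry.getValue() - min > maxDriftMillis) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        WatermarkAligner aligner = new WatermarkAligner(1_000L);
        aligner.update("p0", 10_000L, false);
        aligner.update("p1", 10_500L, false);
        aligner.update("p2", 11_200L, false);
        System.out.println("pause: " + aligner.partitionsToPause());
        aligner.update("p0", 10_000L, true); // p0 runs dry and is marked idle
        System.out.println("pause: " + aligner.partitionsToPause());
    }
}
```

Once p0 is flagged idle, it no longer holds back the minimum, so p2 can resume. How the idle flag is obtained is left open here.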

Note that the community is currently working on a new KafkaConnector based
on Flink's new source API (FLIP-27). If I am not mistaken, then these new
interfaces should eventually also support event time alignment.

Cheers,
Till

On Fri, May 28, 2021 at 7:17 PM Alexey Trenikhun  wrote:

> Hello,
> I'm thinking about implementing a custom Kafka connector which provides
> event alignment (similar to FLINK-10921, which seems abandoned). How can I
> determine whether a partition is idle from an override
> of AbstractFetcher.emitRecordsWithTimestamps()?
> Does KafkaTopicPartitionState have this information?
>
> Thanks,
> Alexey
>


Re: recover from savepoint

2021-06-01 Thread Till Rohrmann
The error message says that we are trying to reuse a transaction id that is
currently being used or has expired.

I am not 100% sure how this can happen. My suspicion is that you have
resumed a job multiple times from the same savepoint. Have you checked that
there is no other job which has been resumed from the same savepoint and
which is currently running or has run and completed checkpoints?

@pnowojski @Becket Qin how does the transaction id generation ensure that
we don't have a clash of transaction ids if we resume the same job multiple
times from the same savepoint? From the code, I do see that we have a
TransactionalIdsGenerator which is initialized with the taskName and the
operator UID.
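
To illustrate the suspicion: if ids are derived only from values that are
identical after every restore (such as a task name and an operator UID), two
jobs resumed from the same savepoint compute the same ids. The scheme below is
invented for this example and is not the actual TransactionalIdsGenerator
logic; the class and method names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Illustrative only: a deterministic id scheme, NOT Flink's generator. */
public class DeterministicTxnIds {

    /** Derives ids purely from a prefix, the subtask index and a pool size. */
    public static Set<String> idsFor(String prefix, int subtaskIndex, int poolSize) {
        Set<String> ids = new LinkedHashSet<>();
        for (int i = 0; i < poolSize; i++) {
            ids.add(prefix + "-" + (subtaskIndex * poolSize + i));
        }
        return ids;
    }

    /** True if the two id sets share at least one transactional id. */
    public static boolean clash(Set<String> a, Set<String> b) {
        Set<String> common = new LinkedHashSet<>(a);
        common.retainAll(b);
        return !common.isEmpty();
    }

    public static void main(String[] args) {
        // Two jobs restored from the same savepoint see the same inputs.
        Set<String> firstJob = idsFor("Sink-uid1", 0, 5);
        Set<String> secondJob = idsFor("Sink-uid1", 0, 5);
        System.out.println("clash: " + clash(firstJob, secondJob));
    }
}
```

With such a scheme the only way to avoid a clash is to feed in something that
differs between the two jobs, for example a different operator UID.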

fyi: @Arvid Heise 

Cheers,
Till


On Mon, May 31, 2021 at 11:10 AM 周瑞  wrote:

> Hi,
>   When "sink.semantic = exactly-once" is set, the following exception is
> thrown when recovering from a savepoint:
>
> public static final String KAFKA_TABLE_FORMAT =
>     "CREATE TABLE " + TABLE_NAME + " (\n" +
>     "  " + COLUMN_NAME + " STRING\n" +
>     ") WITH (\n" +
>     "   'connector' = 'kafka',\n" +
>     "   'topic' = '%s',\n" +
>     "   'properties.bootstrap.servers' = '%s',\n" +
>     "   'sink.semantic' = 'exactly-once',\n" +
>     "   'properties.transaction.timeout.ms' = '90',\n" +
>     "   'sink.partitioner' = 'com.woqutench.qmatrix.cdc.extractor.PkPartitioner',\n" +
>     "   'format' = 'dbz-json'\n" +
>     ")\n";
>   [] - Source: TableSourceScan(table=[[default_catalog, default_database, debezium_source]], fields=[data]) -> Sink: Sink(table=[default_catalog.default_database.KafkaTable], fields=[data]) (1/1)#859 (075273be72ab01bf1afd3c066876aaa6) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
> at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
> at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> at java.lang.Thread.run(Thread.java:748)
>


Re: Getting Abstract method Error with flink 1.13.0 and 1.12.0

2021-06-01 Thread Till Rohrmann
Hi Dipanjan,

this type of question is best sent to Flink's user mailing list because
there are a lot more people using Flink who could help you. The dev mailing
list is intended to be used for development discussions.

Cheers,
Till

On Tue, Jun 1, 2021 at 1:31 PM Dipanjan Mazumder  wrote:

> Hi,
>
> I have integrated the flink-siddhi library
> ([com.github.haoch/flink-siddhi_2.11 "0.2.2-SNAPSHOT"])
> and tried to configure and implement a control stream from flink-siddhi, but
> it broke with an AbstractMethodError. When I ran the same with Flink
> 1.11.0 it worked.
>
> More details are given in this Stack Overflow question: Flink-Siddhi control
> event failing to start
>
> Any help on this would be great and would help me move forward.
>
> Regards
> Dipanjan
>
>


Re: How to use or configure flink checkpointing with siddhi internal state

2021-06-01 Thread Till Rohrmann
Hi Dipanjan,

I am assuming that you are using the flink-siddhi library [1]. I am not an
expert but it looks as if the AbstractSiddhiOperator overrides the
snapshotState [2] method to store the Siddhi state in Flink.

[1] https://github.com/haoch/flink-siddhi
[2]
https://github.com/haoch/flink-siddhi/blob/master/core/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java#L331

Cheers,
Till

On Mon, May 31, 2021 at 7:27 PM Dipanjan Mazumder 
wrote:

> Hi,
> I was trying to do checkpointing while using Siddhi as the CEP engine
> running on Flink. Siddhi windowing uses internal state
> to aggregate or perform operations on a bucket of events pertaining to a
> specific time window. What I am not sure about is how that state can be
> mapped to Flink's internal state so that I can use Flink checkpointing to
> safeguard the internal state of the Siddhi operators in the face of failure.
> Any help or pointers would be greatly appreciated. Thanks in advance.
> Dipanjan


Re: [ANNOUNCE] Apache Flink 1.13.1 released

2021-05-31 Thread Till Rohrmann
Thanks for the great work Dawid and to everyone who has contributed to this
release.

Cheers,
Till

On Mon, May 31, 2021 at 10:25 AM Yangze Guo  wrote:

> Thanks, Dawid for the great work, thanks to everyone involved.
>
> Best,
> Yangze Guo
>
> On Mon, May 31, 2021 at 4:14 PM Youngwoo Kim (김영우) 
> wrote:
> >
> > Got it.
> > Thanks Dawid for the clarification.
> >
> > - Youngwoo
> >
> > On Mon, May 31, 2021 at 4:50 PM Dawid Wysakowicz 
> wrote:
> >>
> >> Hi Youngwoo,
> >>
> >> Usually we publish the docker images a day after the general release, so
> >> that the artifacts are properly distributed across Apache mirrors. You
> >> should be able to download the docker images from apache/flink now. It
> >> may take a few extra days to have the images published as the official
> >> image, as it depends on the maintainers of docker hub.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 31/05/2021 08:01, Youngwoo Kim (김영우) wrote:
> >> > Great work! Thank you Dawid and all of the contributors.
> >> > I'm eager to adopt the new release, however can't find docker images for
> >> > that from https://hub.docker.com/_/flink
> >> >
> >> > Hope it'll be available soon.
> >> >
> >> > Thanks,
> >> > Youngwoo
> >> >
> >> >
> >> > On Sat, May 29, 2021 at 1:49 AM Dawid Wysakowicz <dwysakow...@apache.org>
> >> >
> >> >> The Apache Flink community is very happy to announce the release of Apache
> >> >> Flink 1.13.1, which is the first bugfix release for the Apache Flink 1.13
> >> >> series.
> >> >>
> >> >> Apache Flink® is an open-source stream processing framework for
> >> >> distributed, high-performing, always-available, and accurate data streaming
> >> >> applications.
> >> >>
> >> >> The release is available for download at:
> >> >> https://flink.apache.org/downloads.html
> >> >>
> >> >> Please check out the release blog post for an overview of the improvements
> >> >> for this bugfix release:
> >> >> https://flink.apache.org/news/2021/05/28/release-1.13.1.html
> >> >>
> >> >> The full release notes are available in Jira:
> >> >>
> >> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058
> >> >>
> >> >> We would like to thank all contributors of the Apache Flink community who
> >> >> made this release possible!
> >> >>
> >> >> Regards,
> >> >> Dawid Wysakowicz
> >> >>
> >>
>


Re: Issue with using siddhi extension function with flink

2021-05-21 Thread Till Rohrmann
Hi Dipanjan,

Please double check whether the libraries are really contained in the job
jar you are submitting because if the library is contained in this jar,
then it should be on the classpath and you should be able to load it.
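
One way to double check this, sketched with the JDK only: open the job jar
and look for the class file of the extension. The class name JarContains is
made up for this example, and the jar path and class name passed on the
command line are placeholders.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/** Small helper (hypothetical name) to check whether a jar contains a class. */
public class JarContains {

    /** Converts a fully qualified class name into its entry path inside a jar. */
    public static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath has a class file for className. */
    public static boolean contains(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // usage: java JarContains <path to job jar> <fully qualified class name>
        System.out.println(contains(args[0], args[1]));
    }
}
```

If the class file is in the jar but still cannot be loaded at runtime, the
problem is more likely which class loader performs the lookup than the
packaging itself.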

Cheers,
Till

On Thu, May 20, 2021 at 3:43 PM Dipanjan Mazumder  wrote:

> Hi,
>
> I am trying to integrate Siddhi with Flink. When I use a Siddhi
> extension function and deploy the job to a Flink cluster, it is not able to
> find the extension libraries at runtime, so I had to explicitly put those
> libraries into the /opt/flink/lib folder of the JobManager and TaskManagers.
> The fat jar of the Flink job application contains those libraries, but the
> extension functions cannot be found at runtime, and putting them into the lib
> folder is not a feasible choice. Can you give me some pointers on this
> problem? Thanks in advance.
>
>
> I have tried multiple ways to load the classes, using Class.forName etc.,
> but nothing works even though the fat jar of the Flink job application
> contains the Siddhi extensions. I don't want to add those libraries to the
> JobManager's and TaskManagers' lib folders every time.
>
>
> Any help will be appreciated.
>
>
> Regards
>
> Dipanjan
>


Re: SIGSEGV error

2021-05-18 Thread Till Rohrmann
Great to hear that you fixed the problem by specifying an explicit
serializer for the state.

Cheers,
Till

On Tue, May 18, 2021 at 9:43 AM Joshua Fan  wrote:

> Hi Till,
> I also tried the job without gzip, and it ran into the same error.
> But the problem is solved now. I was about to give up on solving it when I
> found the mail at
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JVM-crash-SIGSEGV-in-ZIP-GetEntry-td17326.html.
> So I think maybe it was something about the serialization stuff.
> What I have done is:
> before:
>
> OperatorStateStore stateStore = context.getOperatorStateStore();
> ListStateDescriptor<State> lsd = new ListStateDescriptor<>("bucket-states", State.class);
>
> after:
>
> OperatorStateStore stateStore = context.getOperatorStateStore();
> ListStateDescriptor<State> lsd = new ListStateDescriptor<>("bucket-states", new JavaSerializer<>());
>
> Hope this is helpful.
>
> Yours sincerely
> Josh
>
>
>
> On Tue, May 18, 2021 at 2:54 PM Till Rohrmann wrote:
>
>> Hi Joshua,
>>
>> could you try whether the job also fails when not using the gzip format?
>> This could help us narrow down the culprit. Moreover, you could try to run
>> your job and Flink with Java 11 now.
>>
>> Cheers,
>> Till
>>
>> On Tue, May 18, 2021 at 5:10 AM Joshua Fan 
>> wrote:
>>
>>> Hi all,
>>>
>>> Most of the posts say that "Most of the times, the crashes in
>>> ZIP_GetEntry occur when the jar file being accessed has been
>>> modified/overwritten while the JVM instance was running.", but I do not
>>> know when and which jar file was modified by the job running in
>>> Flink.
>>>
>>> for your information.
>>>
>>> Yours sincerely
>>> Josh
>>>
>>> On Tue, May 18, 2021 at 10:15 AM Joshua Fan wrote:
>>>
>>>> Hi Stephan, Till
>>>>
>>>> Recently, I tried to upgrade a Flink job from 1.7 to 1.11.
>>>> Unfortunately, a weird problem appeared: "SIGSEGV (0xb) at
>>>> pc=0x0025, pid=135306, tid=140439001388800". The pid log is
>>>> attached.
>>>> It is a simple job that consumes messages from Kafka and
>>>> writes them into HDFS in gzip format. It runs on 1.11 for about 2
>>>> minutes, then the JVM crashes, the job restarts and the JVM crashes again
>>>> until the application fails.
>>>> I also tried to set -Dsun.zip.disableMemoryMapping=true, but it did
>>>> not help; the same crash keeps happening. Google suggests upgrading the
>>>> JDK to jdk1.9, but that is not feasible.
>>>> Any suggestions? Thanks a lot.
>>>>
>>>> Yours sincerely
>>>> Josh
>>>>
>>>> On Fri, Sep 13, 2019 at 11:11 PM Stephan Ewen wrote:
>>>>
>>>>> Given that the segfault happens in the JVM's ZIP stream code, I am
>>>>> curious if this is a bug in Flink or in the JVM core libs that happens to
>>>>> be triggered now by newer versions of Flink.
>>>>>
>>>>> I found this on StackOverflow, which looks like it could be related:
>>>>> https://stackoverflow.com/questions/38326183/jvm-crashed-in-java-util-zip-zipfile-getentry
>>>>> Can you try the suggested option "-Dsun.zip.disableMemoryMapping=true"?
>>>>>
>>>>>
>>>>> On Fri, Sep 13, 2019 at 11:36 AM Till Rohrmann 
>>>>> wrote:
>>>>>
>>>>>> Hi Marek,
>>>>>>
>>>>>> could you share the log statements which happened before the SIGSEGV
>>>>>> with us? They might be helpful to understand what happened before.
>>>>>> Moreover, it would be helpful to get access to your custom serializer
>>>>>> implementations. I'm also pulling in Gordon who worked on
>>>>>> the TypeSerializerSnapshot improvements.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Thu, Sep 12, 2019 at 9:28 AM Marek Maj 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> Recently we decided to upgrade from flink 1.7.2 to 1.8.1. After an
>>>>>>> upgrade our task managers started to fail with SIGSEGV error from time
>>>>>>> to time.
>>>>>>>
>>>>>>> In process of a

Re: SIGSEGV error

2021-05-17 Thread Till Rohrmann
Hi Joshua,

could you try whether the job also fails when not using the gzip format?
This could help us narrow down the culprit. Moreover, you could try to run
your job and Flink with Java 11 now.

Cheers,
Till

On Tue, May 18, 2021 at 5:10 AM Joshua Fan  wrote:

> Hi all,
>
> Most of the posts say that "Most of the times, the crashes in
> ZIP_GetEntry occur when the jar file being accessed has been
> modified/overwritten while the JVM instance was running.", but I do not
> know when and which jar file was modified by the job running in
> Flink.
>
> for your information.
>
> Yours sincerely
> Josh
>
> On Tue, May 18, 2021 at 10:15 AM Joshua Fan wrote:
>
>> Hi Stephan, Till
>>
>> Recently, I tried to upgrade a Flink job from 1.7 to 1.11. Unfortunately,
>> a weird problem appeared: "SIGSEGV (0xb) at pc=0x0025,
>> pid=135306, tid=140439001388800". The pid log is attached.
>> It is a simple job that consumes messages from Kafka and writes them
>> into HDFS in gzip format. It runs on 1.11 for about 2 minutes, then
>> the JVM crashes, the job restarts and the JVM crashes again until the
>> application fails.
>> I also tried to set -Dsun.zip.disableMemoryMapping=true, but it did not
>> help; the same crash keeps happening. Google suggests upgrading the JDK to
>> jdk1.9, but that is not feasible.
>> Any suggestions? Thanks a lot.
>>
>> Yours sincerely
>> Josh
>>
>> On Fri, Sep 13, 2019 at 11:11 PM Stephan Ewen wrote:
>>
>>> Given that the segfault happens in the JVM's ZIP stream code, I am
>>> curious if this is a bug in Flink or in the JVM core libs that happens to
>>> be triggered now by newer versions of Flink.
>>>
>>> I found this on StackOverflow, which looks like it could be related:
>>> https://stackoverflow.com/questions/38326183/jvm-crashed-in-java-util-zip-zipfile-getentry
>>> Can you try the suggested option "-Dsun.zip.disableMemoryMapping=true"?
>>>
>>>
>>> On Fri, Sep 13, 2019 at 11:36 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Marek,
>>>>
>>>> could you share the log statements which happened before the SIGSEGV
>>>> with us? They might be helpful to understand what happened before.
>>>> Moreover, it would be helpful to get access to your custom serializer
>>>> implementations. I'm also pulling in Gordon who worked on
>>>> the TypeSerializerSnapshot improvements.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Sep 12, 2019 at 9:28 AM Marek Maj  wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> Recently we decided to upgrade from flink 1.7.2 to 1.8.1. After an
>>>>> upgrade our task managers started to fail with SIGSEGV error from time to
>>>>> time.
>>>>>
>>>>> In the process of adjusting the code to 1.8.1, we noticed that there were
>>>>> some changes around the TypeSerializerSnapshot interface and its
>>>>> implementations. At that time we had a few custom serializers which we
>>>>> decided to throw out during migration and then leverage Flink's default
>>>>> serializers. We don't mind clearing the state in the process of migration;
>>>>> an effort to migrate with state does not seem to be worth it.
>>>>>
>>>>> Unfortunately, after running the new version we see SIGSEGV errors from
>>>>> time to time. It may be that serialization is not the real cause, but at
>>>>> the moment it seems to be the most probable reason. We have not made
>>>>> any significant code changes besides the serialization area.
>>>>>
>>>>> We run the job on YARN, HDP version 2.7.3.2.6.2.0-205.
>>>>> Checkpoint configuration: RocksDB backend, not incremental, 50s min
>>>>> processing time
>>>>>
>>>>> You can find parts of JobManager log and ErrorFile log of failed
>>>>> container included below.
>>>>>
>>>>> Any suggestions are welcome
>>>>>
>>>>> Best regards
>>>>> Marek Maj
>>>>>
>>>>> jobmanager.log
>>>>>
>>>>> 2019-09-10 16:30:28.177 INFO  o.a.f.r.c.CheckpointCoordinator   -
>>>>> Completed checkpoint 47 for job c8a9ae03785ade86348c3189cf7dd965
>>>>> (18532488122 bytes in 60871 ms).
>>>>>
>>>>> 2019-09-10 16:31:19.223 INFO  o.a.f.r.c.Checkpoin

Re: Issues while writing data to a parquet sink

2021-05-17 Thread Till Rohrmann
Hi Adi,

To me, this looks like a version conflict of some kind. Maybe you use
different Avro versions for your user program and on your Flink cluster.
Could you check that you don't have conflicting versions on your classpath?
It would also be helpful to have a minimal example that allows
reproducing the problem (e.g. repo with Flink job and test data).
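
A quick way to look for such a conflict, sketched with the JDK only: print the
jar each suspicious class was loaded from and compare the result between your
local classpath and the cluster's. The class name WhereLoaded is made up for
this example; the Avro class names in the comment are the ones from the stack
trace in this thread.

```java
import java.security.CodeSource;

/** Hypothetical helper: reports which jar or directory a class comes from. */
public class WhereLoaded {

    /** Describes the code source (jar or directory) of the given class. */
    public static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // Classes loaded by the JDK's bootstrap loader usually have no code source.
        return source == null ? "<bootstrap or unknown>" : String.valueOf(source.getLocation());
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // e.g. java -cp <same classpath as the job> WhereLoaded \
        //        org.apache.avro.LogicalType org.apache.avro.LogicalTypes
        for (String name : args) {
            System.out.println(name + " -> " + locationOf(Class.forName(name)));
        }
    }
}
```

If the two Avro classes resolve to different jars, or to a jar with an
unexpected version, that would be consistent with the
IncompatibleClassChangeError.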

I am forwarding this mail also to the user ML. Maybe some other community
members have seen this problem before.

Cheers,
Till

On Fri, May 14, 2021 at 9:53 PM Adishesh Kishore  wrote:

> Hi Till,
>
> I am using an Avro schema to write data to a Parquet sink. I am getting
> the following stack trace for some reason (though I am not using a decimal
> logical type anywhere):
>
> java.lang.IncompatibleClassChangeError: class org.apache.avro.LogicalTypes$Decimal has interface org.apache.avro.LogicalType as super class
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:180)
> at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:214)
> at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:171)
> at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
> at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
> at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
> at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:115)
> at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:150)
> at org.apache.parquet.avro.AvroParquetWriter.access$200(AvroParquetWriter.java:36)
> at org.apache.parquet.avro.AvroParquetWriter$Builder.getWriteSupport(AvroParquetWriter.java:182)
> at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:529)
> at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:87)
> at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forGenericRecord$abd75386$1(ParquetAvroWriters.java:61)
> at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
> at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter$Factory.openNew(BulkPartWriter.java:103)
> at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:222)
> at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
> at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:378)
> at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingParquetSink.invoke(StreamingParquetSink.java:555)
>
> I am using the following schema to create the writer,
>
> {
>   "type": "error",
>   "name": "root",
>   "namespace": "root",
>   "doc": "",
>   "fields": [
>     {
>       "name": "account_id",
>       "type": ["long", "null"],
>       "doc": "",
>       "default": 0
>     },
>     {
>       "name": "amount",
>       "type": ["long", "null"],
>       "doc": "",
>       "default": 0
>     },
>     {
>       "name": "date",
>       "type": [
>         {
>           "type": "string",
>           "logicalType": "date"
>         },
>         "null"
>       ],
>       "doc": "",
>       "default": ""
>     },
>     {
>       "name": "duration",
>       "type": ["long", "null"],
>       "doc": "",
>       "default": 0
>     },
>     {
>       "name": "loan_id",
>       "type": ["long", "null"],
>       "doc": "",
>       "default": 0
>     },
>     {
>       "name": "payments",
>       "type": ["double", "null"],
>       "doc": "",
>       "default": 0
>     },
>     {
>       "name": "status",
>       "type": ["string", "null"],
>       "doc": "",
>       "default": ""
>     }
>   ]
> }
>
>
> Appreciate any help in advance

Re: Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

2021-05-17 Thread Till Rohrmann
One small addition: The old mapping looks to use the
SubtaskStateMapper.RANGE whereas the new mapping looks to use the
SubtaskStateMapper.ROUND_ROBIN.
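
To make the difference concrete, here is a simplified model of the two
mappings in plain Java. It is a sketch under assumptions, not Flink's code:
the class MapperSketch and its methods are made up, and the key-group formulas
are my reading of how key-group ranges are split across subtasks.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of two subtask state mappings; not Flink's classes. */
public class MapperSketch {

    /** Round-robin: old subtask i is read by new subtask i % newParallelism. */
    public static List<Integer> roundRobin(int newIndex, int oldParallelism, int newParallelism) {
        List<Integer> oldSubtasks = new ArrayList<>();
        for (int i = newIndex; i < oldParallelism; i += newParallelism) {
            oldSubtasks.add(i);
        }
        return oldSubtasks;
    }

    /** First key group (inclusive) owned by subtask idx. */
    static int firstKeyGroup(int maxParallelism, int parallelism, int idx) {
        return (idx * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by subtask idx. */
    static int lastKeyGroup(int maxParallelism, int parallelism, int idx) {
        return ((idx + 1) * maxParallelism - 1) / parallelism;
    }

    /** Range: every old subtask whose key-group range overlaps the new one's. */
    public static List<Integer> range(
            int newIndex, int oldParallelism, int newParallelism, int maxParallelism) {
        int start = firstKeyGroup(maxParallelism, newParallelism, newIndex);
        int end = lastKeyGroup(maxParallelism, newParallelism, newIndex);
        List<Integer> oldSubtasks = new ArrayList<>();
        for (int i = 0; i < oldParallelism; i++) {
            boolean overlaps = firstKeyGroup(maxParallelism, oldParallelism, i) <= end
                    && lastKeyGroup(maxParallelism, oldParallelism, i) >= start;
            if (overlaps) {
                oldSubtasks.add(i);
            }
        }
        return oldSubtasks;
    }

    public static void main(String[] args) {
        // Scaling down from 4 to 2 subtasks with 128 key groups.
        System.out.println("round robin, new subtask 0: " + roundRobin(0, 4, 2));
        System.out.println("range,       new subtask 0: " + range(0, 4, 2, 128));
    }
}
```

In this model, scaling from 4 to 2 subtasks assigns old subtasks [0, 2] to new
subtask 0 under round-robin but [0, 1] under range, so two mappers on the same
vertex can disagree about where the old state has to go.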

On Mon, May 17, 2021 at 11:56 AM Till Rohrmann  wrote:

> Hi ChangZhuo Chen,
>
> This looks like a bug in Flink. Could you provide us with the logs of the
> run and more information about your job? In particular, what does your
> topology look like?
>
> My suspicion is the following: You have an operator with two inputs. One
> input is keyed whereas the other input is something else. Due to this
> property, the JobVertex has two different SubtaskStateMappers assigned
> which produce different subtask mappings in the TaskStateAssignment. This
> is the exception you are observing. The initial deployment works because
> you don't have any state to recover. However, subsequent recoveries should
> fail. I am also pulling in Arvid who worked on the subtask state assignment
> recently and might be able to shed some more light on this matter.
>
> Cheers,
> Till
>
> On Fri, May 14, 2021 at 4:35 AM ChangZhuo Chen (陳昌倬) 
> wrote:
>
>> Hi,
>>
>> We run our application on Flink 1.13.0 as a Kubernetes standalone
>> application cluster with reactive mode enabled. The application
>> stopped today and could not recover, so we tried to restore the application
>> from a checkpoint. However, the application cannot restart from the
>> checkpoint due to the following error. We have no idea what this
>> exception means, so any help is welcome.
>>
>>
>> 2021-05-14 01:55:37,204 INFO
>> org.apache.flink.runtime.jobmaster.JobMaster [] - Close
>> ResourceManager connection 06d772aae2ab4afb8c6917dac40cd727: Stopping
>> JobMaster for job
>> rt-match_11.2.16_5d671ba3()..
>> 2021-05-14 01:55:37,205 INFO
>> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
>> Stopping DefaultLeaderRetrievalService.
>> 2021-05-14 01:55:37,205 INFO
>> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver
>> [] - Stopping
>> KubernetesLeaderRetrievalDriver{configMapName='rt-match-flink-dev-resourcemanager-leader'}.
>> 2021-05-14 01:55:37,205 INFO
>> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher
>> [] - The watcher is closing.
>> 2021-05-14 01:55:37,215 INFO
>> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>> Stopping DefaultLeaderElectionService.
>> 2021-05-14 01:55:37,215 INFO
>> org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver
>> [] - Closing
>> KubernetesLeaderElectionDriver{configMapName='rt-match-flink-dev--jobmanager-leader'}.
>> 2021-05-14 01:55:37,216 INFO
>> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher
>> [] - The watcher is closing.
>> 2021-05-14 01:55:37,342 INFO
>> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed
>> job graph  from
>> KubernetesStateHandleStore{configMapName='rt-match-flink-dev-dispatcher-leader'}.
>> 2021-05-14 01:55:37,914 INFO
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap
>> [] - Application FAILED:
>> java.util.concurrent.CompletionException:
>> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
>> Application Status: FAILED
>> at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$4(ApplicationDispatcherBootstrap.java:304)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
>> ~[?:?]
>> at
>> java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
>> ~[?:?]
>> at
>> java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
>> ~[?:?]
>> at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.unwrapJobResultException(ApplicationDispatcherBootstrap.java:297)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$getApplicationResult$3(ApplicationDispatcherBootstrap.java:270)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> at
>> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>> ~[?:?]
>> at
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Array

Re: Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

2021-05-17 Thread Till Rohrmann
Hi ChangZhuo Chen,

This looks like a bug in Flink. Could you provide us with the logs of the
run and more information about your job? In particular, what does your
topology look like?

My suspicion is the following: You have an operator with two inputs. One
input is keyed whereas the other input is something else. Due to this
property, the JobVertex has two different SubtaskStateMappers assigned
which produce different subtask mappings in the TaskStateAssignment. This
is the exception you are observing. The initial deployment works because
you don't have any state to recover. However, subsequent recoveries should
fail. I am also pulling in Arvid who worked on the subtask state assignment
recently and might be able to shed some more light on this matter.

Cheers,
Till

On Fri, May 14, 2021 at 4:35 AM ChangZhuo Chen (陳昌倬) 
wrote:

> Hi,
>
> We run our application on Flink 1.13.0 as a Kubernetes standalone
> application cluster with reactive mode enabled. The application
> stopped today and could not recover, so we tried to restore the application
> from a checkpoint. However, the application cannot restart from the
> checkpoint due to the following error. We have no idea what this
> exception means, so any help is welcome.
>
>
> 2021-05-14 01:55:37,204 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Close
> ResourceManager connection 06d772aae2ab4afb8c6917dac40cd727: Stopping
> JobMaster for job
> rt-match_11.2.16_5d671ba3()..
> 2021-05-14 01:55:37,205 INFO
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
> Stopping DefaultLeaderRetrievalService.
> 2021-05-14 01:55:37,205 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver
> [] - Stopping
> KubernetesLeaderRetrievalDriver{configMapName='rt-match-flink-dev-resourcemanager-leader'}.
> 2021-05-14 01:55:37,205 INFO
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher
> [] - The watcher is closing.
> 2021-05-14 01:55:37,215 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Stopping DefaultLeaderElectionService.
> 2021-05-14 01:55:37,215 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver
> [] - Closing
> KubernetesLeaderElectionDriver{configMapName='rt-match-flink-dev--jobmanager-leader'}.
> 2021-05-14 01:55:37,216 INFO
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapWatcher
> [] - The watcher is closing.
> 2021-05-14 01:55:37,342 INFO
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed
> job graph  from
> KubernetesStateHandleStore{configMapName='rt-match-flink-dev-dispatcher-leader'}.
> 2021-05-14 01:55:37,914 INFO
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap
> [] - Application FAILED:
> java.util.concurrent.CompletionException:
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
> Application Status: FAILED
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$4(ApplicationDispatcherBootstrap.java:304)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
> ~[?:?]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.unwrapJobResultException(ApplicationDispatcherBootstrap.java:297)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$getApplicationResult$3(ApplicationDispatcherBootstrap.java:270)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> ~[?:?]
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
> ~[?:?]
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> ~[?:?]
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> ~[?:?]
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> ~[?:?]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.getApplicationResult(ApplicationDispatcherBootstrap.java:272)
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(Applicati

Re: Task Local Recovery with mountable disks in the cloud

2021-05-10 Thread Till Rohrmann
Hi Sonam,

I think it would be great to create a FLIP for this feature. FLIPs don't
have to be super large and in this case, I could see it work to express the
general idea to make local recovery work across TaskManager failures and
then outline the different ideas we had so far. If we then decide to go
with the persisting of cache information (the AllocationIDs), then this
could be a good outcome. If we decide to go with the more complex solution
of telling the ResourceManager and JobMaster about the ranges of cached
state data, then this is also ok.
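
For readers new to the topic, a toy model of why the cache is lost today: the
local state is keyed by the AllocationID, and a restarted TaskManager gets
slots with fresh random ids. The class LocalStateCache and its methods are
invented for this sketch and do not mirror Flink's internal classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Toy model of a task-local state cache scoped to slot allocations. */
public class LocalStateCache {
    // allocationId -> (taskId -> latest local copy of the task's state)
    private final Map<UUID, Map<String, byte[]>> cache = new HashMap<>();

    /** Stores the local state copy of a task under its current allocation. */
    public void store(UUID allocationId, String taskId, byte[] state) {
        cache.computeIfAbsent(allocationId, id -> new HashMap<>()).put(taskId, state);
    }

    /** Finds local state only if the task runs in the same allocation again. */
    public Optional<byte[]> lookup(UUID allocationId, String taskId) {
        Map<String, byte[]> perTask = cache.get(allocationId);
        return perTask == null ? Optional.empty() : Optional.ofNullable(perTask.get(taskId));
    }

    /** Freeing the slot drops everything cached for that allocation. */
    public void release(UUID allocationId) {
        cache.remove(allocationId);
    }

    public static void main(String[] args) {
        LocalStateCache cache = new LocalStateCache();
        UUID before = UUID.randomUUID();
        cache.store(before, "task-1", new byte[] {1, 2, 3});
        // After a TaskManager restart the slot gets a new random id: cache miss.
        UUID after = UUID.randomUUID();
        System.out.println("same allocation: " + cache.lookup(before, "task-1").isPresent());
        System.out.println("new allocation:  " + cache.lookup(after, "task-1").isPresent());
    }
}
```

Persisting the allocation ids, or keying the cache by something that survives
a restart, are the two directions discussed in this thread.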

Cheers,
Till

On Fri, May 7, 2021 at 6:30 PM Sonam Mandal  wrote:

> Hi Till,
>
> Thanks for getting back to me. Apologies for my delayed response.
>
> Thanks for confirming that the slot ID (Allocation ID) is indeed necessary
> today for task local recovery to kick in, and thanks for your insights on
> how to make this work.
>
> We are interested in exploring this disaggregation between local state
> storage and slots to allow potential reuse of local state even when TMs go
> down.
>
> I'm planning to spend some time exploring the Flink code around local
> recovery and state persistence. I'm still new to Flink, so any guidance
> will be helpful. I think both of your ideas on how to make this happen
> are interesting and worth exploring. What's the procedure to collaborate or
> get guidance on this feature? Will a FLIP be required, or will opening a
> ticket do?
>
> Thanks,
> Sonam
> --
> *From:* Till Rohrmann 
> *Sent:* Monday, April 26, 2021 10:24 AM
> *To:* dev 
> *Cc:* user@flink.apache.org ; Sonam Mandal <
> soman...@linkedin.com>
> *Subject:* Re: Task Local Recovery with mountable disks in the cloud
>
> Hi Sonam,
>
> sorry for the late reply. We were a bit caught in the midst of the feature
> freeze for the next major Flink release.
>
> In general, I think it is a very good idea to disaggregate the local state
> storage to make it reusable across TaskManager failures. However, it is
> also not trivial to do.
>
> Maybe let me first describe how the current task local recovery works and
> then see how we could improve it:
>
> Flink creates for every slot allocation an AllocationID. The AllocationID
> associates a slot on a TaskExecutor with a job and is also used for scoping
> the lifetime of a slot wrt a job (theoretically, one and the same slot
> could be used to fulfill multiple slot requests of the same job if the slot
> allocation is freed in between). Note that the AllocationID is a random ID
> and, thus, changes whenever the ResourceManager allocates a new slot on a
> TaskExecutor for a job.
>
> Task local recovery is effectively a state cache which is associated with
> an AllocationID. So for every checkpoint and every task, a TaskExecutor
> copies the state data and stores them in the task local recovery cache. The
> cache is maintained as long as the slot allocation is valid (e.g. the slot
> has not been freed by the JobMaster and the slot has not timed out). This
> makes the lifecycle management of the state data quite easy and makes sure
> that a process does not clutter local disks. On the JobMaster side, Flink
> remembers for every Execution, where it is deployed (it remembers the
> AllocationID). If a failover happens, then Flink tries to re-deploy the
> Executions into the slots they were running in before by matching the
> AllocationIDs.
>
> The reason why we scoped the state cache to an AllocationID was for
> simplicity and because we couldn't guarantee that a failed TaskExecutor X
> will be restarted on the same machine again and thereby having access to
> the same local disk as before. That's also why Flink deletes the cache
> directory when a slot is freed or when the TaskExecutor is shut down
> gracefully.
>
> With persistent volumes this changes and we can make the TaskExecutors
> "stateful" in the sense that we can reuse an already occupied cache. One
> rather simple idea could be to also persist the slot allocations of a
> TaskExecutor (which slot is allocated and what is its assigned
> AllocationID). This information could be used to re-initialize the
> TaskExecutor upon restart. That way, it does not have to register at the
> ResourceManager and wait for new slot allocations but could directly start
> offering its slots to the jobs it remembered. If the TaskExecutor cannot
> find the JobMasters for the respective jobs, it would then free the slots
> and clear the cache accordingly.
>
> This could work as long as the ResourceManager does not start new
> TaskExecutors whose slots could be used to recover the job. If this is a
> problem, then one needs to answer the question how long to wait for the old
> TaskEx

Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread Till Rohrmann
Yes, exposing an API to adjust the parallelism of individual operators is
definitely a good step towards the auto-scaling feature which we will
consider. The missing piece is persisting this information so that in case
of recovery you don't recover with a completely different parallelism.

I also agree that it will be very hard for Flink to decide on the best
parallelism in general. To do that, you usually need to know a bit about
the application logic. Hence, outsourcing this problem to the user, who can
make better decisions, is a very good idea.

The community will keep improving this feature so that it becomes more
powerful with the next releases.

Cheers,
Till

On Thu, May 6, 2021 at 2:38 PM vishalovercome  wrote:

> Thank you for answering all my questions. My suggestion would be to start
> off
> with exposing an API to allow dynamically changing operator parallelism as
> the users of flink will be better able to decide the right scaling policy.
> Once this functionality is there, it's just a matter of providing policies
> (ratio based, throughput based, back-pressure based). The web UI could be
> used for setting parallelism as well.
>
> An analogy would be autoscaling provided by cloud providers. The features
> provided are:
> 1. Web UI for manually overriding parallelism (min, max, desired)
> 2. Metric based scaling policies
>
> It will be difficult for developers to think of a reasonable value for
> maxParallelism for each operator and like I explained above, sometimes even
> a small increase in parallelism is enough to bring things down. A UI /
> external policy based approach will allow for quick experimentation and
> fine
> tuning. I don't think it will be possible for Flink developers to build a
> one-size-fits-all solution.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread Till Rohrmann
Hi Vishal,

thanks a lot for all your feedback on the new reactive mode. I'll try to
answer your questions.

0. In order to avoid confusion let me quickly explain a bit of terminology:
The reactive mode is the new feature that allows Flink to react to newly
available resources and to make use of them. In order to achieve this, it
uses the newly introduced AdaptiveScheduler which works by declaring the
required resources and adapting the job if it loses slots or receives slots
if the job is running below its configured parallelism. The
AdaptiveScheduler can also be used w/o the reactive mode which would give
you the capability that Flink would be able to continue executing your job
if your cluster loses TaskManagers (with the default scheduler, the job
would then fail with not enough resources). The difference between the
reactive and non-reactive mode is that in the reactive mode, Flink ignores
the configured parallelism value and tries to run the job at the
maxParallelism (so to say the maximum possible parallelism your job can be
run with).

1. The AdaptiveScheduler and thereby also the reactive mode uses a simple
slot distribution mechanism where every slot sharing group gets the same
number of slots. The parallelism of an operator in this slot sharing group
is then the minimum of the number of slots and the configured parallelism
(when using the reactive mode it would be the configured maxParallelism).
This is of course not ideal and can lead to unused slots. Moreover, it does
not support scaling different slot sharing groups independently. This is a
limitation of the current implementation.

2. With the reactive mode, external systems can control the parallelism
with which a job is running by starting and stopping TaskManagers.
Admittedly, this does not give you super fine-grained control over the
running operators. Being able to specify ratios for operators could be a
good extension.

3. Local recovery simply has not been implemented because of scoping
reasons. There is nothing fundamentally preventing this from working, it
just hasn't been implemented yet.

4. No, there are currently no separate metrics for restarts and rescaling
operations. I do see the usefulness of such a metric. However, for the
reactive mode where scaling down means that we kill a TaskManager, I am not
entirely sure how we will be able to distinguish this from any other reason
which can kill a TaskManager. The only way I could see this work is by
telling Flink about the killing of a TaskManager.

5. No, Flink is not able to do these kinds of operator-based optimizations at
the moment. I think this could be possible once we have implemented the
next step of the reactive mode which is proper auto-scaling.

6. From a high-level perspective, there is not much difference between the
reactive mode and manually taking a savepoint and resuming the job from it,
and changing the parallelism. That's effectively also what Flink does
internally. The difference is that this operation is now automated and that
Flink can also handle situations where you don't get all the resources
after a restart, where the manual approach would simply fail.

7. Setting the maxParallelism is only required for the reactive mode and if
you don't want to run an operator with the default maxParallelism value.
Per definition, the maxParallelism defines the maximum parallelism you can
run your operator with. Hence, if you set this value to something, then you
should be sure that you don't have to run your job with higher parallelism
than that. Note that the reactive mode will try to run the operator with
this parallelism. However, if it has fewer resources, then it will run the
operators at lower parallelism. So the maxParallelism defines the upper
bound for the parallelism of your operator.

The reactive mode is intended as the first step towards more elastic
streaming pipelines and simplified operations. We see it as the foundation
for more advanced features such as true auto-scaling where each operator
can decide its parallelism. I hope this helps to understand the reactive
mode a bit better.
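
To make points 1 and 7 a bit more concrete, here is a minimal sketch of the
parallelism rule described above. The class and method names are made up for
illustration and are not part of Flink's API; the code only restates the
"minimum of slots and configured (max)parallelism" rule and ignores how slots
are actually distributed across slot sharing groups.

```java
// Hypothetical helper, NOT a Flink class: it only restates the rule from
// the mail above for a single slot sharing group.
final class SlotSharingParallelism {

    /**
     * Parallelism an operator in a slot sharing group would run with.
     *
     * @param slotsForGroup  slots available to the slot sharing group
     * @param parallelism    configured parallelism of the operator
     * @param maxParallelism configured maxParallelism of the operator
     * @param reactiveMode   true if the job runs in reactive mode
     */
    static int effectiveParallelism(
            int slotsForGroup, int parallelism, int maxParallelism, boolean reactiveMode) {
        // Reactive mode ignores the configured parallelism and aims for maxParallelism.
        int upperBound = reactiveMode ? maxParallelism : parallelism;
        // With fewer slots than the upper bound, the operator runs at a lower parallelism.
        return Math.min(slotsForGroup, upperBound);
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(3, 4, 128, false)); // limited by slots
        System.out.println(effectiveParallelism(6, 4, 128, false)); // limited by parallelism
        System.out.println(effectiveParallelism(6, 4, 128, true));  // reactive: uses all slots
    }
}
```

In the last call two of the six slots would stay unused without the reactive
mode, which is the "unused slots" limitation mentioned in point 1.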

Cheers,
Till

On Wed, May 5, 2021 at 7:50 PM Ken Krugler 
wrote:

> Hi Vishal,
>
> WRT “bring down our internal services” - a common pattern with making
> requests to external services is to measure latency, and throttle (delay)
> requests in response to increased latency.
>
> You’ll see this discussed frequently on web crawling forums as an
> auto-tuning approach.
>
> Typically there’s a steady increase in latency as load on the service
> increases.
>
> The trick is throttling soon enough before you hit the “elbow” where a
> service effectively falls over.
>
> — Ken
>
>
>
> On May 5, 2021, at 9:08 AM, vishalovercome  wrote:
>
> Yes. While back-pressure would eventually ensure high throughput, hand
> tuning
> parallelism became necessary because the job with high source parallelism
> would immediately bring down our internal services - not giving enough time
> to flink to adju

Re: Presence of Jars in Flink reg security

2021-05-05 Thread Till Rohrmann
Hi Prasanna,

in the latest Flink version (1.13.0) I couldn't find these dependencies.
Which version of Flink are you looking at? What you could check is whether
one of these dependencies is contained in one of Flink's shaded
dependencies [1].

[1] https://github.com/apache/flink-shaded

Cheers,
Till

On Tue, May 4, 2021 at 3:00 PM Prasanna kumar 
wrote:

> Hi Flinksters,
>
> Our repo, which is a Maven-based Java (Flink) project, went through an SCA scan
> using the WhiteSource tool, and the following HIGH severity issues were reported.
> The vulnerable target jars are not found when we build the dependency tree of
> the project.
>
> Could anyone let us know whether Flink uses these anywhere?
>
> +----------------------+----------+
> | Library              | Severity |
> +======================+==========+
> | xercesImpl-2.9.1.jar | HIGH     |
> +----------------------+----------+
>   - Artifact ID: xercesImpl
>   - Group ID: xerces
>   - Library Version: 2.9.1
>   - Library Path: 
> /var/lib/jenkins/workspace/branch/latest/?/.m2/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar
>   - Dependency: None
>   - Type: MAVEN_ARTIFACT
>   - Description: XMLscanner.java in Apache Xerces2 Java Parser before 
> 2.12.0, as used in the Java Runtime Environment (JRE) in IBM Java 5.0 before 
> 5.0 SR16-FP3, 6 before 6 SR14, 6.0.1 before 6.0.1 SR6, and 7 before 7 SR5 as 
> well as Oracle Java SE 7u40 and earlier, Java SE 6u60 and earlier, Java SE 
> 5.0u51 and earlier, JRockit R28.2.8 and earlier, JRockit R27.7.6 and earlier, 
> Java SE Embedded 7u40 and earlier, and possibly other products allows remote 
> attackers to cause a denial of service via vectors related to XML attribute 
> names.
>   - Suggested Fix: Upgrade to version xerces:xercesImpl:Xerces-J_2_12_0
>
>
> +-----------------------+----------+
> | Library               | Severity |
> +=======================+==========+
> | struts-core-1.3.8.jar | HIGH     |
> +-----------------------+----------+
>   - Artifact ID: struts-core
>   - Group ID: org.apache.struts
>   - Library Version: 1.3.8
>   - Library Path: 
> /var/lib/jenkins/workspace/branchlatest/?/.m2/repository/org/apache/struts/struts-core/1.3.8/struts-core-1.3.8.jar
>   - Dependency: None
>   - Type: MAVEN_ARTIFACT
>   - Description: ActionServlet.java in Apache Struts 1 1.x through 1.3.10 
> does not properly restrict the Validator configuration, which allows remote 
> attackers to conduct cross-site scripting (XSS) attacks or cause a denial of 
> service via crafted input, a related issue to CVE-2015-0899.
>   - Suggested Fix: Replace or update the following file: 
> ActionServlet.java
>
> +----------------------+----------+
> | Library              | Severity |
> +======================+==========+
> | plexus-utils-3.0.jar | HIGH     |
> +----------------------+----------+
>   - Artifact ID: plexus-utils
>   - Group ID: org.codehaus.plexus
>   - Library Version: 3.0
>   - Library Path: 
> /var/lib/jenkins/workspace/branchlatest/?/.m2/repository/org/codehaus/plexus/plexus-utils/3.0/plexus-utils-3.0.jar
>   - Dependency: None
>   - Type: MAVEN_ARTIFACT
>   - Description: Security vulnerability found in plexus-utils before 
> 3.0.24. XML injection found in XmlWriterUtil.java.
>   - Suggested Fix: Upgrade to version 3.0.24
>
> Thanks,
>
> Prasanna.
>
>


Re: Flink(1.12.2/scala 2.11) HA with Zk in kubernetes standalone mode deployment is not working

2021-05-03 Thread Till Rohrmann
Somewhere the system retrieves the address x.x.x.x:43092 which cannot be
connected to. Can you check that this points towards a valid Flink process?
Maybe it is some leftover information in ZooKeeper from a previous run?
You could check what is written in the znodes under
/leader/resource_manager_lock.
You can also enable DEBUG logs which will tell you a bit more about what is
happening.

Cheers,
Till

On Mon, May 3, 2021 at 7:12 PM Matthias Pohl  wrote:

> Hi Bhagi,
> Thanks for reaching out to the Flink community. The error the UI is
> showing is normal during an ongoing leader election. Additionally, the
> connection refused warnings seem to be normal according to other mailing
> list threads. Are you referring to the UI error as the issue you are
> facing?
>
> What puzzles me a bit are the timestamps of your provided logs. They do
> not seem to be fully aligned. Are there more logs that might indicate other
> issues?
>
> Matthias
>
> PS: I'm gonna add the user mailing list as this issue should usually be
> posted there.
>
> On Mon, May 3, 2021 at 5:21 PM bhagi@R  wrote:
>
>> Hi Team,
>>
>> I deployed a Flink cluster as a Kubernetes standalone deployment with ZK HA,
>> but I am facing some issues. I have attached the TaskManager and JobManager logs.
>>
>> Can you please look at the logs and help me solve this issue?
>>
>> UI is throwing this error:
>>
>> {"errors":["Service temporarily unavailable due to an ongoing leader
>> election. Please refresh."]}
>>
>> jobmanager.log
>> <
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/t1598/jobmanager.log>
>>
>> taskmanager.log
>> <
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/t1598/taskmanager.log>
>>
>> screenshot-1.png
>> <
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/t1598/screenshot-1.png>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>
>


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-03 Thread Till Rohrmann
This is great news. Thanks a lot for being our release managers Dawid and
Guowei! And also thanks to everyone who has made this release possible :-)

Cheers,
Till

On Mon, May 3, 2021 at 5:46 PM vishalovercome  wrote:

> This is a very big release! Many thanks to the flink developers for their
> contributions to making Flink as good a framework as it is!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How to implement a window that emits events at regular intervals and on specific events

2021-04-30 Thread Till Rohrmann
Hi Tim,

The way session windows work is by first creating a new window for every
incoming event and then merging overlapping windows. That's why you see
that the end time of a window increases with every new incoming event. I
hope this explains what you are seeing. Apart from that, I think the
SessionTrigger looks good to me.
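
To illustrate the merging, here is a small plain-Java sketch that does not
use any Flink classes. The names SessionWindowMergeSketch, Window and
assignAndMerge are made up for this example, and treating touching windows as
overlapping is an assumption about the merge rule, not a statement about
Flink's exact implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustration only: every event gets its own window [ts, ts + gap), and
// windows that touch or overlap are merged into one covering window.
final class SessionWindowMergeSketch {

    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Assumption: windows that merely touch are merged as well.
        boolean intersects(Window other) {
            return start <= other.end && other.start <= end;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    static List<Window> assignAndMerge(long[] timestamps, long gap) {
        List<Window> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new Window(ts, ts + gap)); // one new window per event
        }
        windows.sort(Comparator.comparingLong(w -> w.start));

        List<Window> merged = new ArrayList<>();
        for (Window w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w)); // session end moves forward
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

With a gap of 2 minutes and events at 0s, 60s and 150s, the merged window
grows from [0s, 120s) to [0s, 180s) and then to [0s, 270s). An event at 400s
would start a new session, because it arrives after the gap has passed.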

Cheers,
Till

On Fri, Apr 30, 2021 at 9:27 AM Tim Josefsson 
wrote:

> Thanks! I've managed to implement a working solution with the trigger API,
> but I'm not exactly sure why it works.
> I'm doing the following:
>
> DataStream summaries = env
>     .addSource(kafkaConsumer, "playerEvents(Kafka)")
>     .name("EP - Read player events from Kafka")
>     .uid("EP - Read player events from Kafka")
>     .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
>     .name("EP - Map Json to HashMap")
>     .uid("EP - Map Json to HashMap")
>     .filter((FilterFunction) event -> !(event.get(Field.SESSION_ID) == null))
>     .name("EP - Remove any events without sessionId since they shouldn't generate sessions.")
>     .uid("EP - Remove any events without sessionId since they shouldn't generate sessions.")
>     .filter((FilterFunction) event -> event.get(Field.ACCOUNT_ID))
>     .keyBy((KeySelector) event -> (String) event.get(Field.SESSION_ID))
>     .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(2)))
>     .trigger(new SessionTrigger())
>     .aggregate(new SummaryAggregator())
>     .name("EP - Aggregate events into session summaries")
>     .uid("EP - Aggregate events into session summaries");
>
> summaries.print();
>
> With the following trigger (omitting parts of the trigger):
>
> [ ... ]
> @Override
> public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
>     ValueState firstSeen = ctx.getPartitionedState(
>             new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
>
>     // If an end event is detected, emit the content and purge
>     if (endSession.contains(element.get(Field.EVENT_TYPE))) {
>         return TriggerResult.FIRE_AND_PURGE;
>     }
>
>     if (firstSeen.value() == null) {
>         ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 1L);
>         ctx.registerProcessingTimeTimer(window.maxTimestamp());
>         firstSeen.update(true);
>     }
>     logger.info("Current window end is {} for session {}", window.maxTimestamp(), element.get(Field.SESSION_ID));
>     return TriggerResult.CONTINUE;
> }
>
> @Override
> public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>     // Emit the current result every time the processing time trigger fires
>     if (time == window.maxTimestamp()) {
>         return TriggerResult.FIRE_AND_PURGE;
>     } else {
>         ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 1L);
>         return TriggerResult.FIRE;
>     }
> }
> [ ... ]
>
> So what I'm doing is calling
> ctx.registerProcessingTimeTimer(window.maxTimestamp()); however, I only
> do this once, at the first event. But when testing, it does work as I want:
> it fires every ten seconds and then fires and purges only after no events
> have been received for 2 minutes (as specified in the SessionWindow). Is
> the processingTimeTimer being updated every time the window end time is
> increased (I noticed that Flink does this in the background every time a
> new event arrives)?
>
> I'm happy with my solution, just trying to figure out how things work!
>
> Cheers,
> Tim
>
>
> On Thu, 29 Apr 2021 at 18:42, Till Rohrmann  wrote:
>
>> If you use the Trigger API, then you don't have to do anything special
>> for fault tolerance. When using the ProcessFunction, then you should use
>> Flink's state primitives to store your state (e.g. ValueState). This will
>> automatically checkpoint the state. In case of a failure Flink will always
>> resume from the latest successfully completed checkpoint.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 29, 2021 at 12:25 PM Tim Josefsson 
>> wrote:
>>
>>> Thanks for the suggestions! I'll see if I can implement something that
>>> works!
>>> A followup question, more related to state. If I implement either the
>>> custom trigger with or the process function, how wil

Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-30 Thread Till Rohrmann
Thanks for the stack traces Lars. With them I could confirm that the
problem should be fixed with FLINK-20114 [1]. The fixes will be contained
in the 1.12.4 and 1.13.0 releases. Sorry for the inconvenience.

[1] https://issues.apache.org/jira/browse/FLINK-20114

Cheers,
Till

On Thu, Apr 29, 2021 at 8:30 PM Lars Skjærven  wrote:

> Unfortunately, I only have the truncated stack trace available (from the
> flink UI).
> L
>
>
> 2021-04-27 16:32:02
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:924)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:885)
>   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
>   at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
>   at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
>   at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
>   at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
>   at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
>   at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
>   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
>   ... 10 more
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>   at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
>   at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:69)
>   at org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.snapshotState(KafkaSourceReader.java:96)
>   at org.apache.flink.streaming.api.operators.SourceOperator.snapshotState(SourceOperator.java:288)
>   at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
>   ... 20 more
>
>
>
> --
> *From:* Till Rohrmann 
> *Sent:* Thursday, April 29, 2021 18:44
> *To:* Lars Skjærven 
> *Cc:* Becket Qin ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on
> checkpointing
>
> Thanks for the additional information Lars. Could you maybe also share the
> full stack traces of the errors you see when the checkpoint fails?
>
> @Becket Qin  is it a known issue with the new Kafka
> sources trying to checkpoint negative offsets?
>
> Cheers,
> Till
>
> On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven 
> wrote:
>
> Thanks Till.
>
> Here is how we created the KafkaSource:
>
> val sensorSource = KafkaSource.builder[SensorInput]()
>   .setBoot

Re: [ANNOUNCE] Apache Flink 1.12.3 released

2021-04-29 Thread Till Rohrmann
Great to hear. Thanks a lot for being our release manager Arvid and to
everyone who has contributed to this release!

Cheers,
Till

On Thu, Apr 29, 2021 at 4:11 PM Arvid Heise  wrote:

> Dear all,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.12.3, which is the third bugfix release for the Apache Flink 1.12
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2021/04/29/release-1.12.3.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349691
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
>
> Your friendly release manager Arvid
>


Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Till Rohrmann
Yes, you would have to use operator state for this. This would have the
limitation that rescaling would probably not work properly. Also, if the
assignment of shards to operators changes upon failure recovery, it can
happen that it generates some incorrect results (some elements from shard 1
might end up on an operator which then consumes shard 2, for example).
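
For reference, the "collect until either 30 seconds or 1000 events" logic
suggested earlier in this thread could look roughly like the following plain
Java buffer. CountOrTimeBatcher is a made-up name and the class is not tied
to any Flink interface. In a real operator, the buffer would have to be kept
in operator state, with exactly the rescaling and reassignment caveats
described above, and onTimer would be driven by a processing-time timer.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: in-memory batching by count or age, whichever comes first.
final class CountOrTimeBatcher<T> {
    private final int maxCount;
    private final long maxAgeMillis;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime;

    CountOrTimeBatcher(int maxCount, long maxAgeMillis) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
    }

    /** Adds an element; returns a full batch if the count limit is reached, else null. */
    List<T> add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis; // age is measured from the first buffered element
        }
        buffer.add(element);
        return buffer.size() >= maxCount ? drain() : null;
    }

    /** Called periodically; returns the pending batch if it is old enough, else null. */
    List<T> onTimer(long nowMillis) {
        boolean expired = !buffer.isEmpty() && nowMillis - firstElementTime >= maxAgeMillis;
        return expired ? drain() : null;
    }

    private List<T> drain() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }
}
```

For the use case in this thread, one would create it as
new CountOrTimeBatcher<>(1000, 30_000L) per shard and upload every non-null
batch to S3.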

Cheers,
Till

On Thu, Apr 29, 2021 at 2:51 PM Yegor Roganov  wrote:

> Hi Raghavendar, thank you for your reply.
>
> >
> stream.timeWindow(Time.seconds(10)).trigger(CustomTrigger.of(3)).apply(new
> TestWindow());
> What would this stream be keyed on?
>
> On Thu, Apr 29, 2021 at 11:58 AM Raghavendar T S 
> wrote:
>
>> Hi Yegor
>>
>> The trigger implementation in Flink does not support triggering by event
>> count and duration together. You can update the existing CountTrigger
>> implementation to support your functionality.
>> You can use CustomTrigger.java (a minor enhancement of CountTrigger), which
>> I have attached in this thread, as is. TestWindow is the window
>> function which lets you receive the grouped events. You can check the diff of
>> CountTrigger and CustomTrigger for a better understanding.
>>
>> *Usage*
>> stream.timeWindow(Time.seconds(10)).trigger(CustomTrigger.of(3)).apply(new
>> TestWindow());
>>
>> Thank you
>> Raghavendar T S
>> merasplugins.com
>> teknosrc.com
>>
>> On Thu, Apr 29, 2021 at 1:04 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Yegor,
>>>
>>> If you want to use Flink's keyed windowing logic, then you need to
>>> insert a keyBy/shuffle operation because Flink currently cannot simply use
>>> the partitioning of the Kinesis shards. The reason is that Flink needs to
>>> group the keys into the correct key groups in order to support rescaling of
>>> the state.
>>>
>>> What you can do, though, is to create a custom operator or use a flatMap
>>> to build your own windowing operator. This operator could then use the
>>> partitioning of the Kinesis shards by simply collecting the events until
>>> either 30 seconds or 1000 events are observed.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 28, 2021 at 11:12 AM Yegor Roganov 
>>> wrote:
>>>
>>>> Hello
>>>>
>>>> To learn Flink I'm trying to build a simple application where I want to
>>>> save events coming from Kinesis to S3.
>>>> I want to subscribe to each shard, and within each shard I want to
>>>> batch for 30 seconds, or until 1000 events are observed. These batches
>>>> should then be uploaded to S3.
>>>> What I don't understand is how to key my source on shard id, and do it
>>>> in a way that doesn't induce unnecessary shuffling.
>>>> Is this possible with Flink?
>>>>
>>>
>>
>> --
>> Raghavendar T S
>> www.teknosrc.com
>>
>


Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-29 Thread Till Rohrmann
Thanks for the additional information Lars. Could you maybe also share the
full stack traces of the errors you see when the checkpoint fails?

@Becket Qin  is it a known issue with the new Kafka
sources trying to checkpoint negative offsets?

Cheers,
Till

On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven  wrote:

> Thanks Till.
>
> Here is how we created the KafkaSource:
>
> val sensorSource = KafkaSource.builder[SensorInput]()
>   .setBootstrapServers(myConfig.kafkaBrokers)
>   .setGroupId(myConfig.kafkaGroupId)
>   .setTopics(myConfig.kafkaTopicIn)
>   .setDeserializer(new SensorInputPBDeserializationSchema)
>   .setStartingOffsets(OffsetsInitializer.earliest())
>   .build()
>
> The stream was built with
>
> env.fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")
>
> The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer
> that does SensorInputPB.parseFrom(record.value()) and finally
> collector.collect(v)
>
> From here on we're doing a keyed windowed aggregation with
> .keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new SensorEventAggregator)
>
> L
>
>
> --
> *From:* Till Rohrmann 
> *Sent:* Thursday, April 29, 2021 09:16
> *To:* Lars Skjærven ; Becket Qin <
> becket@gmail.com>
> *Cc:* user@flink.apache.org 
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on
> checkpointing
>
> Hi Lars,
>
> The KafkaSourceBuilder constructs the new KafkaSource which has not been
> fully hardened in 1.12.2. In fact, it should not be documented yet. I think
> you are running into an instability/bug in it. The new Kafka source should be
> hardened a lot more in the 1.13.0 release.
>
> Could you tell us exactly how you created the KafkaSource so that we can
> verify that this problem has been properly fixed in the 1.13.0 release? I
> am also pulling in Becket who is the original author of this connector. He
> might be able to tell you more.
>
> Cheers,
> Till
>
> On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven 
> wrote:
>
> Hello,
> I ran into some issues when using the new KafkaSourceBuilder (running
> Flink 1.12.2, scala 2.12.13, on ververica platform in a container with java
> 8). Initially it generated warnings on kafka configuration, but the job was
> able to consume and produce messages.
>
> The configuration 'client.id.prefix' was supplied but isn't a known config.
> The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
>
>
> Finally the job crashed with a checkpointing error:
>
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
> 
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
> ...
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>
>
>
> Switching back to using FlinkKafkaConsumer, the warnings on the kafka
> config disappeared, and the job was able to checkpoint successfully.
>
> I'm wondering if the warnings and the errors are connected, and if there
> is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5
> ?
>
> Thanks,
> L
>
>


Re: How to implement a window that emits events at regular intervals and on specific events

2021-04-29 Thread Till Rohrmann
If you use the Trigger API, then you don't have to do anything special for
fault tolerance. When using the ProcessFunction, then you should use
Flink's state primitives to store your state (e.g. ValueState). This will
automatically checkpoint the state. In case of a failure Flink will always
resume from the latest successfully completed checkpoint.
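
To make the state-based variant more concrete, here is a minimal sketch in plain Python. It is not Flink API code: `SessionSummarizer`, `SessionState` and `advance_time` are hypothetical names, the dictionary stands in for keyed state such as ValueState, and `advance_time` stands in for timer callbacks. It only models the logic from the original question: emit the summary every 5 minutes while keeping it, and emit and remove it on an end event.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

EMIT_INTERVAL_MS = 5 * 60 * 1000  # emit every 5 minutes, as in the question


@dataclass
class SessionState:
    """Stand-in for the per-key state that ValueState would hold in Flink."""
    event_count: int = 0
    next_timer_ms: Optional[int] = None


class SessionSummarizer:
    """Hypothetical model of a keyed process function (one entry per sessionId)."""

    def __init__(self) -> None:
        self.state: Dict[str, SessionState] = {}
        self.emitted: List[Tuple[str, int]] = []  # (session_id, event_count)

    def process_element(self, session_id: str, kind: str, ts_ms: int) -> None:
        st = self.state.setdefault(session_id, SessionState())
        st.event_count += 1
        if kind == "end":
            # End event: emit the final summary and drop the state.
            self.emitted.append((session_id, st.event_count))
            del self.state[session_id]
            return
        if st.next_timer_ms is None:
            # First event of the session: schedule the first periodic emit.
            st.next_timer_ms = ts_ms + EMIT_INTERVAL_MS

    def advance_time(self, now_ms: int) -> None:
        """Fire all due timers: emit the summary but keep the state."""
        for session_id, st in self.state.items():
            while st.next_timer_ms is not None and st.next_timer_ms <= now_ms:
                self.emitted.append((session_id, st.event_count))
                st.next_timer_ms += EMIT_INTERVAL_MS
```

In a real job, the per-session entry would live in Flink's state primitives, which is what makes it part of the checkpoint; the in-memory dictionary here has no fault tolerance of its own.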

Cheers,
Till

On Thu, Apr 29, 2021 at 12:25 PM Tim Josefsson 
wrote:

> Thanks for the suggestions! I'll see if I can implement something that
> works!
> A follow-up question, more related to state: if I implement either the
> custom trigger or the process function, how will they handle crashes
> and such? For instance, with a checkpointing interval of 10s, will
> the job recover from the last checkpoint with all the summaries as they
> were at that point, or do I have to implement specific ValueStates in both
> cases?
>
> On Thu, 29 Apr 2021 at 10:25, Till Rohrmann  wrote:
>
>> Hi Tim,
>>
>> I think you could use Flink's trigger API [1] to implement a trigger
>> which fires when it sees a certain event or after some time.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 28, 2021 at 5:25 PM Tim Josefsson 
>> wrote:
>>
>>> Hello!
>>>
>>> I'm trying to figure out how to implement a window that will emit events
>>> at regular intervals or when a specific event is encountered.
>>>
>>> A bit of background. I have a stream of events from devices that will
>>> send events to our system whenever a user watches a video. These events
>>> include a unique id (sessionId) shared by all events of the same
>>> session so I want to key my stream on this. After that I want to aggregate
>>> all the events into a session summary and this summary I want to emit every
>>> 5 minutes however I still want to keep the summary in the window (in case
>>> more events for that session arrive). However if I were to receive an end
>>> event (sent by the device once a user stops watching the video) I want to
>>> emit the summary and remove it from the window.
>>>
>>> Is it possible to do this with one of the existing windows together with
>>> a trigger or in some other way? Been trying to figure it out by reading the
>>> docs but haven't gotten any wiser so turning to the mailing list for help.
>>>
>>> Best regards,
>>> Tim
>>>
>>
>


Re: Best practice for packaging and deploying Flink jobs on K8S

2021-04-29 Thread Till Rohrmann
Alright. Then let's see what Dian recommends to do.

Cheers,
Till

On Thu, Apr 29, 2021 at 9:25 AM Sumeet Malhotra 
wrote:

> Hi Till,
>
> There’s no problem with the documented approach. I was looking if there
> were any standardized ways of organizing, packaging and deploying Python
> code on a Flink cluster.
>
> Thanks,
> Sumeet
>
>
>
> On Thu, Apr 29, 2021 at 12:37 PM Till Rohrmann 
> wrote:
>
>> Hi Sumeet,
>>
>> Is there a problem with the documented approaches on how to submit the
>> Python program (not working) or are you asking in general? Given the
>> documentation, I would assume that you can configure the requirements.txt
>> via `set_python_requirements`.
>>
>> I am also pulling in Dian who might be able to tell you more about the
>> Python deployment options.
>>
>> If you are not running on a session cluster, then you can also create a
>> K8s image which contains your user code. That way you ship your job when
>> deploying the cluster.
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 28, 2021 at 10:17 AM Sumeet Malhotra <
>> sumeet.malho...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a PyFlink job that consists of:
>>>
>>> - Multiple Python files.
>>> - Multiple 3rdparty Python dependencies, specified in a
>>>   `requirements.txt` file.
>>> - A few Java dependencies, mainly for external connectors.
>>> - An overall job config YAML file.
>>>
>>> Here's a simplified structure of the code layout.
>>>
>>> flink/
>>> ├── deps
>>> │   ├── jar
>>> │   │   ├── flink-connector-kafka_2.11-1.12.2.jar
>>> │   │   └── kafka-clients-2.4.1.jar
>>> │   └── pip
>>> │       └── requirements.txt
>>> ├── conf
>>> │   └── job.yaml
>>> └── job
>>>     ├── some_file_x.py
>>>     ├── some_file_y.py
>>>     └── main.py
>>>
>>> I'm able to execute this job running it locally i.e. invoking something
>>> like:
>>>
>>> python main.py --config 
>>>
>>> I'm loading the jars inside the Python code, using env.add_jars(...).
>>>
>>> Now, the next step is to submit this job to a Flink cluster running on
>>> K8S. I'm looking for any best practices in packaging and specifying
>>> dependencies that people tend to follow. As per the documentation here [1],
>>> various Python files, including the conf YAML, can be specified using the
>>> --pyFiles option and Java dependencies can be specified using --jarfile
>>> option.
>>>
>>> So, how can I specify 3rdparty Python package dependencies? According to
>>> another piece of documentation here [2], I should be able to specify the
>>> requirements.txt directly inside the code and submit it via the --pyFiles
>>> option. Is that right?
>>>
>>> Are there any other best practices folks use to package/submit jobs?
>>>
>>> Thanks,
>>> Sumeet
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#python-dependency-in-python-program
>>>
>>


Re: Taskmanager killed often after migrating to flink 1.12

2021-04-29 Thread Till Rohrmann
Great, thanks for the update.

On Wed, Apr 28, 2021 at 7:08 PM Sambaran  wrote:

> Hi Till,
>
> Thank you for the response, we are currently running flink with an
> increased memory usage, so far the taskmanager is working fine, we will
> check if there is any further issue and will update you.
>
> Regards
> Sambaran
>
> On Wed, Apr 28, 2021 at 5:33 PM Till Rohrmann 
> wrote:
>
>> Hi Sambaran,
>>
>> could you also share with us why the checkpoints could not be
>> discarded?
>>
>> With Flink 1.10, we introduced a stricter memory model for the
>> TaskManagers. That could be a reason why you see more TaskManagers being
>> killed by the underlying resource management system. You could maybe check
>> whether your resource management system logs that some containers/pods are
>> exceeding their memory limitations. If this is the case, then you should
>> give your Flink processes a bit more memory [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
>>
>> Cheers,
>> Till
>>
>> On Tue, Apr 27, 2021 at 6:48 PM Sambaran  wrote:
>>
>>> Hi there,
>>>
>>> We have recently migrated to Flink 1.12 from 1.7. Although the jobs are
>>> running fine, the task manager is sometimes getting killed (quite
>>> frequently, 2-3 times a day).
>>>
>>> Logs:
>>> INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
>>> RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>>
>>> While checking more logs we see flink not able to discard old checkpoints
>>> org.apache.flink.runtime.checkpoint.CheckpointsCleaner   [] - Could
>>> not discard completed checkpoint 173.
>>>
>>> We are not sure of what is the reason here, has anyone faced this before?
>>>
>>> Regards
>>> Sambaran
>>>
>>


Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-29 Thread Till Rohrmann
Hi Lars,

I think this is a duplicate message. Let's continue the discussion on your
original message.

Cheers,
Till

On Wed, Apr 28, 2021 at 8:50 PM Lars Skjærven  wrote:

> Hello,
> I ran into an issue when using the new KafkaSourceBuilder (running Flink
> 1.12.2, scala 2.12.13, on ververica platform in a container with java 8).
> Initially it generated warnings on kafka configuration, but the job was
> able to consume and produce messages.
>
> The configuration 'client.id.prefix' was supplied but isn't a known config.
> The configuration 'partition.discovery.interval.ms' was supplied but isn't a 
> known config.
>
>
> Finally the job crashed with a checkpointing error:
>
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: 
> progress source (4/6)#9.
> 
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
> complete snapshot 10 for operator Source: progress source (4/6)#9. Failure 
> reason: Checkpoint was declined.
> ...
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>
>
>
> Switching back to using FlinkKafkaConsumer, the warnings on the kafka
> config disappeared, and the job was able to checkpoint successfully.
>
> I'm wondering if the warnings and the errors are related, and if there is
> a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5 ?
>
> Thanks,
> L
>


Re: Queryable State unavailable after Kubernetes HA State cleanup

2021-04-29 Thread Till Rohrmann
Hi Sandeep,

I don't fully understand the problematic scenario yet. What exactly is the
HA state maintained by Kubernetes in S3?

Queryable state works by asking for the current state of an operator. If
you use asQueryableState, then you create a reducing state which appends
all stream elements. This should then be stored in the configured state
backend (in your case probably RocksDB). For checkpoints, this state is
stored periodically on S3.

How is the query operation failing? Did you check the cluster logs whether
they contain some suspicious things?

Cheers,
Till

On Wed, Apr 28, 2021 at 5:26 PM Sandeep khanzode 
wrote:

> Hello,
>
> Stuck at this time. Any help will be appreciated.
>
>
> I am able to create a queryable state and also query the state. Everything
> works correctly.
>
> KeyedStream, Key> stream = sourceStream.keyBy(t2 -> t2.f0);
> stream.asQueryableState("queryableVO");
>
>
> I deploy this on a Kubernetes cluster with Flink standalone-job and
> KubernetesHAFactory.
>
> There are two states created. One is the operator and keyed state which is
> stored in a RocksDB Backend in S3.
>
> The other is the HA state maintained by Kubernetes in S3.
>
> If anything changes in the job main class (like removing operators etc.),
> the upgrade does not work seamlessly and I have to delete the HA state from
> S3.
>
> If I delete the S3 state for HA, the queryable state becomes unusable i.e.
> I cannot query from the state anymore. Interestingly, the other operator
> and keyed states in RocksDB backend are still accessible! Just not the
> queryable state.
>
> When I check the UI, I see the checkpointed state for the queryable stream
> has a data size of approx ~50-60KB. But I still cannot query it.
>
>
> Thanks,
> Sandeep
>
>


Re: How to implement a window that emits events at regular intervals and on specific events

2021-04-29 Thread Till Rohrmann
Hi Tim,

I think you could use Flink's trigger API [1] to implement a trigger which
fires when it sees a certain event or after some time.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers

Cheers,
Till

On Wed, Apr 28, 2021 at 5:25 PM Tim Josefsson 
wrote:

> Hello!
>
> I'm trying to figure out how to implement a window that will emit events
> at regular intervals or when a specific event is encountered.
>
> A bit of background. I have a stream of events from devices that will send
> events to our system whenever a user watches a video. These events include
> a unique id (sessionId) shared by all events of the same session so I
> want to key my stream on this. After that I want to aggregate all the
> events into a session summary and this summary I want to emit every 5
> minutes however I still want to keep the summary in the window (in case
> more events for that session arrive). However if I were to receive an end
> event (sent by the device once a user stops watching the video) I want to
> emit the summary and remove it from the window.
>
> Is it possible to do this with one of the existing windows together with a
> trigger or in some other way? Been trying to figure it out by reading the
> docs but haven't gotten any wiser so turning to the mailing list for help.
>
> Best regards,
> Tim
>


Re: Flink Resuming From Checkpoint With "-s" FAILURE

2021-04-29 Thread Till Rohrmann
Hi Zachary,

How did you configure the Kafka connector to commit the offsets
(periodically or on checkpoint)? One explanation for the graphs you showed is
that you enabled periodic committing of the offsets. If such an automatic
commit happens between two checkpoints and you later fall back to the
earlier checkpoint, then the next periodic commit can show a lower offset
than the previous one, i.e. the committed offset appears to drop. Note that
Flink does not rely on the offsets committed to the Kafka broker for fault
tolerance. It stores the actual offsets in its internal state.
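
The effect described above can be reproduced with a toy model in plain Python. This is only a sketch under simplifying assumptions: `ModelConsumer` and its methods are hypothetical names and not part of the Flink Kafka connector. It models a single partition whose position lives in the job's own state, plus an independent periodic commit to the broker.

```python
class ModelConsumer:
    """Toy model of one Kafka partition reader; not the Flink connector."""

    def __init__(self) -> None:
        self.offset = 0            # position kept in the job's own state
        self.broker_committed = 0  # what a committed-offsets metric would show

    def read(self, n: int) -> None:
        self.offset += n

    def checkpoint(self) -> int:
        # The checkpoint stores the reader's own offset.
        return self.offset

    def periodic_commit(self) -> None:
        # Informational commit to the broker, independent of checkpoints.
        self.broker_committed = self.offset

    def restore(self, checkpointed_offset: int) -> None:
        # Recovery uses the checkpointed offset, not the broker's value.
        self.offset = checkpointed_offset


c = ModelConsumer()
c.read(100)
snapshot = c.checkpoint()  # checkpoint taken at offset 100
c.read(50)
c.periodic_commit()        # broker now shows 150
c.restore(snapshot)        # fall back to the earlier checkpoint
c.periodic_commit()        # broker-side value drops back to 100
```

In this model the committed offset goes from 150 back to 100 although nothing is wrong with the restored job itself, which matches the kind of drop a committed-offsets graph could show.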

In order to better understand the scenario, let me try to summarize it.
Periodically you restart your infrastructure and then resume the Flink job
from the latest checkpoint. You did this on the 19th of April. Then on the
27th of April you created a savepoint from the job that you had restarted on
the 19th and that had been running fine since then. You then submitted a new
job resuming from this savepoint, and all of a sudden this new job started
to consume data from Kafka starting from the 19th of April. Is this correct?
If it happened as described, then the Flink job seems not to have made
a lot of progress since you restarted it. Without the logs it is really
hard to tell what could be the cause.

Cheers,
Till

On Wed, Apr 28, 2021 at 4:36 PM Zachary Manno 
wrote:

> Hello,
> I am running Flink version 1.10 on Amazon EMR. We use RocksDB/S3 for
> state. I have "Persist Checkpoints Externally" enabled. Periodically I must
> tear down the current infrastructure and bring it back up. To do this, I
> terminate the EMR, bring up a fresh EMR cluster, and then I resume the
> Flink job from the latest checkpoint path in S3 using the "-s" method here:
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/savepoints.html#resuming-from-savepoints
>
> I last did this operation on April 19. Then, on April 27 I deployed a new
> version of the code only, using savepointing. This caused a production
> incident because it turned out that on April 19th one of the Kafka
> partition offsets was somehow not committed correctly while resuming
> from checkpoint. When the new code was deployed on the 27th a backfill of
> Kafka messages came in from the 19th to the 27th which caused the issue.
>
> I am attaching screenshots of Datadog metrics for the Kafka consumer
> metrics Flink provides. One graph is:
>
> ".KafkaConsumer.topic.{topic_name}.partition.{partition_number}.committedOffsets"
>
> And the next is:
>
> "KafkaConsumer.topic.{topic_name}.partition.{partition_number}.currentOffsets"
>
>
> The light blue line is partition 3 and that is the one that caused the
> issue. Does anyone have any insight into what could have happened? And what
> I can do to prevent this in the future? Unfortunately since the EMR was
> terminated I cannot provide the full logs. I am able to search for keywords
> or get sections since we have external Splunk logging but cannot get full
> files.
>
> Thanks for any help!!
>
> [image: Committed_Offsets.png]
> [image: Current_Offsets.png]
>


Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Till Rohrmann
Hi Yegor,

If you want to use Flink's keyed windowing logic, then you need to insert a
keyBy/shuffle operation because Flink currently cannot simply use the
partitioning of the Kinesis shards. The reason is that Flink needs to group
the keys into the correct key groups in order to support rescaling of the
state.
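
As a rough illustration of why key groups matter for rescaling, here is a simplified sketch in plain Python. The CRC32 hash is only a stand-in (Flink uses its own hash function), and the names `key_group_for` and `subtask_for` are hypothetical. The point it shows is that a key always maps to the same key group for a fixed maximum parallelism, while the subtask owning that key group depends on the current parallelism.

```python
import zlib


def key_group_for(key: str, max_parallelism: int) -> int:
    """Map a key to a key group. CRC32 is a stand-in hash for illustration."""
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def subtask_for(key_group: int, max_parallelism: int, parallelism: int) -> int:
    """Assign a key group to a subtask; each subtask owns a contiguous range."""
    return key_group * parallelism // max_parallelism


MAX_PARALLELISM = 128
group = key_group_for("shard-0001", MAX_PARALLELISM)

# The key group stays the same when the job is rescaled from 2 to 4 subtasks;
# only the owner of the key group changes.
owner_before = subtask_for(group, MAX_PARALLELISM, 2)
owner_after = subtask_for(group, MAX_PARALLELISM, 4)
```

Because state is organized per key group, rescaling only has to hand whole key-group ranges to different subtasks. An external partitioning such as Kinesis shards does not carry this structure, which is why the keyBy is needed for keyed windows.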

What you can do, though, is to create a custom operator or use a flatMap to
build your own windowing operator. This operator could then use the
partitioning of the Kinesis shards by simply collecting the events until
either 30 seconds or 1000 events are observed.
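
The collecting logic of such a hand-written windowing operator could look roughly like the following plain-Python sketch. It is not Flink code: `CountOrTimeBatcher`, `on_event` and `on_time` are hypothetical names, the clock is passed in explicitly, and the `sink` callable stands in for the S3 upload.

```python
from typing import Callable, List, Optional


class CountOrTimeBatcher:
    """Buffers events and flushes after max_events or max_age_s seconds."""

    def __init__(self, max_events: int, max_age_s: float,
                 sink: Callable[[List[str]], None]) -> None:
        self.max_events = max_events
        self.max_age_s = max_age_s
        self.sink = sink
        self.buffer: List[str] = []
        self.first_ts: Optional[float] = None  # arrival time of oldest event

    def on_event(self, event: str, now_s: float) -> None:
        # Flush first if the oldest buffered event is already too old.
        self.on_time(now_s)
        if not self.buffer:
            self.first_ts = now_s
        self.buffer.append(event)
        if len(self.buffer) >= self.max_events:
            self.flush()

    def on_time(self, now_s: float) -> None:
        # Called periodically (e.g. from a timer) to enforce the time limit.
        if self.buffer and now_s - self.first_ts >= self.max_age_s:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.sink(self.buffer)
            self.buffer = []
            self.first_ts = None
```

One instance per shard would be created with `CountOrTimeBatcher(1000, 30.0, upload)`, where `upload` is whatever writes a batch to S3. Note that the buffer in this sketch is plain memory; inside a Flink operator it would presumably have to be kept in checkpointed state so that buffered events survive a failure.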

Cheers,
Till

On Wed, Apr 28, 2021 at 11:12 AM Yegor Roganov  wrote:

> Hello
>
> To learn Flink I'm trying to build a simple application where I want to
> save events coming from Kinesis to S3.
> I want to subscribe to each shard, and within each shard I want to batch
> for 30 seconds, or until 1000 events are observed. These batches should
> then be uploaded to S3.
> What I don't understand is how to key my source on shard id, and do it in
> a way that doesn't induce unnecessary shuffling.
> Is this possible with Flink?
>


Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-29 Thread Till Rohrmann
Hi Lars,

The KafkaSourceBuilder constructs the new KafkaSource which has not been
fully hardened in 1.12.2. In fact, it should not be documented yet. I think
you are running into an instability or bug in it. The new Kafka source should be
hardened a lot more in the 1.13.0 release.

Could you tell us exactly how you created the KafkaSource so that we can
verify that this problem has been properly fixed in the 1.13.0 release? I
am also pulling in Becket who is the original author of this connector. He
might be able to tell you more.

Cheers,
Till

On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven  wrote:

> Hello,
> I ran into some issues when using the new KafkaSourceBuilder (running
> Flink 1.12.2, scala 2.12.13, on ververica platform in a container with java
> 8). Initially it generated warnings on kafka configuration, but the job was
> able to consume and produce messages.
>
> The configuration 'client.id.prefix' was supplied but isn't a known config.   
> The configuration 'partition.discovery.interval.ms' was supplied but isn't a 
> known config.
>
>
> Finally the job crashed with a checkpointing error:
>
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: 
> progress source (4/6)#9.
> 
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
> complete snapshot 10 for operator Source: progress source (4/6)#9. Failure 
> reason: Checkpoint was declined.
> ...
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>
>
>
> Switching back to using FlinkKafkaConsumer, the warnings on the kafka
> config disappeared, and the job was able to checkpoint successfully.
>
> I'm wondering if the warnings and the errors are connected, and if there
> is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5
> ?
>
> Thanks,
> L
>
>


Re: Best practice for packaging and deploying Flink jobs on K8S

2021-04-29 Thread Till Rohrmann
Hi Sumeet,

Is there a problem with the documented approaches on how to submit the
Python program (not working) or are you asking in general? Given the
documentation, I would assume that you can configure the requirements.txt
via `set_python_requirements`.

I am also pulling in Dian who might be able to tell you more about the
Python deployment options.

If you are not running on a session cluster, then you can also create a K8s
image which contains your user code. That way you ship your job when
deploying the cluster.

Cheers,
Till

On Wed, Apr 28, 2021 at 10:17 AM Sumeet Malhotra 
wrote:

> Hi,
>
> I have a PyFlink job that consists of:
>
> - Multiple Python files.
> - Multiple 3rdparty Python dependencies, specified in a
>   `requirements.txt` file.
> - A few Java dependencies, mainly for external connectors.
> - An overall job config YAML file.
>
> Here's a simplified structure of the code layout.
>
> flink/
> ├── deps
> │   ├── jar
> │   │   ├── flink-connector-kafka_2.11-1.12.2.jar
> │   │   └── kafka-clients-2.4.1.jar
> │   └── pip
> │       └── requirements.txt
> ├── conf
> │   └── job.yaml
> └── job
>     ├── some_file_x.py
>     ├── some_file_y.py
>     └── main.py
>
> I'm able to execute this job running it locally i.e. invoking something
> like:
>
> python main.py --config 
>
> I'm loading the jars inside the Python code, using env.add_jars(...).
>
> Now, the next step is to submit this job to a Flink cluster running on
> K8S. I'm looking for any best practices in packaging and specifying
> dependencies that people tend to follow. As per the documentation here [1],
> various Python files, including the conf YAML, can be specified using the
> --pyFiles option and Java dependencies can be specified using --jarfile
> option.
>
> So, how can I specify 3rdparty Python package dependencies? According to
> another piece of documentation here [2], I should be able to specify the
> requirements.txt directly inside the code and submit it via the --pyFiles
> option. Is that right?
>
> Are there any other best practices folks use to package/submit jobs?
>
> Thanks,
> Sumeet
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html#python-dependency-in-python-program
>


Re: Exception handling

2021-04-28 Thread Till Rohrmann
Hi Jacob,

one of the contracts Flink has is that if a UDF throws an exception then
this means that it has failed and that it needs recovery. Hence, it is the
responsibility of the user to make sure that tolerable exceptions do not
bubble up. If you have dirty input data then it might make sense to put a
sanitization operator directly after the sources which filters out invalid
data so that downstream operators can assume that the data is correct.

For the question about Map you can either convert it to a FlatMap operation
which can output arbitrarily many elements (also zero) or you introduce
something like an Optional type which can represent a null value and a
non-null value. This is something you can do in the user code.
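
The FlatMap-style variant can be sketched in plain Python as follows. This is not Flink API code; `safe_flat_map` is a hypothetical helper, and which exception types count as tolerable is an assumption made for the example. The wrapper emits zero elements for a record that fails with a tolerable exception and one element otherwise, so nothing bubbles up and no null values travel downstream.

```python
from typing import Callable, Iterable, Iterator, List, Optional, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def safe_flat_map(
    fn: Callable[[T], R],
    records: Iterable[T],
    rejected: Optional[List[Tuple[T, str]]] = None,
) -> Iterator[R]:
    """Apply fn to each record; skip records that raise a tolerable error."""
    for record in records:
        try:
            result = fn(record)
        except (ValueError, KeyError) as exc:
            # Tolerable failure: remember the bad record (optional), emit nothing.
            if rejected is not None:
                rejected.append((record, repr(exc)))
            continue
        yield result


rejected: List[Tuple[str, str]] = []
parsed = list(safe_flat_map(int, ["1", "x", "3"], rejected))
```

Catching only specific exception types keeps genuinely unexpected errors visible, in which case the job would still fail and recover as described above. The `rejected` list is one possible way to keep track of what was dropped.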

I hope this helps a bit.

Cheers,
Till

On Tue, Apr 27, 2021 at 7:30 PM Jacob Sevart  wrote:

> How do we get uncaught exceptions in operators to skip the problematic
> messages, rather than crash the entire job? Is there an easier or less
> mistake-prone way to do this than wrapping every operator method in
> try/catch?
>
> And what to do about Map? Since it has to return something, we're either
> returning null and then catching it with a *.filter(Objects.nonNull)* in
> the next operator, or converting it to FlatMap. FlatMap conversion is
> annoying, because then we need to mock the Collector for testing.
>
> Obviously it would be best to sanitize inputs so that exceptions don't
> occur, but we've recently encountered some setbacks in the game of
> whack-a-mole with pathological messages, and are hoping to mitigate the
> losses when these do occur.
>
> Jacob
>


Re: Taskmanager killed often after migrating to flink 1.12

2021-04-28 Thread Till Rohrmann
Hi Sambaran,

could you also share with us why the checkpoints could not be
discarded?

With Flink 1.10, we introduced a stricter memory model for the
TaskManagers. That could be a reason why you see more TaskManagers being
killed by the underlying resource management system. You could maybe check
whether your resource management system logs that some containers/pods are
exceeding their memory limitations. If this is the case, then you should
give your Flink processes a bit more memory [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html

Cheers,
Till

On Tue, Apr 27, 2021 at 6:48 PM Sambaran  wrote:

> Hi there,
>
> We have recently migrated to Flink 1.12 from 1.7. Although the jobs are
> running fine, the task manager is sometimes getting killed (quite
> frequently, 2-3 times a day).
>
> Logs:
> INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
> RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>
> While checking more logs we see flink not able to discard old checkpoints
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner   [] - Could
> not discard completed checkpoint 173.
>
> We are not sure of what is the reason here, has anyone faced this before?
>
> Regards
> Sambaran
>


Re: Task Local Recovery with mountable disks in the cloud

2021-04-26 Thread Till Rohrmann
Hi Sonam,

sorry for the late reply. We were a bit caught in the midst of the feature
freeze for the next major Flink release.

In general, I think it is a very good idea to disaggregate the local state
storage to make it reusable across TaskManager failures. However, it is
also not trivial to do.

Maybe let me first describe how the current task local recovery works and
then see how we could improve it:

Flink creates for every slot allocation an AllocationID. The AllocationID
associates a slot on a TaskExecutor with a job and is also used for scoping
the lifetime of a slot wrt a job (theoretically, one and the same slot
could be used to fulfill multiple slot requests of the same job if the slot
allocation is freed in between). Note that the AllocationID is a random ID
and, thus, changes whenever the ResourceManager allocates a new slot on a
TaskExecutor for a job.

Task local recovery is effectively a state cache which is associated with
an AllocationID. So for every checkpoint and every task, a TaskExecutor
copies the state data and stores them in the task local recovery cache. The
cache is maintained as long as the slot allocation is valid (e.g. the slot
has not been freed by the JobMaster and the slot has not timed out). This
makes the lifecycle management of the state data quite easy and makes sure
that a process does not clutter local disks. On the JobMaster side, Flink
remembers for every Execution, where it is deployed (it remembers the
AllocationID). If a failover happens, then Flink tries to re-deploy the
Executions into the slots they were running in before by matching the
AllocationIDs.
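
The matching step on failover can be pictured with the following plain-Python sketch. It is a strongly simplified model and not the actual scheduler code: `assign_slots` is a hypothetical function, and executions and AllocationIDs are represented by plain strings.

```python
from typing import Dict, List


def assign_slots(previous: Dict[str, str], available: List[str]) -> Dict[str, str]:
    """Map executions to slots, preferring the slot they ran in before.

    previous:  execution name -> AllocationID it was deployed in before failure
    available: AllocationIDs of the slots that are currently offered
    """
    free = list(available)
    assignment: Dict[str, str] = {}
    # First pass: executions whose previous slot is still available can
    # reuse the local state cached under that AllocationID.
    for execution, allocation_id in previous.items():
        if allocation_id in free:
            assignment[execution] = allocation_id
            free.remove(allocation_id)
    # Second pass: everything else takes any free slot and has to
    # restore its state remotely.
    for execution in previous:
        if execution not in assignment and free:
            assignment[execution] = free.pop(0)
    return assignment


result = assign_slots({"map-0": "A", "map-1": "B"}, ["C", "A"])
```

Here `map-0` gets its old slot `A` back, while `map-1` lands in the new slot `C` because `B` is gone. Since a restarted TaskExecutor receives fresh random AllocationIDs, the first pass never matches for it, which is the limitation discussed in this thread.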

The reason why we scoped the state cache to an AllocationID was for
simplicity and because we couldn't guarantee that a failed TaskExecutor X
will be restarted on the same machine again and thereby having access to
the same local disk as before. That's also why Flink deletes the cache
directory when a slot is freed or when the TaskExecutor is shut down
gracefully.

With persistent volumes this changes and we can make the TaskExecutors
"stateful" in the sense that we can reuse an already occupied cache. One
rather simple idea could be to also persist the slot allocations of a
TaskExecutor (which slot is allocated and what is its assigned
AllocationID). This information could be used to re-initialize the
TaskExecutor upon restart. That way, it does not have to register at the
ResourceManager and wait for new slot allocations but could directly start
offering its slots to the jobs it remembered. If the TaskExecutor cannot
find the JobMasters for the respective jobs, it would then free the slots
and clear the cache accordingly.

This could work as long as the ResourceManager does not start new
TaskExecutors whose slots could be used to recover the job. If this is a
problem, then one needs to answer the question of how long to wait for the
old TaskExecutors to come back and reuse their local state vs. quickly
starting a fresh instance that has to restore its state remotely.

An alternative solution proposal which is probably more powerful albeit
also more complex would be to make the cache information explicit when
registering the TaskExecutor at the ResourceManager and later offering
slots to the JobMaster. For example, the TaskExecutor could tell the
ResourceManager which states it has locally cached (it probably needs to
contain key group ranges for every stored state) and this information could
be used to decide from which TaskExecutor to allocate slots for a job.
Similarly on the JobMaster side we could use this information to calculate
the best mapping between Executions and slots. I think that mechanism could
better deal with rescaling events where there is no perfect match between
Executions and slots because of the changed key group ranges.

So to answer your question: There is currently no way to preserve
AllocationIDs across restarts. However, we could use the persistent volume
to store this information so that we can restore it on restart of a
TaskExecutor. This could enable task local state recovery for cases where
we lose a TaskExecutor and restart it with the same persistent volume.

Cheers,
Till

On Wed, Apr 21, 2021 at 7:26 PM Stephan Ewen  wrote:

> /cc dev@flink
>
>
> On Tue, Apr 20, 2021 at 1:29 AM Sonam Mandal 
> wrote:
>
> > Hello,
> >
> > We've been experimenting with Task-local recovery using Kubernetes. We
> > have a way to specify mounting the same disk across Task Manager
> > restarts/deletions for when the pods get recreated. In this scenario, we
> > noticed that task local recovery does not kick in (as expected based on
> > the documentation).
> >
> > We did try to comment out the code on the shutdown path which cleaned up
> > the task local directories before the pod went down / was restarted. We
> > noticed that remote recovery kicked in even though the task local state
> > was present. I noticed that the slot IDs changed, and was wondering if
> > this is the main reason that the t

Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Till Rohrmann
I think you are right that the `GenericCLI` should be the first choice.
Off the top of my head, I do not remember why FlinkYarnSessionCli is still
used. Maybe it is in order to support some Yarn-specific CLI option
parsing. I assume it is either an oversight or some parsing has not been
completely migrated to the GenericCLI.

Cheers,
Till

On Mon, Apr 26, 2021 at 11:07 AM Yangze Guo  wrote:

> Hi, Till,
>
> I agree that we need to resolve the issue by overriding the
> configuration before selecting the CustomCommandLines. However, IIUC,
> after FLINK-15852 the GenericCLI should always be the first choice.
> Could you help me to understand why the FlinkYarnSessionCli can be
> activated?
>
>
> Best,
> Yangze Guo
>
> On Mon, Apr 26, 2021 at 4:48 PM Till Rohrmann 
> wrote:
> >
> > Hi Tony,
> >
> > I think you are right that Flink's CLI does not behave very consistently
> > at the moment. Case 2 should definitely work because `-t yarn-application`
> > should overwrite what is defined in the Flink configuration. The problem
> > seems to be that we don't resolve the configuration wrt the specified
> > command line options before calling into `CustomCommandLine.isActive`. If
> > we first parsed the command line configuration options which can overwrite
> > flink-conf.yaml options and then replaced them, then the custom command
> > lines (assuming that they use the Configuration as the ground truth) should
> > behave consistently.
> >
> > For your questions:
> >
> > 1. I am not 100% sure. I think the FlinkYarnSessionCli wasn't used on
> purpose when introducing the yarn application mode.
> > 2. See answer 1.
> >
> > I think it is a good idea to extend the description of the config option
> `execution.target`. Do you want to create a ticket and a PR for it?
> >
> > Cheers,
> > Till
> >
> > On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:
> >>
> >> Hi, Tony.
> >>
> >> What is the version of your flink-dist? AFAIK, this issue should be
> >> addressed in FLINK-15852 [1]. Could you share the client log of case 2
> >> (setting the log level to DEBUG would be better)?
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-15852
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei 
> wrote:
> >> >
> >> > Hi Experts,
> >> >
> >> > I recently tried to run yarn-application mode on my yarn cluster, and
> I had a problem related to configuring `execution.target`.
> >> > After reading the source code and doing some experiments, I found
> that there is some room for improvement in `FlinkYarnSessionCli` or
> `AbstractYarnCli`.
> >> >
> >> > My experiments are:
> >> >
> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
> run `flink run-application -t yarn-application`: run job successfully.
> >> >
> >> > `FlinkYarnSessionCli` is not active
> >> > `GenericCLI` is active
> >> >
> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
> `flink run-application -t yarn-application`: run job failed
> >> >
> >> > failed due to `ClusterDeploymentException` [1]
> >> > `FlinkYarnSessionCli` is active
> >> >
> >> > setting `execution.target: yarn-application` in flink-conf.yaml and
> run `flink run -t yarn-per-job`: run job successfully.
> >> >
> >> > `FlinkYarnSessionCli` is not active
> >> > `GenericCLI` is active
> >> >
> >> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
> `flink run -t yarn-per-job`: run job successfully.
> >> >
> >> > `FlinkYarnSessionCli` is active
> >> >
> >> > From `AbstractYarnCli#isActive` [2] and
> `FlinkYarnSessionCli#isActive` [3], `FlinkYarnSessionCli` will be active
> when `execution.target` is specified with `yarn-per-job` or `yarn-session`.
> >> >
> >> > According to the flink official document [4], I thought the 2nd
> experiment should also work well, but it didn't.
> >> >>
> >> >> The --target will overwrite the execution.target specified in the
> config/flink-config.yaml.
> >> >
> >> >
> >> > The root cause is that `FlinkYarnSessionCli` only overwrites the
> `execution.target` with `yarn-session` or `yarn-per-job` [5], but not
> `yarn-application`.
> >> > So, my question is
> >> >
> >> > should we use `FlinkYarnSessionCli` in

Re: when should `FlinkYarnSessionCli` be included for parsing CLI arguments?

2021-04-26 Thread Till Rohrmann
Hi Tony,

I think you are right that Flink's CLI does not behave very consistently at
the moment. Case 2 should definitely work because `-t yarn-application`
should overwrite what is defined in the Flink configuration. The problem
seems to be that we don't resolve the configuration with respect to the
specified command line options before calling into
`CustomCommandLine.isActive`. If we first parsed the command line
configuration options, which can overwrite flink-conf.yaml options, and
applied them to the configuration, then the custom command lines (assuming
that they use the Configuration as the ground truth) should behave
consistently.
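
The ordering issue described above can be illustrated with a small toy model
in Python. This is only a sketch: `effective_config` and
`is_yarn_session_cli_active` are hypothetical names that mimic the behaviour
Tony reported (the Yarn session CLI activates for `yarn-session` and
`yarn-per-job`); they are not Flink APIs.

```python
# Toy model of the resolution order discussed in this thread.
# None of these names are Flink APIs; they only mimic the reported behaviour.

YARN_SESSION_TARGETS = {"yarn-session", "yarn-per-job"}


def effective_config(flink_conf: dict, cli_overrides: dict) -> dict:
    """Apply command line overrides on top of the flink-conf.yaml values."""
    merged = dict(flink_conf)
    merged.update(cli_overrides)
    return merged


def is_yarn_session_cli_active(config: dict) -> bool:
    """Stand-in for an isActive check that treats the configuration as ground truth."""
    return config.get("execution.target") in YARN_SESSION_TARGETS


flink_conf = {"execution.target": "yarn-per-job"}
cli_overrides = {"execution.target": "yarn-application"}  # from `-t yarn-application`

# Checking the unresolved file configuration reproduces case 2:
# the Yarn session CLI considers itself active.
active_before = is_yarn_session_cli_active(flink_conf)

# Resolving the command line overrides first changes the outcome.
active_after = is_yarn_session_cli_active(effective_config(flink_conf, cli_overrides))

print(active_before, active_after)
```

In this model the second check only differs because the override is merged
before the activation check runs, which is the change proposed above.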

For your questions:

1. I am not 100% sure. I think the FlinkYarnSessionCli was deliberately not
used when introducing the yarn application mode.
2. See answer 1.

I think it is a good idea to extend the description of the config option
`execution.target`. Do you want to create a ticket and a PR for it?

Cheers,
Till

On Mon, Apr 26, 2021 at 8:37 AM Yangze Guo  wrote:

> Hi, Tony.
>
> What is the version of your flink-dist? AFAIK, this issue should be
> addressed in FLINK-15852 [1]. Could you share the client log of case 2
> (setting the log level to DEBUG would be better)?
>
> [1] https://issues.apache.org/jira/browse/FLINK-15852
>
> Best,
> Yangze Guo
>
> On Sun, Apr 25, 2021 at 11:33 AM Tony Wei  wrote:
> >
> > Hi Experts,
> >
> > I recently tried to run yarn-application mode on my yarn cluster, and I
> had a problem related to configuring `execution.target`.
> > After reading the source code and doing some experiments, I found that
> there is some room for improvement in `FlinkYarnSessionCli` or
> `AbstractYarnCli`.
> >
> > My experiments are:
> >
> > setting `execution.target: yarn-application` in flink-conf.yaml and run
> `flink run-application -t yarn-application`: run job successfully.
> >
> > `FlinkYarnSessionCli` is not active
> > `GenericCLI` is active
> >
> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
> `flink run-application -t yarn-application`: run job failed
> >
> > failed due to `ClusterDeploymentException` [1]
> > `FlinkYarnSessionCli` is active
> >
> > setting `execution.target: yarn-application` in flink-conf.yaml and run
> `flink run -t yarn-per-job`: run job successfully.
> >
> > `FlinkYarnSessionCli` is not active
> > `GenericCLI` is active
> >
> > setting `execution.target: yarn-per-job` in flink-conf.yaml and run
> `flink run -t yarn-per-job`: run job successfully.
> >
> > `FlinkYarnSessionCli` is active
> >
> > From `AbstractYarnCli#isActive` [2] and `FlinkYarnSessionCli#isActive`
> [3], `FlinkYarnSessionCli` will be active when `execution.target` is
> specified with `yarn-per-job` or `yarn-session`.
> >
> > According to the flink official document [4], I thought the 2nd
> experiment should also work well, but it didn't.
> >>
> >> The --target will overwrite the execution.target specified in the
> config/flink-config.yaml.
> >
> >
> > The root cause is that `FlinkYarnSessionCli` only overwrites the
> `execution.target` with `yarn-session` or `yarn-per-job` [5], but not
> `yarn-application`.
> > So, my question is
> >
> > should we use `FlinkYarnSessionCli` in case 2?
> > if we should, how we can improve `FlinkYarnSessionCli` so that we can
> overwrite `execution.target` via `--target`?
> >
> > and one more improvement, the config description for `execution.target`
> [6] should include `yarn-application` as well.
> >
> > [1]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L439-L447
> > [2]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/AbstractYarnCli.java#L54-L66
> > [3]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L373-L377
> > [4]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#selecting-deployment-targets
> > [5]
> https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L397-L413
> > [6]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java#L41-L46
> >
> > best regards,
> >
>


Re: How to know if task-local recovery kicked in for some nodes?

2021-04-12 Thread Till Rohrmann
Hi Sonam,

The state size at which you see a difference probably depends a bit on your
infrastructure. Assuming you have a 1 GBps network connection and local SSDs,
I guess you should see a difference if your local state size is > 1 GB.
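
As a rough back-of-the-envelope check, the transfer time for remote recovery
can be estimated as below. This is only a sketch: it reads "1 GBps" as
1 gigabit per second, which is an assumption, and it ignores remote storage
latency and the time to rebuild the state backend. The function name
`remote_restore_seconds` is made up for illustration.

```python
GB = 10**9  # bytes, decimal gigabyte


def remote_restore_seconds(state_bytes: float, bandwidth_bits_per_s: float) -> float:
    """Lower bound on the time needed to fetch the state over the network."""
    return state_bytes * 8 / bandwidth_bits_per_s


# Assumed link speed: 1 gigabit per second.
link = 1e9

one_gb = remote_restore_seconds(1 * GB, link)
ten_gb = remote_restore_seconds(10 * GB, link)
print(one_gb, ten_gb)
```

Under these assumptions a few hundred megabytes of state would transfer in a
couple of seconds, which is hard to tell apart from the rest of the restart
time.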

Cheers,
Till

On Wed, Apr 7, 2021 at 1:46 PM Sonam Mandal  wrote:

> Hi Till and Dhanesh,
>
> Thanks for the insights into both on how to check that this kicks in and
> on the expected behavior. My understanding too was that if multiple TMs are
> used for the job, any TMs that don’t go down can take advantage of local
> recovery.
>
> Do you have any insights on a good minimum state size we should experiment
> with to check recovery time differences between the two modes?
>
> Thanks,
> Sonam
> --
> *From:* dhanesh arole 
> *Sent:* Wednesday, April 7, 2021 3:43:11 AM
> *To:* Till Rohrmann 
> *Cc:* Sonam Mandal ; Tzu-Li (Gordon) Tai <
> tzuli...@apache.org>; user@flink.apache.org 
> *Subject:* Re: How to know if task-local recovery kicked in for some
> nodes?
>
> Hi Till,
>
> You are right. To give you more context about our setup, we are running 1
> task slot per task manager and total number of task manager replicas equal
> to job parallelism. The issue actually exacerbates during rolling
> deployment of task managers as each TM goes offline and comes back online
> again after some time. So during bouncing of every TM pod somehow task
> allocation changes and finally job stabilises once all TMs are restarted.
> Maybe a proper blue green setup would allow us to make the best use of
> local recovery during restart of TMs. But during intermittent failures of
> one of the TMs local recovery works as expected on the other healthy TM
> instances ( I.e it does not download from remote ).
>
> On Wed, 7 Apr 2021 at 10:35 Till Rohrmann  wrote:
>
> Hi Dhanesh,
>
> if some of the previously used TMs are still available, then Flink should
> try to redeploy tasks onto them also in case of a global failover. Only
> those tasks which have been executed on the lost TaskManager will need new
> slots and have to download the state from the remote storage.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 5:35 PM dhanesh arole 
> wrote:
>
> Hi Sonam,
>
> We have a similar setup. What I have observed is, when the task manager
> pod gets killed and restarts again ( i.e. the entire task manager process
> restarts ) then local recovery doesn't happen. Task manager restore process
> actually downloads the latest completed checkpoint from the remote state
> handle even when the older localState data is available. This happens
> because with every run allocation-ids for tasks running on task manager
> change as task manager restart causes global job failure and restart.
>
> Local recovery - i.e task restore process using locally stored checkpoint
> data kicks in when the task manager process is alive but due to some other
> reason ( like timeout from sink or external dependency ) one of the tasks
> fails and the flink job gets restarted by the job manager.
>
> Please CMIIW
>
>
> -
> Dhanesh Arole
>
> On Tue, Apr 6, 2021 at 11:35 AM Till Rohrmann 
> wrote:
>
> Hi Sonam,
>
> The easiest way to see whether local state has been used for recovery is
> the recovery time. Apart from that you can also look for "Found registered
> local state for checkpoint {} in subtask ({} - {} - {}" in the logs which
> is logged on debug. This indicates that the local state is available.
> However, it does not say whether it is actually used. E.g. when doing a
> rescaling operation we change the assignment of key group ranges which
> prevents local state from being used. However in case of a recovery the
> above-mentioned log message should indicate that we use local state
> recovery.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi Sonam,
>
> Pulling in Till (cc'ed), I believe he would likely be able to help you
> here.
>
> Cheers,
> Gordon
>
> On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal  wrote:
>
> Hello,
>
> We are experimenting with task local recovery and I wanted to know whether
> there is a way to validate that some tasks of the job recovered from the
> local state rather than the remote state.
>
> We've currently set this up to have 2 Task Managers with 2 slots each, and
> we run a job with parallelism 4. To simulate failure, we kill one of the
> Task Manager pods (we run on Kubernetes). I want to see if the local state
> of the other Task Manager was used or not. I do understand that the state
> for the killed Task Manager will need to be fetched from the checkpoint.
>
> Also, do you have any suggestions on how to test such failure scenarios in
> a better way?
>
> Thanks,
> Sonam
>
> --
> - Dhanesh ( sent from my mobile device. Pardon me for any typos )
>


Re: Task manager local state data after crash / recovery

2021-04-09 Thread Till Rohrmann
Hi Dhanesh,

The way local state currently works in Flink is the following: The user
configures `taskmanager.state.local.root-dirs`; otherwise the tmp directory
is used. There Flink creates a "localState" directory, which is the base
directory for all local state. Within this directory a TaskManager creates a
sub directory for every allocation, named after the `AllocationID`. Inside
this sub directory, Flink then stores the local state artefacts.

When Flink frees an allocation, the corresponding directory is deleted.
Flink also registers a shutdown hook which, when the process is terminated
via a SIGTERM signal, tries to delete all directories for the known
`AllocationIDs`. If the shutdown hooks do not run (e.g. when the process is
killed via SIGKILL), then Flink leaves some residual state.

Now the problem is what happens if the TaskManager process is restarted on
the same machine. In this case, Flink will simply use the same local state
directory but it ignores existing allocation id sub directories. The reason
is that Flink does not know whether these allocation id sub directories are
used by another Flink process running on the same machine. In order to
make this decision Flink would have to know that it is the owner of these
sub directories. This could work if each TaskManager process is started
with a unique ID and if this ID is reused across restart attempts. This is
currently not the case for every deployment.

Long story short, it is currently expected that Flink can leave some
residual state in case of a hard process stop. Cleaning this state up is at
the moment unfortunately the responsibility of the user.
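
A user-side cleanup could look roughly like the following Python sketch. It
assumes that it runs before the TaskManager process starts and that no other
Flink process uses the same root directory; the function name
`remove_residual_local_state` and the allocation ids in the demo are made up.
Only the "localState" directory name is taken from the description above.

```python
import shutil
import tempfile
from pathlib import Path


def remove_residual_local_state(root_dir: Path) -> list:
    """Delete every allocation sub directory under <root_dir>/localState.

    Assumption of this sketch: it runs before the TaskManager starts and no
    other Flink process shares the same root directory.
    """
    local_state = root_dir / "localState"
    removed = []
    if not local_state.is_dir():
        return removed
    for child in sorted(local_state.iterdir()):
        if child.is_dir():
            shutil.rmtree(child)
            removed.append(child.name)
    return removed


# Demo on a throwaway directory with two made-up allocation ids.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for allocation_id in ("alloc-a", "alloc-b"):
        (root / "localState" / allocation_id / "chk_1").mkdir(parents=True)
    removed = remove_residual_local_state(root)
    remaining = list((root / "localState").iterdir())

print(removed, remaining)
```

Since a restarted TaskManager ignores existing allocation id sub directories
anyway, removing them at that point should not change recovery behaviour, but
this should be verified for your own deployment.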

Cheers,
Till

On Tue, Apr 6, 2021 at 4:55 PM dhanesh arole  wrote:

> Hey all,
>
> We are running a stateful stream processing job on k8s using per-job
> standalone deployment entrypoint. Flink version: 1.12.1
>
> *Problem*: We have observed that whenever a task manager is either
> gracefully shut down or killed ( due to OOM, k8s worker node drain out etc
> ) it doesn't clean up the rocksdb state directories from the local disk.
> But when the task manager restarts and it receives new task allocation from
> the resource manager it rebuilds its local state for those tasks from the
> previous completed checkpoint. Over the period of time after multiple
> restarts, the task manager's local disk ends up accumulating lots of such
> orphan rocksdb directories.
>
> *Questions*: This isn't causing any functional issues to us, but it adds
> up lots of repeated ops overhead of cleaning these disks periodically. As a
> workaround, we are thinking of cleaning the local rocksdb directories
> except for the *taskmanager.state.local.root-dirs* before starting the
> task manager java process. Since, during every task manager restart keyed
> state backends for allocated tasks are anyway restored we feel it is the
> safest option atm and will solve our problem of ever growing disk on task
> manager pods. Is it safe to do so or are there any other consequences of
> it? Is there any config or restart policy that takes care of cleaning up
> such stale rocksdb directories during the statebackend restore process?
>
> A sort of similar clean up is required when local task recovery is
> enabled. Whenever the task manager is not shut down gracefully the old
> localState doesn't get cleaned up on the next restart. This also causes
> lots of disk space wastage. It's easier to delete rocksdb working
> directories from the previous run, but not so straightforward for the localState
> as one has to figure out which one of them are actually stale allocation
> IDs and clean only those one. Or check the latest completed checkpoint and
> delete all localStates directories for older checkpoints and
> allocation-ids. Is there any other solution to this problem? Also would
> like to learn from other users how are you handling these operational tasks
> currently?
>
> configurations:
>
> state.backend.local-recovery: true
> taskmanager.state.local.root-dirs: /data/flink/
>
> RocksDb backend DB storage path:  /data/flink ( set programmatically )
>
>
> -
> Dhanesh Arole
>


Re: Flink: Exception from container-launch exitCode=2

2021-04-09 Thread Till Rohrmann
I actually think that the logging problem is caused by Hadoop 2.7.3 which
pulls in the slf4j-log4j12-1.7.10.jar. This binding is then used but there
is no proper configuration file for log4j because Flink actually uses
log4j2.
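
To see which jars on a classpath ship an SLF4J binding, one could scan them
for the `StaticLoggerBinder` class that the SLF4J warnings refer to. The
following Python sketch shows the idea; the function name
`jars_with_slf4j_binding` is made up, and the demo uses empty stand-in jars
(including a hypothetical `flink-dist.jar`) rather than real artifacts.

```python
import tempfile
import zipfile
from pathlib import Path

# Class file that SLF4J 1.x looks up to find a logging binding.
BINDER_ENTRY = "org/slf4j/impl/StaticLoggerBinder.class"


def jars_with_slf4j_binding(jar_paths):
    """Return the names of the jars that contain an SLF4J 1.x binding."""
    hits = []
    for jar in jar_paths:
        with zipfile.ZipFile(jar) as archive:
            if BINDER_ENTRY in archive.namelist():
                hits.append(Path(jar).name)
    return hits


# Demo with stand-in jars; the names mirror the log output, the contents are fake.
with tempfile.TemporaryDirectory() as tmp:
    jars = []
    for name, has_binding in [
        ("slf4j-log4j12-1.7.10.jar", True),
        ("log4j-slf4j-impl-2.12.1.jar", True),
        ("flink-dist.jar", False),
    ]:
        path = Path(tmp) / name
        with zipfile.ZipFile(path, "w") as archive:
            archive.writestr("META-INF/MANIFEST.MF", "Manifest-Version: 1.0\n")
            if has_binding:
                archive.writestr(BINDER_ENTRY, b"")
        jars.append(path)
    found = jars_with_slf4j_binding(jars)

print(found)
```

More than one hit means SLF4J has to pick one binding, which is how the
log4j 1.x binding from Hadoop can end up being used.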

Cheers,
Till

On Fri, Apr 9, 2021 at 12:05 PM Till Rohrmann  wrote:

> Hi Yik San,
>
> to me it looks as if there is a problem with the job and the deployment.
> Unfortunately, the logging does not seem to have worked. Could you check
> that you have a valid log4j.properties file in your conf directory?
>
> Cheers,
> Till
>
> On Wed, Apr 7, 2021 at 4:57 AM Yik San Chan 
> wrote:
>
>> *The question is cross-posted on Stack
>> Overflow 
>> https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2
>> .
>> Viewing the question on Stack Overflow is preferred as I include a few
>> images for better description.*
>>
>> Hi community,
>>
>> ## Flink (Scala) exitCode=2
>>
>> I have a simple Flink job that reads from 2 columns of a Hive table
>> `mysource`, add up the columns, then writes the result to another Hive
>> table `mysink`, which `mysource` has 2 columns `a bigint` and `b bigint`,
>> and `mysink` has only 1 column `c bigint`.
>>
>> The job submits successfully, however, I observe it keeps retrying.
>>
>> [![enter image description here][1]][1]
>>
>> I click into each attempt, they simply show this.
>>
>> ```
>> AM Container for appattempt_1607399514900_2511_001267 exited with
>> exitCode: 2
>> For more detailed output, check application tracking page:
>> http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511Then,
>> click on links to logs of each attempt.
>> Diagnostics: Exception from container-launch.
>> Container id: container_e13_1607399514900_2511_1267_01
>> Exit code: 2
>> Stack trace: ExitCodeException exitCode=2:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
>> at org.apache.hadoop.util.Shell.run(Shell.java:479)
>> at
>> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Container exited with a non-zero exit code 2
>> Failing this attempt
>> ```
>>
>> However, the "Logs" has no useful info - it complains about the logging
>> lib, but I believe they are really warnings, not errors.
>>
>> ```
>> LogType:jobmanager.err
>> Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
>> LogLength:1010
>> Log Contents:
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]/
>> SLF4J: Found binding in
>> [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> log4j:WARN No appenders could be found for logger
>> (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
>> log4j:WARN Please initialize the log4j system properly.
>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
>> more info.
>> End of LogType:jobmanager.err
>>
>> LogType:jobmanager.out
>> Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
>> LogLength:0
>> Log Contents:
>> End of LogType:jobmanager.out
>> ```
>>
>> This is the job written in Scala.
>>
>> ``

Re: Flink: Exception from container-launch exitCode=2

2021-04-09 Thread Till Rohrmann
Hi Yik San,

to me it looks as if there is a problem with the job and the deployment.
Unfortunately, the logging does not seem to have worked. Could you check that
you have a valid log4j.properties file in your conf directory?

Cheers,
Till

On Wed, Apr 7, 2021 at 4:57 AM Yik San Chan 
wrote:

> *The question is cross-posted on Stack
> Overflow 
> https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2
> .
> Viewing the question on Stack Overflow is preferred as I include a few
> images for better description.*
>
> Hi community,
>
> ## Flink (Scala) exitCode=2
>
> I have a simple Flink job that reads from 2 columns of a Hive table
> `mysource`, add up the columns, then writes the result to another Hive
> table `mysink`, which `mysource` has 2 columns `a bigint` and `b bigint`,
> and `mysink` has only 1 column `c bigint`.
>
> The job submits successfully, however, I observe it keeps retrying.
>
> [![enter image description here][1]][1]
>
> I click into each attempt, they simply show this.
>
> ```
> AM Container for appattempt_1607399514900_2511_001267 exited with
> exitCode: 2
> For more detailed output, check application tracking page:
> http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511Then,
> click on links to logs of each attempt.
> Diagnostics: Exception from container-launch.
> Container id: container_e13_1607399514900_2511_1267_01
> Exit code: 2
> Stack trace: ExitCodeException exitCode=2:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
> at org.apache.hadoop.util.Shell.run(Shell.java:479)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Container exited with a non-zero exit code 2
> Failing this attempt
> ```
>
> However, the "Logs" has no useful info - it complains about the logging
> lib, but I believe they are really warnings, not errors.
>
> ```
> LogType:jobmanager.err
> Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
> LogLength:1010
> Log Contents:
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]/
> SLF4J: Found binding in
> [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> log4j:WARN No appenders could be found for logger
> (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
> End of LogType:jobmanager.err
>
> LogType:jobmanager.out
> Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
> LogLength:0
> Log Contents:
> End of LogType:jobmanager.out
> ```
>
> This is the job written in Scala.
>
> ```scala
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.catalog.hive.HiveCatalog
>
> object HiveToyExample {
>   def main(args: Array[String]): Unit = {
>     val settings = EnvironmentSettings.newInstance.build
>     val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val tableEnv = StreamTableEnvironment.create(execEnv, settings)
>
>     val hiveCatalog = new HiveCatalog(
>       "myhive",
>       "aiinfra",
>       "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
>     )
>     tableEnv.registerCatalog("myhive", hiveCatalog)
>     tableEnv.useCatalog("myhive")
>
>     tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>
>     tableEnv
>       .executeSql("""
>           |INSERT INTO mysink
>           |SELECT a + b
>           |FROM mysource
>           |""

Re: Flink does not cleanup some disk memory after submitting jar over rest

2021-04-09 Thread Till Rohrmann
Hi,

What you could also do is create several heap dumps [1] whenever you
submit a new job. This could allow us to analyze whether something is
increasing the heap memory consumption. Additionally, you could try to
upgrade your cluster to Flink 1.12.2, since it contains fixes for the
problems Maciek mentioned.

[1] https://stackoverflow.com/a/3042463/4815083

Cheers,
Till

On Thu, Apr 8, 2021 at 9:15 PM Maciek Próchniak  wrote:

> Hi,
>
> don't know if this is the problem you're facing, but some time ago we
> encountered two issues connected to REST API and increased disk usage after
> each submission:
>
> https://issues.apache.org/jira/browse/FLINK-21164
>
> https://issues.apache.org/jira/browse/FLINK-9844
>
> - they're closed ATM, but only 1.12.2 contains the fixes.
>
>
> maciek
>
>
> On 08.04.2021 19:52, Great Info wrote:
>
> I have deployed my own flink setup in AWS ECS. One Service for JobManager
> and one Service for task Managers. I am running one ECS task for a job
> manager and 3 ecs tasks for TASK managers.
>
> I have a kind of batch job which I upload using flink rest every-day with
> changing new arguments, when I submit each time disk memory gets increased
> by ~ 600MB, I have given a checkpoint as S3 . Also I have set
> *historyserver.archive.clean-expired-jobs* true .
>
> Since I am running on ECS, I am not able to find why the memory is getting
> increased on every jar upload and execution .
>
> What are the flink config params I should look at to make sure the memory
> is not shooting up?
>
>


Re: Why does flink-quickstart-scala suggests adding connector dependencies in the default scope, while Flink Hive integration docs suggest the opposite

2021-04-09 Thread Till Rohrmann
Hi Yik San,

(1) You could do the same with Kafka. For Hive I believe that the
dependency is simply quite large so that it hurts more if you bundle it
with your user code.

(2) If you change the content in the lib directory, then you have to
restart the cluster.

Cheers,
Till

On Fri, Apr 9, 2021 at 4:02 AM Yik San Chan 
wrote:

> Hi Till, I have 2 follow-ups.
>
> (1) Why is Hive special, while for connectors such as kafka, the docs
> suggest simply bundling the kafka connector dependency with my user code?
>
> (2) it seems the document misses the "before you start the cluster" part -
> does it always require a cluster restart whenever the /lib directory
> changes?
>
> Thanks.
>
> Best,
> Yik San
>
> On Fri, Apr 9, 2021 at 1:07 AM Till Rohrmann  wrote:
>
>> Hi Yik San,
>>
>> for future reference, I copy my answer from the SO here:
>>
>> The reason for this difference is that for Hive it is recommended to
>> start the cluster with the respective Hive dependencies. The documentation
>> [1] states that it's best to put the dependencies into the lib directory
>> before you start the cluster. That way the cluster is enabled to run jobs
>> which use Hive. At the same time, you don't have to bundle this dependency
>> in the user jar which reduces its size. However, there shouldn't be
>> anything preventing you from bundling the Hive dependency with your user
>> code if you want to.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#dependencies
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 8, 2021 at 11:41 AM Yik San Chan 
>> wrote:
>>
>>> The question is cross-posted on Stack Overflow
>>> https://stackoverflow.com/questions/67001326/why-does-flink-quickstart-scala-suggests-adding-connector-dependencies-in-the-de
>>> .
>>>
>>> ## Connector dependencies should be in default scope
>>>
>>> This is what [flink-quickstart-scala](
>>> https://github.com/apache/flink/blob/d12eeedfac6541c3a0711d1580ce3bd68120ca90/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml#L84)
>>> suggests:
>>>
>>> ```
>>> 
>>>
>>> 
>>> ```
>>>
>>> It also aligns with [Flink project configuration](
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies
>>> ):
>>>
>>> > We recommend packaging the application code and all its required
>>> dependencies into one jar-with-dependencies which we refer to as the
>>> application jar. The application jar can be submitted to an already running
>>> Flink cluster, or added to a Flink application container image.
>>> >
>>> > Important: For Maven (and other build tools) to correctly package the
>>> dependencies into the application jar, these application dependencies must
>>> be specified in scope compile (unlike the core dependencies, which must be
>>> specified in scope provided).
>>>
>>> ## Hive connector dependencies should be in provided scope
>>>
>>> However, [Flink Hive Integration docs](
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven)
>>> suggests the opposite:
>>>
>>> > If you are building your own program, you need the following
>>> dependencies in your mvn file. It’s recommended not to include these
>>> dependencies in the resulting jar file. You’re supposed to add dependencies
>>> as stated above at runtime.
>>>
>>> ## Why?
>>>
>>> Thanks!
>>>
>>> Best,
>>> Yik San
>>>
>>


Re: Why does flink-quickstart-scala suggests adding connector dependencies in the default scope, while Flink Hive integration docs suggest the opposite

2021-04-08 Thread Till Rohrmann
Hi Yik San,

for future reference, I am copying my answer from SO here:

The reason for this difference is that for Hive it is recommended to start
the cluster with the respective Hive dependencies. The documentation [1]
states that it's best to put the dependencies into the lib directory before
you start the cluster. That way the cluster can run jobs that use Hive. At
the same time, you don't have to bundle this dependency in the user jar,
which reduces its size. However, nothing should prevent you from bundling
the Hive dependency with your user code if you want to.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#dependencies

Cheers,
Till

On Thu, Apr 8, 2021 at 11:41 AM Yik San Chan 
wrote:

> The question is cross-posted on Stack Overflow
> https://stackoverflow.com/questions/67001326/why-does-flink-quickstart-scala-suggests-adding-connector-dependencies-in-the-de
> .
>
> ## Connector dependencies should be in default scope
>
> This is what [flink-quickstart-scala](
> https://github.com/apache/flink/blob/d12eeedfac6541c3a0711d1580ce3bd68120ca90/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml#L84)
> suggests:
>
> ```
> 
>
> 
> ```
>
> It also aligns with [Flink project configuration](
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#adding-connector-and-library-dependencies
> ):
>
> > We recommend packaging the application code and all its required
> dependencies into one jar-with-dependencies which we refer to as the
> application jar. The application jar can be submitted to an already running
> Flink cluster, or added to a Flink application container image.
> >
> > Important: For Maven (and other build tools) to correctly package the
> dependencies into the application jar, these application dependencies must
> be specified in scope compile (unlike the core dependencies, which must be
> specified in scope provided).
>
> ## Hive connector dependencies should be in provided scope
>
> However, [Flink Hive Integration docs](
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven)
> suggests the opposite:
>
> > If you are building your own program, you need the following
> dependencies in your mvn file. It’s recommended not to include these
> dependencies in the resulting jar file. You’re supposed to add dependencies
> as stated above at runtime.
>
> ## Why?
>
> Thanks!
>
> Best,
> Yik San
>


Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Till Rohrmann
Hi Flavio,

I tried to execute the code snippet you have provided and I could not
reproduce the problem.

Concretely I am running this code:

final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build();
final TableEnvironment tableEnv = TableEnvironment.create(envSettings);

tableEnv.fromValues("foobar").execute().await();

Am I missing something? Maybe you can share a minimal but fully working
example where the problem occurs. Thanks a lot.

Cheers,
Till

On Thu, Apr 8, 2021 at 11:25 AM Flavio Pompermaier 
wrote:

> Any help here? Moreover, if I use the DataStream APIs there's no left/right
> outer join yet. Are those meant to be added in Flink 1.13 or 1.14?
>
> On Wed, Apr 7, 2021 at 12:27 PM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> I'm testing writing to a CSV using Flink 1.13 and I get the following
>> error:
>>
>> The matching candidates:
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>> Unsupported property keys:
>> format.quote-character
>>
>> I create the table env using this:
>>
>> final EnvironmentSettings envSettings =
>> EnvironmentSettings.newInstance()//
>> .useBlinkPlanner()//
>> // .inBatchMode()//
>> .inStreamingMode()//
>> .build();
>> final TableEnvironment tableEnv =
>> TableEnvironment.create(envSettings);
>>
>> The error is the same both with inBatchMode and inStreamingMode.
>> Is this really not supported or am I using the wrong API?
>>
>> Best,
>> Flavio
>>
>


Re: Reducing Task Manager Count Greatly Increases Savepoint Restore

2021-04-08 Thread Till Rohrmann
Hi Kevin,

when decreasing the TaskManager count I assume that you also decrease the
parallelism of the Flink job. There are three aspects which can then cause
a slower recovery.

1) Each Task gets a larger key range assigned. Therefore, each TaskManager
has to download more data in order to restart the Task. Moreover, there are
fewer nodes downloading larger portions of the data (less parallelization).
2) If you rescaled the parallelism, then it can happen that a Task gets a
key range assigned which requires downloading of multiple key range parts
from the previous run/savepoint. The new key range might not need all the
data from the savepoint parts and hence you download some data which is not
really used in the end.
3) When rescaling the job, Flink has to rebuild the RocksDB instance,
which is an expensive and slow operation. For every savepoint part that it
needs for its key range, Flink creates a RocksDB instance and then extracts
only the part relevant to its key range into a new RocksDB instance. This
causes a lot of read and write
amplification.
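
To make points 1) and 2) concrete, here is a minimal, self-contained sketch
of how contiguous key-group ranges shift when the parallelism changes. The
class and method names (RescaleSketch, keyGroupRange,
overlappingOldSubtasks) are made up for illustration. The range arithmetic
is an assumption about how Flink splits key groups evenly across subtasks,
not a copy of Flink's code, and the numbers (12 key groups, parallelism
4 -> 3) are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: models contiguous key-group ranges.
class RescaleSketch {

    // Inclusive [start, end] range of key groups owned by subtask `index`,
    // assuming the key groups are split evenly into contiguous ranges.
    public static int[] keyGroupRange(int maxParallelism, int parallelism, int index) {
        int start = (index * maxParallelism + parallelism - 1) / parallelism;
        int end = ((index + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Indices of the old subtasks whose savepoint part overlaps the key-group
    // range of the new subtask `newIndex` after rescaling.
    public static List<Integer> overlappingOldSubtasks(
            int maxParallelism, int oldParallelism, int newParallelism, int newIndex) {
        int[] target = keyGroupRange(maxParallelism, newParallelism, newIndex);
        List<Integer> result = new ArrayList<>();
        for (int old = 0; old < oldParallelism; old++) {
            int[] range = keyGroupRange(maxParallelism, oldParallelism, old);
            // Two inclusive ranges overlap if each starts before the other ends.
            if (range[0] <= target[1] && target[0] <= range[1]) {
                result.add(old);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int maxParallelism = 12; // number of key groups (arbitrary example value)
        int[] range = keyGroupRange(maxParallelism, 3, 1);
        System.out.println("new subtask 1 owns key groups ["
                + range[0] + ", " + range[1] + "]");
        System.out.println("it reads savepoint parts of old subtasks "
                + overlappingOldSubtasks(maxParallelism, 4, 3, 1));
    }
}
```

With these example values, new subtask 1 owns key groups 4 to 7 and has to
read the parts written by old subtasks 1 and 2, which cover key groups 3 to
5 and 6 to 8. Key groups 3 and 8 are fetched but not used by this subtask,
which is the wasted download described in 2).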

Cheers,
Till

On Wed, Apr 7, 2021 at 4:07 PM Kevin Lam  wrote:

> Hi all,
>
> We are trying to benchmark savepoint size vs. restore time.
>
> One thing we've observed is that when we reduce the number of task
> managers, the time to restore from a savepoint increases drastically:
>
> 1/ Restoring from a 9.7 TB savepoint onto 156 task managers takes 28 minutes
> 2/ Restoring from the same savepoint onto 30 task managers takes over 3
> hours
>
> *Is this expected? How does the restore process work? Is this just a
> matter of having lower restore parallelism for 30 task managers vs 156 task
> managers? *
>
> Some details
>
> - Running on Kubernetes
> - Used RocksDB with a local SSD for the state backend
> - Savepoint is hosted on GCS
> - The smaller task manager case is important to us because we expect to
> deploy our application with a high number of task managers, and downscale
> once a backfill is completed
>
> Differences between 1/ and 2/:
>
> 2/ has decreased task manager count 156 -> 30
> 2/ has decreased operator parallelism by a factor of ~10
> 2/ uses a striped SSD (3 SSDs mounted as a single logical volume) to hold
> RocksDB files
>
> Thanks in advance for your help!
>

