Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread SHI Xiaogang
Hi Dadashov,

You may have a look at method YarnResourceManager#onContainersAllocated
which will launch containers (via NMClient#startContainer) after containers
are allocated.
The launching is performed in the main thread of YarnResourceManager and
the launching is synchronous/blocking. Consequently, the containers will be
launched one by one.
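For illustration, here is a hedged sketch of how that blocking call could be taken off the main thread; the helper class below is hypothetical and not part of Flink's YarnResourceManager:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.NMClient;

// Hypothetical helper: run the blocking NMClient#startContainer call on a
// dedicated pool so containers are launched in parallel instead of one by one.
class AsyncContainerLauncher {

    private final NMClient nmClient;
    private final ExecutorService launcherPool = Executors.newFixedThreadPool(16);

    AsyncContainerLauncher(NMClient nmClient) {
        this.nmClient = nmClient;
    }

    void launch(Container container, ContainerLaunchContext launchContext) {
        launcherPool.execute(() -> {
            try {
                // Blocking YARN call, now off the caller's main thread.
                nmClient.startContainer(container, launchContext);
            } catch (Exception e) {
                // A real integration would report the failure back to the
                // resource manager's main thread instead of just logging it.
                e.printStackTrace();
            }
        });
    }
}
{code}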

Regards,
Xiaogang

Elkhan Dadashov  于2019年8月31日周六 上午2:37写道:

> Thanks everyone for the valuable input and for sharing your experience in
> tackling the issue.
>
> Regarding suggestions :
> - We provision some common jars in all cluster nodes *-->* but this
> requires depending on the Infra Team's schedule for handling/updating common jars
> - Making the Uberjar slimmer *-->* tried even with a 200 MB Uberjar (half
> size); it did not improve much. Only 100 containers could be started in time,
> but then we received:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1566422713305 found 1566422560552
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>
>
> - It would be nice to see FLINK-13184, but the expected
> version it will land in is 1.10
> - Increase replication factor --> It would be nice to have a Flink conf for
> setting the replication factor for only the Flink job jars, but not the output. It
> is also challenging to set a replication factor for a yet non-existing directory;
> the new files will have the default replication factor. Will explore the HDFS cache
> option.
>
> Maybe another option can be:
> - Letting yet-to-be-started TaskManagers (or NodeManagers) download the
> jars from already started TaskManagers in P2P fashion, so as not to be
> blocked on HDFS replication.
>
> A Spark job with the exact same size jar and 800 executors, without any
> tuning, can start without any issue on the same cluster in less than a minute.
>
> *Further questions:*
>
> *@ SHI Xiaogang:*
>
> I see that all 800 requests are sent concurrently :
>
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
> container with resources . Number pending requests
> 793.
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> e908cb4700d5127a0b67be035e4494f7 with allocation id
> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
> container with resources . Number pending requests
> 794.
> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> e908cb4700d5127a0b67be035e4494f7 with allocation id
> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
> ...
>
> Can you please elaborate on the part "As containers are launched and stopped
> one after another"? Any pointer to a class/method in Flink?
>
> *@ Zhu Zhu:*
>
> Regarding "One optimization that we take is letting yarn to reuse the
> flink-dist jar which was localized when running previous jobs."
>
> We intend to use a Flink real-time pipeline for replay from Hive/HDFS
> (an offline source), to have a single pipeline for both batch and
> real-time. So for a batch Flink job, the containers will be released once the
> job is done.
> I guess your job is real-time Flink, so you can share the jars from
> already long-running jobs.
>
> Thanks.
>
>
> On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang  wrote:
>
>> I can think of 2 approaches:
>>
>> 1. Allow flink to specify the replication of the submitted uber jar.
>> 2. Allow flink to specify a config flink.yarn.lib that points to all the
>> flink-related jars hosted on hdfs. This way users don't need to build
>> and submit a fat uber jar every time. And those flink jars hosted on hdfs
>> can also have their replication specified separately.
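A minimal sketch of approach 1 above (raising the replication factor of the uploaded jar via the Hadoop FileSystem API); the staging path and the factor of 20 are made-up values:

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UberJarReplication {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Hypothetical location of the uber jar in the YARN staging directory.
        Path uberJar = new Path("/user/flink/.flink/application_1234/job.jar");
        // Raise the replication factor so hundreds of NodeManagers do not all
        // localize the jar from the default three replicas.
        fs.setReplication(uberJar, (short) 20);
    }
}
{code}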
>>
>>
>>
>> Till Rohrmann  于2019年8月30日周五 下午3:33写道:
>>
>>> For point 2. there exists already a JIRA issue [1] and a PR. I hope that
>>> we can merge it during this release cycle.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang 
>>> wrote:
>>>
 Hi Dadashov,

 We faced similar problems in our production clusters.

 Now both launching and stopping of containers are performed in the main
 thread of YarnResourceManager. As containers are launched and stopped one
 after another, it usually takes a long time to bootstrap large jobs. Things
 get worse when some node managers 

Re: [DISCUSS] Flink client api enhancement for downstream project

2019-08-30 Thread Zili Chen
Great Kostas! Looking forward to your POC!

Best,
tison.


Jeff Zhang  于2019年8月30日周五 下午11:07写道:

> Awesome, @Kostas Looking forward your POC.
>
> Kostas Kloudas  于2019年8月30日周五 下午8:33写道:
>
> > Hi all,
> >
> > I am just writing here to let you know that I am working on a POC that
> > tries to refactor the current state of job submission in Flink.
> > I want to stress that it introduces NO CHANGES to the current
> > behaviour of Flink. It just re-arranges things and introduces the
> > notion of an Executor, which is the entity responsible for taking the
> > user-code and submitting it for execution.
> >
> > Given this, the discussion about the functionality that the JobClient
> > will expose to the user can go on independently and the same
> > holds for all the open questions so far.
> >
> > I hope I will have some more news to share soon.
> >
> > Thanks,
> > Kostas
> >
> > On Mon, Aug 26, 2019 at 4:20 AM Yang Wang  wrote:
> > >
> > > Hi Zili,
> > >
> > > It makes sense to me that a dedicated cluster is started for a per-job
> > > cluster and will not accept more jobs.
> > > Just have a question about the command line.
> > >
> > > Currently we could use the following commands to start different
> > clusters.
> > > *per-job cluster*
> > > ./bin/flink run -d -p 5 -ynm perjob-cluster1 -m yarn-cluster
> > > examples/streaming/WindowJoin.jar
> > > *session cluster*
> > > ./bin/flink run -p 5 -ynm session-cluster1 -m yarn-cluster
> > > examples/streaming/WindowJoin.jar
> > >
> > > What will it look like after client enhancement?
> > >
> > >
> > > Best,
> > > Yang
> > >
> > > Zili Chen  于2019年8月23日周五 下午10:46写道:
> > >
> > > > Hi Till,
> > > >
> > > > Thanks for your update. Nice to hear :-)
> > > >
> > > > Best,
> > > > tison.
> > > >
> > > >
> > > > Till Rohrmann  于2019年8月23日周五 下午10:39写道:
> > > >
> > > > > Hi Tison,
> > > > >
> > > > > just a quick comment concerning the class loading issues when using
> > the
> > > > per
> > > > > job mode. The community wants to change it so that the
> > > > > StandaloneJobClusterEntryPoint actually uses the user code class
> > loader
> > > > > with child first class loading [1]. Hence, I hope that this problem
> > will
> > > > be
> > > > > resolved soon.
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-13840
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Fri, Aug 23, 2019 at 2:47 PM Kostas Kloudas  >
> > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > On the topic of web submission, I agree with Till that it only
> > seems
> > > > > > to complicate things.
> > > > > > It is bad for security, job isolation (anybody can submit/cancel
> > jobs),
> > > > > > and its
> > > > > > implementation complicates some parts of the code. So, if it were
> > to
> > > > > > redesign the
> > > > > > WebUI, maybe this part could be left out. In addition, I would
> say
> > > > > > that the ability to cancel
> > > > > > jobs could also be left out.
> > > > > >
> > > > > > Also I would also be in favour of removing the "detached" mode,
> for
> > > > > > the reasons mentioned
> > > > > > above (i.e. because now we will have a future representing the
> > result
> > > > > > on which the user
> > > > > > can choose to wait or not).
> > > > > >
> > > > > > Now for the separating job submission and cluster creation, I am
> in
> > > > > > favour of keeping both.
> > > > > > Once again, the reasons are mentioned above by Stephan, Till,
> > Aljoscha
> > > > > > and also Zili seems
> > > > > > to agree. They mainly have to do with security, isolation and
> ease
> > of
> > > > > > resource management
> > > > > > for the user as he knows that "when my job is done, everything
> > will be
> > > > > > cleared up". This is
> > > > > > also the experience you get when launching a process on your
> local
> > OS.
> > > > > >
> > > > > > On excluding the per-job mode from returning a JobClient or not,
> I
> > > > > > believe that eventually
> > > > > > it would be nice to allow users to get back a jobClient. The
> > reason is
> > > > > > that 1) I cannot
> > > > > > find any objective reason why the user-experience should diverge,
> > and
> > > > > > 2) this will be the
> > > > > > way that the user will be able to interact with his running job.
> > > > > > Assuming that the necessary
> > > > > > ports are open for the REST API to work, then I think that the
> > > > > > JobClient can run against the
> > > > > > REST API without problems. If the needed ports are not open, then
> > we
> > > > > > are safe to not return
> > > > > > a JobClient, as the user explicitly chose to close all points of
> > > > > > communication to his running job.
> > > > > >
> > > > > > On the topic of not hijacking the "env.execute()" in order to get
> > the
> > > > > > Plan, I definitely agree but
> > > > > > for the proposal of having a "compile()" method in the env, I
> would
> > > > > > like to have a better look at
> > > > > > the existing code.
> > > > > >
> > > > > > Cheers,
> > > 

Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-08-30 Thread Zhu Zhu
In our production, we usually override the restart delay to be 10 s.
We once encountered cases where external services were overwhelmed by
reconnections from frequently restarted tasks.
As a safer, though not optimal, option, a default delay larger than 0 s is
better in my opinion.
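For reference, a job can also override the delay programmatically instead of relying on the cluster default; a minimal sketch using the standard restart-strategy API:

{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartDelayExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // At most 3 restart attempts with a 10 s pause between them, mirroring
        // the 10 s delay mentioned above.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}
{code}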


未来阳光 <2217232...@qq.com> 于2019年8月30日周五 下午10:23写道:

> Hi,
>
>
> I think it's better to increase the default value. +1
>
>
> Best.
>
>
>
>
> -- Original message --
> From: "Till Rohrmann";
> Sent: Friday, August 30, 2019, 10:07 PM
> To: "dev"; "user";
> Subject: [SURVEY] Is the default restart delay of 0s causing problems?
>
>
>
> Hi everyone,
>
> I wanted to reach out to you and ask whether decreasing the default delay
> to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
> user reported that he would like to increase the default value because it
> can cause restart storms in case of systematic faults [2].
>
> The downside of increasing the default delay would be a slightly increased
> restart time if this config option is not explicitly set.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9158
> [2] https://issues.apache.org/jira/browse/FLINK-11218
>
> Cheers,
> Till


[ANNOUNCE] Kinesis connector becomes part of Flink releases

2019-08-30 Thread Bowen Li
Hi all,

I'm glad to announce that, as #9494 was merged today,
flink-connector-kinesis is now officially under the Apache 2.0 license in the
master branch, and its artifact will be deployed to Maven Central as part of
Flink releases starting from Flink 1.10.0. Users can then use the artifact off
the shelf and no longer have to build and maintain it on their own.

It brings a much better user experience to our large AWS customer base by
making their work simpler, smoother, and more productive!

Thanks everyone who participated in coding and review to drive this
initiative forward.

Cheers,
Bowen


[jira] [Created] (FLINK-13932) PyTest ExecutionConfigTests.test_equals_and_hash fail

2019-08-30 Thread TisonKun (Jira)
TisonKun created FLINK-13932:


 Summary: PyTest ExecutionConfigTests.test_equals_and_hash fail
 Key: FLINK-13932
 URL: https://issues.apache.org/jira/browse/FLINK-13932
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: TisonKun


Not yet observed on master, but independent pull requests, even trivial fixes, fail on 
the same case.


{code:java}
=== FAILURES ===
__ ExecutionConfigTests.test_equals_and_hash ___
self = 
def test_equals_and_hash(self):

config1 = ExecutionEnvironment.get_execution_environment().get_config()

config2 = ExecutionEnvironment.get_execution_environment().get_config()

self.assertEqual(config1, config2)

>   self.assertEqual(hash(config1), hash(config2))
E   AssertionError: 897378335 != 1596606912
pyflink/common/tests/test_execution_config.py:277: AssertionError
=== warnings summary ===
.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13
.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13
.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13
.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13
.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13
  
/home/travis/build/flink-ci/flink/flink-python/.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13:
 DeprecationWarning: Using or importing the ABCs from 'collections' instead of 
from 'collections.abc' is deprecated, and in 3.8 it will stop working
from collections import (
{code}

https://api.travis-ci.com/v3/job/229672435/log.txt
https://api.travis-ci.com/v3/job/229721832/log.txt

cc [~sunjincheng121] [~dian.fu]




--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-30 Thread Becket Qin
Hi Timo,

Thanks for the reply. I am still a little concerned over the mutability of
the Configurable which could be the value in Configuration.

Re: 1

> But in general, people should not use any internal fields.
> Configurable objects are meant for simple little helper POJOs, not
> complex arbitrary nested data structures.

This seems difficult to enforce... Ideally the values in configurations
should actually be immutable. The value should only be changeable by explicitly
calling setters on Configuration. Otherwise we may have a weird situation
where the Configurable in the same configuration is different in two
places because the configurable is modified in one place and not modified
in the other. So I am a little concerned about putting a Configurable type
in the Configuration map, because the value could be modified without
explicitly setting the configuration. For example, can users do the
following?

Configurable configurable =
    configuration.getConfigurable(myConfigurableOption);
// this already changes the configurable object stored in the configuration
configurable.setConfigA(123);
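One way to avoid this aliasing, sketched under the assumption that toConfiguration()/fromConfiguration() round-trip faithfully (the helper below is hypothetical, not part of the FLIP):

{code:java}
// Hypothetical defensive getter: hand out a detached copy instead of the stored
// instance, so mutating the returned object cannot silently change the
// Configuration's internal state.
public <T extends Configurable> T getConfigurableCopy(
        ConfigOption<T> option, ConfigurableFactory<T> factory) {
    T stored = getConfigurable(option);
    // Round-trip through the key/value representation to create an independent copy.
    return factory.fromConfiguration(stored.toConfiguration());
}
{code}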

Re: 2
Thanks for confirming. As long as users will not have a situation where
they need to set two configurations with the same key but different
descriptions, I think it is OK.

Re: 3
This is actually related to 1, i.e. whether toConfiguration()
guarantees that the exact same object can be rebuilt from the configuration or
not. I am still not quite sure about the use case of toConfiguration(),
though. It seems to indicate that the Configurable is mutable, which might be
dangerous.

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 30, 2019 at 10:04 PM Timo Walther  wrote:

> Hi Becket,
>
> 1. First of all, you are totally right. The FLIP contains a bug due to
> the last minute changes that Dawid suggested: by having immutable
> objects created by a factory we lose the serializability of the
> Configuration because the factory itself is not stored in the
> Configuration. I would propose to revert the last change and stick to
> the original design, which means that an object must implement the
> Configurable interface and also implement serialization/deserialization
> methods such that internal fields can also be persisted as you
> suggested. But in general, people should not use any internal fields.
> Configurable objects are meant for simple little helper POJOs, not
> complex arbitrary nested data structures.
>
> It is Map<String, Object> because Configuration stores the raw objects.
> If you put a Boolean option into it, it remains Boolean. This makes the
> map very efficient for shipping to the cluster and accessing options
> multiple times. The same holds for configurable objects. We put the pure
> objects into the map without any serialization/deserialization. The
> provided factory allows converting the Object into a Configuration, and
> we know how to serialize/deserialize a configuration because it is
> just a key/value map.
>
> 2. Yes, this is what we had in mind. It should still be the same
> configuration option. We would like to avoid specialized option keys
> across components (exec.max-para and table.exec.max-para) if they are
> describing basically the same thing. But adding some more description
> like "TableOptions.MAX_PARALLELISM with description_1 + description_2"
> does not hurt.
>
> 3. They should restore the original object given that the
> toConfiguration/fromConfiguration methods have been implemented
> correctly. I will extend the example to make the logic clearer while
> fixing the bug.
>
> Thanks for the healthy discussion,
> Timo
>
>
> On 30.08.19 15:29, Becket Qin wrote:
> > Hi Timo,
> >
> > Thanks again for the clarification. Please see a few more questions
> below.
> >
> > Re: 1
> >
> >> Please also keep in mind that Configuration must not consist of only
> >> strings, it manages a Map<String, Object> for efficient access. Every
> >> map entry can have a string representation for persistence, but in most
> >> cases consists of unserialized objects.
> >
> > I'd like to understand this a bit more. The reason we have a Map<String, Object> in Configuration was because that Object could be either a
> String,
> > a List or a Map, right? But they eventually always boil down to Strings,
> or
> > maybe one of the predefined type that we know how to serialize. In the
> > current design, can the Object also be Configurable?
> > If the value in the config Map can be Configurable
> objects,
> > how do we serialize them? Calling toConfiguration() seems not quite
> working
> > because there might be some other internal fields that are not part of
> the
> > configuration. The modification to those fields will be lost if we simply
> > use toConfiguration(). So the impact of modifying those Configurable
> > objects seems a little undefined. And it would be difficult to prevent
> > users from doing that.
> > If the value in the config Map cannot be Configurable
> > objects, then it seems a little weird to have all the other ConfigType
> > stored in the ConfigMap in 

Re: [ANNOUNCE] Apache Flink-shaded 8.0 released

2019-08-30 Thread Hequn Cheng
Thanks a lot to Chesnay!
Also thanks to everyone who helped to make this release possible.

Best, Hequn

On Fri, Aug 30, 2019 at 7:17 PM jincheng sun 
wrote:

> Thanks a lot Chesnay and to the community for making this release possible
> !
>
> Cheers,
> Jincheng
>
> Chesnay Schepler  于2019年8月30日周五 下午6:56写道:
>
> > The Apache Flink community is very happy to announce the release of
> > Apache Flink-shaded 8.0.
> >
> > The flink-shaded project contains a number of shaded dependencies for
> > Apache Flink.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> > streaming applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > The full release notes are available in Jira:
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345488
> >
> > We would like to thank all contributors of the Apache Flink community
> > who made this release possible!
> >
> > Regards,
> > Chesnay
> >
> >
>


[jira] [Created] (FLINK-13931) Support Hive version 2.0.x

2019-08-30 Thread Xuefu Zhang (Jira)
Xuefu Zhang created FLINK-13931:
---

 Summary: Support Hive version 2.0.x
 Key: FLINK-13931
 URL: https://issues.apache.org/jira/browse/FLINK-13931
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang
 Fix For: 1.10.0


Including 3.1.0, 3.1.1, and 3.1.2.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: Potential block size issue with S3 binary files

2019-08-30 Thread Ken Krugler
Hi Stephan (switching to dev list),

> On Aug 29, 2019, at 2:52 AM, Stephan Ewen  wrote:
> 
> That is a good point.
> 
> Which way would you suggest to go? Not relying on the FS block size at all, 
> but using a fixed (configurable) block size?

There’s value to not requiring a fixed block size, as then a file that’s moved 
between different file systems can be read using whatever block size is optimal 
for that system.

Hadoop handles this in sequence files by storing a unique “sync marker” value 
in the file header (essentially a 16 byte UUID), injecting one of these every 
2K bytes or so (in between records), and then code can scan for this to find 
record boundaries without relying on a block size. The idea is that 2^128 is a 
Big Number, so the odds of finding a false-positive sync marker in data is low 
enough to be ignorable.
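A simplified illustration of the sync-marker idea (this is not Hadoop's actual SequenceFile code, just a sketch of scanning a stream for a 16-byte marker):

{code:java}
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

final class SyncMarkerScanner {

    // Skips forward until the sync marker is found; returns the number of
    // bytes discarded before the marker started.
    static long skipToNextSyncMarker(InputStream in, byte[] syncMarker) throws IOException {
        byte[] window = new byte[syncMarker.length];
        long discarded = 0;

        // Fill the initial comparison window.
        for (int i = 0; i < window.length; i++) {
            int b = in.read();
            if (b < 0) {
                throw new EOFException("No sync marker found");
            }
            window[i] = (byte) b;
        }

        // Slide one byte at a time until the window matches the marker.
        while (!Arrays.equals(window, syncMarker)) {
            int b = in.read();
            if (b < 0) {
                throw new EOFException("No sync marker found");
            }
            System.arraycopy(window, 1, window, 0, window.length - 1);
            window[window.length - 1] = (byte) b;
            discarded++;
        }
        return discarded;
    }
}
{code}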

But that’s a bigger change. Simpler would be to put a header in each part file 
being written, with some signature bytes to aid in detecting old-format files.

Or maybe deprecate SerializedOutputFormat/SerializedInputFormat, and provide 
some wrapper glue to make it easier to write/read Hadoop SequenceFiles that 
have a null key value, and store the POJO as the data value. Then you could 
also leverage Hadoop support for compression at either record or block level.
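If the SequenceFile route were taken, writing records with a null key could look roughly like this (the path and payload are placeholders):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;

public class NullKeySequenceFileWriter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("s3://my-bucket/output/part-00000.seq"); // hypothetical location

        try (SequenceFile.Writer writer = SequenceFile.createWriter(
                conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(NullWritable.class),
                SequenceFile.Writer.valueClass(BytesWritable.class),
                SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK))) {

            byte[] serializedPojo = new byte[] {1, 2, 3}; // stand-in for a serialized record
            writer.append(NullWritable.get(), new BytesWritable(serializedPojo));
        }
    }
}
{code}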

— Ken

> 
> On Thu, Aug 29, 2019 at 4:49 AM Ken Krugler wrote:
> Hi all,
> 
> Wondering if anyone else has run into this.
> 
> We write files to S3 using the SerializedOutputFormat. When we 
> read them back, sometimes we get deserialization errors where the data seems 
> to be corrupt.
> 
> After a lot of logging, the weathervane of blame pointed towards the block 
> size somehow not being the same between the write (where it’s 64MB) and the 
> read (unknown).
> 
> When I added a call to SerializedInputFormat.setBlockSize(64MB), the problems 
> went away.
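For reference, pinning the block size explicitly on the read side looks roughly like this (the record type and path are placeholders, and the 64 MB value must match what was used at write time):

{code:java}
// Assumes MyRecord implements IOReadableWritable, as required by SerializedInputFormat.
SerializedInputFormat<MyRecord> inputFormat = new SerializedInputFormat<>();
inputFormat.setFilePath("s3://my-bucket/output");   // hypothetical path
inputFormat.setBlockSize(64 * 1024 * 1024L);        // match the writer's block size explicitly
DataSet<MyRecord> records = env.createInput(inputFormat);
{code}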
> 
> It looks like both input and output formats use fs.getDefaultBlockSize() to 
> set this value by default, so maybe the root issue is S3 somehow reporting 
> different values.
> 
> But it does feel a bit odd that we’re relying on this default setting, versus 
> it being recorded in the file during the write phase.
> 
> And it’s awkward to try to set the block size on the write, as you need to 
> set it in the environment conf, which means it applies to all output files in 
> the job.
> 
> — Ken

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



[ANNOUNCEMENT] September 2019 Bay Area Apache Flink Meetup

2019-08-30 Thread Xuefu Zhang
Hi all,

As promised, we planned to have a quarterly Flink meetup, and now it's about
time. Thus, I'm happy to announce that the next Bay Area Apache Flink
Meetup [1] is scheduled on Sept. 24 at Yelp, 140 New Montgomery in San
Francisco.

Schedule:

6:30 - Door open
6:30 - 7:00 PM Networking and Refreshments
7:00 - 8:30 PM Short talks

-- Two years of Flink @ Yelp (Enrico Canzonieri, 30m)
-- How BNP Paribas Fortis uses Flink for real-time fraud detection
(David Massart, tentative)

Please refer to the meetup page [1] for more details.

Many thanks go to Yelp for their sponsorship. At the same time, we might
still have room for one more short talk. Please let me know if interested.


Thanks,

Xuefu

[1] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/262680261/


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Elkhan Dadashov
Thanks everyone for the valuable input and for sharing your experience in
tackling the issue.

Regarding suggestions :
- We provision some common jars in all cluster nodes *-->* but this
requires depending on the Infra Team's schedule for handling/updating common jars
- Making the Uberjar slimmer *-->* tried even with a 200 MB Uberjar (half size),
it did not improve much. Only 100 containers could be started in time, but then
we received:

org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request
to start container.
This token is expired. current time is 1566422713305 found 1566422560552
Note: System times on machines may be out of sync. Check system time
and time zones.


- It would be nice to see FLINK-13184, but the expected version
it will land in is 1.10
- Increase replication factor --> It would be nice to have a Flink conf for
setting the replication factor for only the Flink job jars, but not the output. It
is also challenging to set a replication factor for a yet non-existing directory;
the new files will have the default replication factor. Will explore the HDFS cache
option.

Maybe another option can be:
- Letting yet-to-be-started TaskManagers (or NodeManagers) download the
jars from already started TaskManagers in P2P fashion, so as not to be
blocked on HDFS replication.

A Spark job with the exact same size jar and 800 executors, without any
tuning, can start without any issue on the same cluster in less than a minute.

*Further questions:*

*@ SHI Xiaogang:*

I see that all 800 requests are sent concurrently :

2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
container with resources . Number pending requests
793.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
nativeMemoryInMB=0, networkMemoryInMB=0} for job
e908cb4700d5127a0b67be035e4494f7 with allocation id
AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.

2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
container with resources . Number pending requests
794.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
nativeMemoryInMB=0, networkMemoryInMB=0} for job
e908cb4700d5127a0b67be035e4494f7 with allocation id
AllocationID{71bbb917374ade66df4c058c41b81f4e}.
...

Can you please elaborate on the part "As containers are launched and stopped
one after another"? Any pointer to a class/method in Flink?

*@ Zhu Zhu:*

Regarding "One optimization that we take is letting yarn to reuse the
flink-dist jar which was localized when running previous jobs."

We intend to use a Flink real-time pipeline for replay from Hive/HDFS
(an offline source), to have a single pipeline for both batch and
real-time. So for a batch Flink job, the containers will be released once the
job is done.
I guess your job is real-time Flink, so you can share the jars from
already long-running jobs.

Thanks.


On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang  wrote:

> I can think of 2 approaches:
>
> 1. Allow flink to specify the replication of the submitted uber jar.
> 2. Allow flink to specify a config flink.yarn.lib that points to all the
> flink-related jars hosted on hdfs. This way users don't need to build
> and submit a fat uber jar every time. And those flink jars hosted on hdfs
> can also have their replication specified separately.
>
>
>
> Till Rohrmann  于2019年8月30日周五 下午3:33写道:
>
>> For point 2. there exists already a JIRA issue [1] and a PR. I hope that
>> we can merge it during this release cycle.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang 
>> wrote:
>>
>>> Hi Dadashov,
>>>
>>> We faced similar problems in our production clusters.
>>>
>>> Now both launching and stopping of containers are performed in the main
>>> thread of YarnResourceManager. As containers are launched and stopped one
>>> after another, it usually takes a long time to bootstrap large jobs. Things
>>> get worse when some node managers get lost. Yarn will retry many times to
>>> communicate with them, leading to heartbeat timeout of TaskManagers.
>>>
>>> Following are some efforts we made to help Flink deal with large jobs.
>>>
>>> 1. We provision some common jars in all cluster nodes and ask our users
>>> not to include these jars in their uberjar. When containers bootstrap,
>>> these jars are added to the classpath via JVM options. That way, we can
>>> efficiently reduce the size of uberjars.
>>>
>>> 2. We deploy some asynchronous threads to launch and stop containers in
>>> YarnResourceManager. The bootstrap 

[jira] [Created] (FLINK-13930) Support Hive version 3.1.x

2019-08-30 Thread Xuefu Zhang (Jira)
Xuefu Zhang created FLINK-13930:
---

 Summary: Support Hive version 3.1.x
 Key: FLINK-13930
 URL: https://issues.apache.org/jira/browse/FLINK-13930
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang
 Fix For: 1.10.0


Hive 2.3.6 is released a few days ago. We can trivially support this version as 
well, as we have already provided support for previous 2.3.x releases.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Releasing Flink 1.8.2

2019-08-30 Thread Till Rohrmann
+1 for a 1.8.2 bug fix release. Thanks for kicking this discussion off
Jincheng.

Cheers,
Till

On Fri, Aug 30, 2019 at 6:45 PM Jark Wu  wrote:

> Thanks Jincheng for bringing this up.
>
> +1 to the 1.8.2 release, because it already contains a couple of important
> fixes and it has been a long time since 1.8.1 came out.
> I'm willing to help the community as much as possible. I'm wondering if I
> can be the release manager of 1.8.2 or work with you together @Jincheng?
>
> Best,
> Jark
>
> On Fri, 30 Aug 2019 at 18:58, Hequn Cheng  wrote:
>
> > Hi Jincheng,
> >
> > +1 for a 1.8.2 release.
> > Thanks a lot for raising the discussion. It would be nice to have these
> > critical fixes.
> >
> > Best, Hequn
> >
> >
> > On Fri, Aug 30, 2019 at 6:31 PM Maximilian Michels 
> wrote:
> >
> > > Hi Jincheng,
> > >
> > > +1 I would be for a 1.8.2 release such that we can fix the problems
> with
> > > the nested closure cleaner which currently block 1.8.1 users with Beam:
> > > https://issues.apache.org/jira/browse/FLINK-13367
> > >
> > > Thanks,
> > > Max
> > >
> > > On 30.08.19 11:25, jincheng sun wrote:
> > > > Hi Flink devs,
> > > >
> > > > It has been nearly 2 months since the 1.8.1 released. So, what do you
> > > think
> > > > about releasing Flink 1.8.2 soon?
> > > >
> > > > We already have some blocker and critical fixes in the release-1.8
> > > branch:
> > > >
> > > > [Blocker]
> > > > - FLINK-13159 java.lang.ClassNotFoundException when restore job
> > > > - FLINK-10368 'Kerberized YARN on Docker test' unstable
> > > > - FLINK-12578 Use secure URLs for Maven repositories
> > > >
> > > > [Critical]
> > > > - FLINK-12736 ResourceManager may release TM with allocated slots
> > > > - FLINK-12889 Job keeps in FAILING state
> > > > - FLINK-13484 ConnectedComponents end-to-end test instable with
> > > > NoResourceAvailableException
> > > > - FLINK-13508 CommonTestUtils#waitUntilCondition() may attempt to
> sleep
> > > > with negative time
> > > > - FLINK-13806 Metric Fetcher floods the JM log with errors when TM is
> > > lost
> > > >
> > > > Furthermore, I think the following one blocker issue should be merged
> > > > before 1.8.2 release.
> > > >
> > > > - FLINK-13897: OSS FS NOTICE file is placed in wrong directory
> > > >
> > > > It would also be great if we can have the fix of Elasticsearch6.x
> > > connector
> > > > threads leaking (FLINK-13689) in 1.8.2 release which is identified as
> > > major.
> > > >
> > > > Please let me know what you think?
> > > >
> > > > Cheers,
> > > > Jincheng
> > > >
> > >
> >
>


[jira] [Created] (FLINK-13929) Revisit REST & JM URL

2019-08-30 Thread TisonKun (Jira)
TisonKun created FLINK-13929:


 Summary: Revisit REST & JM URL 
 Key: FLINK-13929
 URL: https://issues.apache.org/jira/browse/FLINK-13929
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration, Runtime / Coordination
Reporter: TisonKun


Currently we have several issues with the URL (i.e., ADDRESS and PORT) configurations 
of REST (WebMonitor) and JM (DispatcherRMComponent).
 # Client-side code should only retrieve the REST PORT, but for historical reasons 
we sometimes pass the JM PORT. This doesn't become a problem because in some cases the 
value is unused, while in others the JM PORT is incorrectly set to the REST PORT value, 
so we are wrong twice but end up succeeding.
 # Generally speaking, going back to the design of 
[FLIP-6|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077],
 there is no concept named {{WebMonitor}}. The responsibility to communicate 
with the client is covered by {{Dispatcher}}. So there seems to be no argument for 
separating {{JobManagerOptions.ADDRESS}} and {{RestOptions.ADDRESS}}. Besides, we 
unfortunately use different PORTs because the REST server uses a netty connection 
while the JM requires an actor system which has to bind to another port. 
Theoretically all messages can be passed via the same port, either by handling 
REST requests in the Akka scope or by handling RPC in the netty scope, so that this 
"two-port" requirement is hopefully no longer needed.
 # nit: The deprecated config {{WebOptions.PORT}} is still in use at 
{{YarnEntrypointUtils.loadConfiguration}}. This should be easily resolved by 
replacing it with {{RestOptions.PORT}}.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Releasing Flink 1.8.2

2019-08-30 Thread Jark Wu
Thanks Jincheng for bringing this up.

+1 to the 1.8.2 release, because it already contains a couple of important
fixes and it has been a long time since 1.8.1 came out.
I'm willing to help the community as much as possible. I'm wondering if I
can be the release manager of 1.8.2 or work with you together @Jincheng?

Best,
Jark

On Fri, 30 Aug 2019 at 18:58, Hequn Cheng  wrote:

> Hi Jincheng,
>
> +1 for a 1.8.2 release.
> Thanks a lot for raising the discussion. It would be nice to have these
> critical fixes.
>
> Best, Hequn
>
>
> On Fri, Aug 30, 2019 at 6:31 PM Maximilian Michels  wrote:
>
> > Hi Jincheng,
> >
> > +1 I would be for a 1.8.2 release such that we can fix the problems with
> > the nested closure cleaner which currently block 1.8.1 users with Beam:
> > https://issues.apache.org/jira/browse/FLINK-13367
> >
> > Thanks,
> > Max
> >
> > On 30.08.19 11:25, jincheng sun wrote:
> > > Hi Flink devs,
> > >
> > > It has been nearly 2 months since the 1.8.1 released. So, what do you
> > think
> > > about releasing Flink 1.8.2 soon?
> > >
> > > We already have some blocker and critical fixes in the release-1.8
> > branch:
> > >
> > > [Blocker]
> > > - FLINK-13159 java.lang.ClassNotFoundException when restore job
> > > - FLINK-10368 'Kerberized YARN on Docker test' unstable
> > > - FLINK-12578 Use secure URLs for Maven repositories
> > >
> > > [Critical]
> > > - FLINK-12736 ResourceManager may release TM with allocated slots
> > > - FLINK-12889 Job keeps in FAILING state
> > > - FLINK-13484 ConnectedComponents end-to-end test instable with
> > > NoResourceAvailableException
> > > - FLINK-13508 CommonTestUtils#waitUntilCondition() may attempt to sleep
> > > with negative time
> > > - FLINK-13806 Metric Fetcher floods the JM log with errors when TM is
> > lost
> > >
> > > Furthermore, I think the following one blocker issue should be merged
> > > before 1.8.2 release.
> > >
> > > - FLINK-13897: OSS FS NOTICE file is placed in wrong directory
> > >
> > > It would also be great if we can have the fix of Elasticsearch6.x
> > connector
> > > threads leaking (FLINK-13689) in 1.8.2 release which is identified as
> > major.
> > >
> > > Please let me know what you think?
> > >
> > > Cheers,
> > > Jincheng
> > >
> >
>


[jira] [Created] (FLINK-13928) Make windows api more extendable

2019-08-30 Thread zhihao zhang (Jira)
zhihao zhang created FLINK-13928:


 Summary: Make windows api more extendable
 Key: FLINK-13928
 URL: https://issues.apache.org/jira/browse/FLINK-13928
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.9.0
Reporter: zhihao zhang
 Fix For: 2.0.0


I'm creating my own TimeWindow these days, which extends `TimeWindow`, but 
the new TimeWindow does not work well with the existing windows API. 

For example, my own TimeWindow does not work with 
`DynamicEventTimeSessionWindows`, because 

{code:java}
public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {}
{code}

`DynamicEventTimeSessionWindows` does not accept my TimeWindow.

So my proposal is 

{code:java}
public class DynamicEventTimeSessionWindows<T, W extends TimeWindow> extends 
MergingWindowAssigner<T, W> {}
{code}


If this ticket is ok to go, I would like to take it.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13927) Add note about hadoop dependencies for local debug

2019-08-30 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-13927:
---

 Summary: Add note about hadoop dependencies for local debug
 Key: FLINK-13927
 URL: https://issues.apache.org/jira/browse/FLINK-13927
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem, Documentation, FileSystems
Reporter: Andrey Zagrebin
Assignee: Andrey Zagrebin


Currently, if a user tries to run a job locally (e.g. from the IDE) and uses a Hadoop 
file system, it will not work if the Hadoop dependencies are not on the classpath, which is 
the case for the example from the quick start.

We can add a hint about adding provided hadoop dependencies to:
[https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/hadoop.html]

and cross reference with:
[https://ci.apache.org/projects/flink/flink-docs-master/ops/filesystems/index.html#hadoop-configuration]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13926) `ProcessingTimeSessionWindows` and `EventTimeSessionWindows` should be generic

2019-08-30 Thread zhihao zhang (Jira)
zhihao zhang created FLINK-13926:


 Summary: `ProcessingTimeSessionWindows` and 
`EventTimeSessionWindows` should be generic 
 Key: FLINK-13926
 URL: https://issues.apache.org/jira/browse/FLINK-13926
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.9.0
Reporter: zhihao zhang
 Fix For: 2.0.0


`ProcessingTimeSessionWindows` and `EventTimeSessionWindows` should be generic 
just like `DynamicEventTimeSessionWindows` and 
`DynamicProcessingTimeSessionWindows`.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13925) ClassLoader in BlobLibraryCacheManager is not using context class loader

2019-08-30 Thread Jira
Jan Lukavský created FLINK-13925:


 Summary: ClassLoader in BlobLibraryCacheManager is not using 
context class loader
 Key: FLINK-13925
 URL: https://issues.apache.org/jira/browse/FLINK-13925
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.0, 1.8.1
Reporter: Jan Lukavský
 Fix For: 1.8.2, 1.9.1


Use thread's current context classloader as parent class loader of flink user 
code class loaders.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Flink client api enhancement for downstream project

2019-08-30 Thread Jeff Zhang
Awesome, @Kostas Looking forward your POC.

Kostas Kloudas  于2019年8月30日周五 下午8:33写道:

> Hi all,
>
> I am just writing here to let you know that I am working on a POC that
> tries to refactor the current state of job submission in Flink.
> I want to stress that it introduces NO CHANGES to the current
> behaviour of Flink. It just re-arranges things and introduces the
> notion of an Executor, which is the entity responsible for taking the
> user-code and submitting it for execution.
>
> Given this, the discussion about the functionality that the JobClient
> will expose to the user can go on independently and the same
> holds for all the open questions so far.
>
> I hope I will have some more news to share soon.
>
> Thanks,
> Kostas
>
> On Mon, Aug 26, 2019 at 4:20 AM Yang Wang  wrote:
> >
> > Hi Zili,
> >
> > It makes sense to me that a dedicated cluster is started for a per-job
> > cluster and will not accept more jobs.
> > Just have a question about the command line.
> >
> > Currently we could use the following commands to start different
> clusters.
> > *per-job cluster*
> > ./bin/flink run -d -p 5 -ynm perjob-cluster1 -m yarn-cluster
> > examples/streaming/WindowJoin.jar
> > *session cluster*
> > ./bin/flink run -p 5 -ynm session-cluster1 -m yarn-cluster
> > examples/streaming/WindowJoin.jar
> >
> > What will it look like after client enhancement?
> >
> >
> > Best,
> > Yang
> >
> > Zili Chen  于2019年8月23日周五 下午10:46写道:
> >
> > > Hi Till,
> > >
> > > Thanks for your update. Nice to hear :-)
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Till Rohrmann  于2019年8月23日周五 下午10:39写道:
> > >
> > > > Hi Tison,
> > > >
> > > > just a quick comment concerning the class loading issues when using
> the
> > > per
> > > > job mode. The community wants to change it so that the
> > > > StandaloneJobClusterEntryPoint actually uses the user code class
> loader
> > > > with child first class loading [1]. Hence, I hope that this problem
> will
> > > be
> > > > resolved soon.
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-13840
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Fri, Aug 23, 2019 at 2:47 PM Kostas Kloudas 
> > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > On the topic of web submission, I agree with Till that it only
> seems
> > > > > to complicate things.
> > > > > It is bad for security, job isolation (anybody can submit/cancel
> jobs),
> > > > > and its
> > > > > implementation complicates some parts of the code. So, if it were
> to
> > > > > redesign the
> > > > > WebUI, maybe this part could be left out. In addition, I would say
> > > > > that the ability to cancel
> > > > > jobs could also be left out.
> > > > >
> > > > > Also I would also be in favour of removing the "detached" mode, for
> > > > > the reasons mentioned
> > > > > above (i.e. because now we will have a future representing the
> result
> > > > > on which the user
> > > > > can choose to wait or not).
> > > > >
> > > > > Now for the separating job submission and cluster creation, I am in
> > > > > favour of keeping both.
> > > > > Once again, the reasons are mentioned above by Stephan, Till,
> Aljoscha
> > > > > and also Zili seems
> > > > > to agree. They mainly have to do with security, isolation and ease
> of
> > > > > resource management
> > > > > for the user as he knows that "when my job is done, everything
> will be
> > > > > cleared up". This is
> > > > > also the experience you get when launching a process on your local
> OS.
> > > > >
> > > > > On excluding the per-job mode from returning a JobClient or not, I
> > > > > believe that eventually
> > > > > it would be nice to allow users to get back a jobClient. The
> reason is
> > > > > that 1) I cannot
> > > > > find any objective reason why the user-experience should diverge,
> and
> > > > > 2) this will be the
> > > > > way that the user will be able to interact with his running job.
> > > > > Assuming that the necessary
> > > > > ports are open for the REST API to work, then I think that the
> > > > > JobClient can run against the
> > > > > REST API without problems. If the needed ports are not open, then
> we
> > > > > are safe to not return
> > > > > a JobClient, as the user explicitly chose to close all points of
> > > > > communication to his running job.
> > > > >
> > > > > On the topic of not hijacking the "env.execute()" in order to get
> the
> > > > > Plan, I definitely agree but
> > > > > for the proposal of having a "compile()" method in the env, I would
> > > > > like to have a better look at
> > > > > the existing code.
> > > > >
> > > > > Cheers,
> > > > > Kostas
> > > > >
> > > > > On Fri, Aug 23, 2019 at 5:52 AM Zili Chen 
> > > wrote:
> > > > > >
> > > > > > Hi Yang,
> > > > > >
> > > > > > It would be helpful if you check Stephan's last comment,
> > > > > > which states that isolation is important.
> > > > > >
> > > > > > For per-job mode, we run a dedicated cluster(maybe it
> > > > > > should have been a couple of JM and TMs during 

[jira] [Created] (FLINK-13924) Add summarizer and summary for sparse vector and dense vector.

2019-08-30 Thread Xu Yang (Jira)
Xu Yang created FLINK-13924:
---

 Summary: Add summarizer and summary for sparse vector and dense 
vector.
 Key: FLINK-13924
 URL: https://issues.apache.org/jira/browse/FLINK-13924
 Project: Flink
  Issue Type: Sub-task
  Components: Library / Machine Learning
Reporter: Xu Yang


Summarizer is the class for calculating statistics; Summary is the result class 
of Summarizer and defines methods to get the statistics. The data may contain both 
dense vectors and sparse vectors, and the vector sizes may differ, so if a 
DenseVectorSummarizer visits a sparse vector, it will change to a 
SparseVectorSummarizer. 
Statistics include vectorSize, count, mean, variance, min, max, 
standardDeviation, normL1, normL2.
 * Add SparseVectorSummarizer which will calculate statistics for sparse vector.
 * Add SparseVectorSummary which can get statistics of sparse vector.
 * Add DenseVectorSummarizer which will calculate statistics for dense vector.
 * Add DenseVectorSummary which can get statistics of dense vector.
 * Add StatisticsUtil which provides utility functions for summarizer and 
summary.
 * Add VectorSummarizerUtil which provides utility functions for 
VectorSummarizer.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: State of FLIPs

2019-08-30 Thread Dian Fu
Hi Chesnay,

Thanks a lot for the reminder. FLIP-38 has been released in 1.9 and I have
updated the status in the wiki page.

Regards,
Dian

On Fri, Aug 30, 2019 at 9:38 PM Becket Qin  wrote:

> Hi Chesnay,
>
> You are right. FLIP-36 actually has not passed the vote yet. In fact some
> of the key designs may have to change due to the later code changes. I'll
> update the wiki and start a new vote.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Aug 30, 2019 at 8:44 PM Chesnay Schepler 
> wrote:
>
> > The following FLIPs are marked as "Under discussion" in the wiki
> > <
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >,
> > but actually seem to be in progress (i.e. have open pull requests) and
> some
> > even  have code merged to master:
> >
> >- FLIP-36 (Interactive Programming)
> >- FLIP-38 (Python Table API)
> >- FLIP-44 (Support Local Aggregation)
> >- FLIP-50 (Spill-able Heap Keyed State Backend)
> >
> > I would like to find out what the _actual_ state is, and then discuss how
> > we handle these FLIPs from now on (e.g., retcon history and mark them as
> > accepted, freeze further development until a vote, ...).
> >
> > I've cc'd all people who create the wiki pages for said FLIPs.
> >
> >
> >
>


Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-08-30 Thread Till Rohrmann
Hi Jan,

this looks to me like a bug for which you could create a JIRA and PR to fix
it. Just to make sure, I've pulled in Aljoscha who is the author of this
change to check with him whether we are forgetting something.

Cheers,
Till

On Fri, Aug 30, 2019 at 3:44 PM Jan Lukavský  wrote:

> Hi,
>
> I have come across an issue with classloading in Flink's MiniCluster.
> The issue is that when I run a local Flink job from a thread that has a
> non-default context classloader (for whatever reason), this classloader
> is not taken into account when loading user-defined functions. This
> is due to [1]. Is this behavior intentional, or can I file a JIRA and
> use Thread.currentThread().getContextClassLoader() there? I have validated
> that it fixes the issues I'm facing.
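A rough illustration of the proposed change (URLClassLoader stands in for Flink's user-code class loader; this is not the actual BlobLibraryCacheManager code):

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

final class UserCodeClassLoaderFactory {

    static ClassLoader create(URL[] libraryUrls) {
        // Instead of always parenting on the class loader that loaded Flink itself,
        // respect the caller's context class loader, which matters for embedded
        // setups such as a MiniCluster started from a thread with a custom loader.
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        return new URLClassLoader(libraryUrls, parent);
    }
}
{code}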
>
> Jan
>
> [1]
>
> https://github.com/apache/flink/blob/ce557839d762b5f1ec92aa1885fd3d2ae33d0d0b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java#L280
>
>


Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-08-30 Thread 未来阳光
Hi,


I think it's better to increase the default value. +1


Best.




-- Original message --
From: "Till Rohrmann";
Sent: Friday, August 30, 2019, 10:07 PM
To: "dev"; "user";
Subject: [SURVEY] Is the default restart delay of 0s causing problems?



Hi everyone,

I wanted to reach out to you and ask whether decreasing the default delay
to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
user reported that he would like to increase the default value because it
can cause restart storms in case of systematic faults [2].

The downside of increasing the default delay would be a slightly increased
restart time if this config option is not explicitly set.

[1] https://issues.apache.org/jira/browse/FLINK-9158
[2] https://issues.apache.org/jira/browse/FLINK-11218

Cheers,
Till

Re: [DISCUSS] Simplify Flink's cluster level RestartStrategy configuration

2019-08-30 Thread Till Rohrmann
After an offline discussion with Stephan, we concluded that changing the
default restart strategy for batch jobs is not that easy because the
cluster level restart configuration does not necessarily know about the
type of job which is submitted. We concluded that we would like to keep the
batch behaviour as is (NoRestartStrategy) and revisit this issue at a later
point in time.

On Fri, Aug 30, 2019 at 3:24 PM Till Rohrmann  wrote:

> The current default behaviour for batch is `NoRestartStrategy` if nothing
> is configured. We could say that we set the default value of
> `restart-strategy` to `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
> independent of the checkpointing. The only downside I could see is that
> some faulty batch jobs might get stuck in a restart loop without reaching a
> terminal state.
>
> @Dawid, I don't intend to touch the ExecutionConfig. This change only
> targets the cluster level configuration of the RestartStrategy.
>
> Cheers,
> Till
>
> On Fri, Aug 30, 2019 at 3:14 PM Dawid Wysakowicz 
> wrote:
>
>> Also +1 in general.
>>
>> I have a few questions though:
>>
>> - does it only apply to the logic in
>>
>> org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory#createRestartStrategyFactory,
>> which is only the cluster side configuration? Or do you want to change
>> the logic also on the job side in ExecutionConfig?
>>
>> - if the latter, does that mean deprecated methods in ExecutionConfig
>> like: setNumberOfExecutionRetries, setExecutionRetryDelay will have no
>> effect? I think this would be a good idea, but would suggest to remove
>> the corresponding fields and methods. This is not that simple though. I
>> tried to do that for other parameters that have no effect already like
>> codeAnalysisMode & failTaskOnCheckpointError. There are two problems:
>>
>> 1) setNumberOfExecutionRetries is effectively marked with @Public
>> annotation (the codeAnalysisMode & failTaskOnCheckpointError don't have
>> this problem). Therefore this would be a binary incompatible change.
>>
>> 2) ExecutionConfig is stored in state as part of PojoSerializer in
>> pre-1.7 Flink. It should not be a problem for numberOfExecutionRetries &
>> executionRetryDelays as they are of primitive types. It is a problem for
>> codeAnalysisMode (we cannot remove the class, as this breaks
>> serialization). I wanted to mention that anyway, just to be aware of that.
>>
>> Best,
>>
>> Dawid
>>
>> On 30/08/2019 14:48, Stephan Ewen wrote:
>> > +1 in general
>> >
>> > What is the default in batch, though? No restarts? I always found that
>> > somewhat uncommon.
>> > Should we also change that part, if we are changing the default anyways?
>> >
>> >
>> > On Fri, Aug 30, 2019 at 2:35 PM Till Rohrmann 
>> wrote:
>> >
>> >> Hi everyone,
>> >>
>> >> I wanted to discuss how to simplify Flink's cluster level
>> RestartStrategy
>> >> configuration [1]. Currently, Flink's behaviour with respect to
>> configuring
>> >> the {{RestartStrategies}} is quite complicated and convoluted. The
>> reason
>> >> for this is that we evolved the way it has been configured and wanted
>> to
>> >> keep it backwards compatible. Due to this, we have currently the
>> following
>> >> behaviour:
>> >>
>> >> * If the config option `restart-strategy` is configured, then Flink
>> uses
>> >> this `RestartStrategy` (so far so simple)
>> >> * If the config option `restart-strategy` is not configured, then
>> >> ** If `restart-strategy.fixed-delay.attempts` or
>> >> `restart-strategy.fixed-delay.delay` are defined, then instantiate
>> >> `FixedDelayRestartStrategy(restart-strategy.fixed-delay.attempts,
>> >> restart-strategy.fixed-delay.delay)`
>> >> ** If `restart-strategy.fixed-delay.attempts` and
>> >> `restart-strategy.fixed-delay.delay` are not defined, then
>> >> *** If checkpointing is disabled, then choose `NoRestartStrategy`
>> >> *** If checkpointing is enabled, then choose
>> >> `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
>> >>
>> >> I would like to simplify the configuration by removing the "If
>> >> `restart-strategy.fixed-delay.attempts` or
>> >> `restart-strategy.fixed-delay.delay`, then" condition. That way, the
>> logic
>> >> would be the following:
>> >>
>> >> * If the config option `restart-strategy` is configured, then Flink
>> uses
>> >> this `RestartStrategy`
>> >> * If the config option `restart-strategy` is not configured, then
>> >> ** If checkpointing is disabled, then choose `NoRestartStrategy`
>> >> ** If checkpointing is enabled, then choose
>> >> `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
>> >>
>> >> That way we retain the user friendliness that jobs restart if the user
>> >> enabled checkpointing and we make it clear that any `
>> >> restart-strategy.fixed-delay.xyz` setting will only be respected if
>> >> `restart-strategy` has been set to `fixed-delay`.
>> >>
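A sketch of the simplified resolution described above (a hypothetical helper, not the actual RestartStrategyFactory code):

{code:java}
// Returns a description of the restart strategy that would be picked at the
// cluster level if the simplification were applied.
static String resolveClusterRestartStrategy(
        org.apache.flink.configuration.Configuration conf, boolean checkpointingEnabled) {
    String configured = conf.getString("restart-strategy", null);
    if (configured != null) {
        // Explicit setting always wins; fixed-delay sub-options are only read here.
        return configured;
    }
    // Nothing configured: fall back based on whether checkpointing is enabled.
    return checkpointingEnabled
            ? "fixed-delay (Integer.MAX_VALUE attempts, 0 s delay)"
            : "none";
}
{code}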
>> >> This simplification would, however, change Flink's behaviour and might
>> >> break existing setups. Since we 

[SURVEY] Is the default restart delay of 0s causing problems?

2019-08-30 Thread Till Rohrmann
Hi everyone,

I wanted to reach out to you and ask whether decreasing the default delay
to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
user reported that he would like to increase the default value because it
can cause restart storms in case of systematic faults [2].

The downside of increasing the default delay would be a slightly increased
restart time if this config option is not explicitly set.

[1] https://issues.apache.org/jira/browse/FLINK-9158
[2] https://issues.apache.org/jira/browse/FLINK-11218

Cheers,
Till


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-30 Thread Timo Walther

Hi Becket,

1. First of all, you are totally right. The FLIP contains a bug due to 
the last minute changes that Dawid suggested: by having immutable 
objects created by a factory we lose the serializability of the 
Configuration because the factory itself is not stored in the 
Configuration. I would propose to revert the last change and stick to 
the original design, which means that an object must implement the 
Configurable interface and also implement serialization/deserialization 
methods such that internal fields can also be persisted as you 
suggested. But in general, people should not use any internal fields. 
Configurable objects are meant for simple little helper POJOs, not 
complex arbitrary nested data structures.


It is Map<String, Object> because Configuration stores the raw objects. 
If you put a Boolean option into it, it remains Boolean. This makes the 
map very efficient for shipping to the cluster and accessing options 
multiple times. The same holds for configurable objects. We put the pure 
objects into the map without any serialization/deserialization. The 
provided factory allows converting the Object into a Configuration, and 
we know how to serialize/deserialize a configuration because it is 
just a key/value map.
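As an illustration of such a "simple helper POJO", here is a hedged sketch; the exact shape of the Configurable interface is an assumption based on this thread, not the final FLIP-54 API:

{code:java}
// Hypothetical Configurable helper POJO with explicit (de)serialization to a
// plain key/value Configuration, so it can be shipped like any other option.
public class MemorySettings implements Configurable {

    private int managedMemoryMb = 128;
    private boolean offHeap = false;

    @Override
    public Configuration toConfiguration() {
        Configuration conf = new Configuration();
        conf.setInteger("managed-memory-mb", managedMemoryMb);
        conf.setBoolean("off-heap", offHeap);
        return conf;
    }

    @Override
    public void fromConfiguration(Configuration conf) {
        this.managedMemoryMb = conf.getInteger("managed-memory-mb", managedMemoryMb);
        this.offHeap = conf.getBoolean("off-heap", offHeap);
    }
}
{code}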


2. Yes, this is what we had in mind. It should still be the same 
configuration option. We would like to avoid specialized option keys 
across components (exec.max-para and table.exec.max-para) if they are 
describing basically the same thing. But adding some more description 
like "TableOptions.MAX_PARALLELISM with description_1 + description_2" 
does not hurt.


3. They should restore the original object given that the 
toConfiguration/fromConfiguration methods have been implemented 
correctly. I will extend the example to make the logic clearer while 
fixing the bug.


Thanks for the healthy discussion,
Timo


On 30.08.19 15:29, Becket Qin wrote:

Hi Timo,

Thanks again for the clarification. Please see a few more questions below.

Re: 1


Please also keep in mind that Configuration must not consist of only
strings; it manages a Map<String, Object> for efficient access. Every
map entry can have a string representation for persistence, but in most
cases consists of unserialized objects.


I'd like to understand this a bit more. The reason we have a Map<String, Object>
in Configuration was because that Object could be either a String,
a List or a Map, right? But they eventually always boil down to Strings, or
maybe one of the predefined types that we know how to serialize. In the
current design, can the Object also be a Configurable?
If the value in the config Map can be a Configurable object,
how do we serialize it? Calling toConfiguration() does not seem to quite work,
because there might be some other internal fields that are not part of the
configuration. The modifications to those fields will be lost if we simply
use toConfiguration(). So the impact of modifying those Configurable
objects seems a little undefined, and it would be difficult to prevent
users from doing that.
If the value in the config Map cannot be a Configurable
object, then it seems a little weird to have all the other ConfigTypes
stored in the ConfigMap in their native types and accessed via
getInteger() / getBoolean(), etc., while the ConfigurableType is
different from the others because one has to use a ConfigurableFactory to get
the configuration.

Re: 2


I think about the withExtendedDescription as a helper getter in a
different place, so that the option is easier to find in a different
module from where it was defined.
The MAX_PARALLELISM option in TableOptions would conceptually be equal to:
public ConfigOption getMaxParallelismOption() {
 return CoreOptions.MAX_PARALLELISM;
}

Just to make sure I understand it correctly, does that mean users will see
something like the following?
  - CoreOptions.MAX_PARALLELISM with description_1;
  - TableOptions.MAX_PARALLELISM with description_1 + description_2.
  - DataStreamOptions.MAX_PARALLELISM with description_1 + description_3.
And users will only configure exactly one MAX_PARALLELISM across the board,
so they won't be confused by setting two MAX_PARALLELISM configs for two
different modules while they are actually the same config. If that is the
case, I don't have further concerns.

Re: 3
Maybe I am thinking too much. I thought toBytes() / fromBytes() actually
restore the original Object. But fromConfiguration() and toConfiguration()
do not do that; anything not in the configuration of the original object
will be lost. So it would be good to make that clear. Maybe a clear Java
doc is sufficient.

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 30, 2019 at 4:08 PM Dawid Wysakowicz 
wrote:


Hi,

Ad. 1

The advantage of our approach is that you have the type definition close
to the option definition. The only difference is that it enables
expressing simple pojos with the primitives like ConfigOption,
ConfigOption etc. Otherwise as Timo said you will start having

the parsing logic scattered everywhere in the 

ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-08-30 Thread Jan Lukavský

Hi,

I have come across an issue with classloading in Flink's MiniCluster. 
The issue is that when I run a local Flink job from a thread that has a 
non-default context classloader (for whatever reason), this classloader 
is not taken into account when classloading user-defined functions. This 
is due to [1]. Is this behavior intentional, or can I file a JIRA and 
use Thread.currentThread().getContextClassLoader() there? I have validated 
that it fixes the issues I'm facing.
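For illustration, the difference boils down to which parent class loader the
user-code class loader is built with, roughly like this (plain URLClassLoader used
as a stand-in for the actual construction in BlobLibraryCacheManager):

    import java.net.URL;
    import java.net.URLClassLoader;

    class UserCodeClassLoaderSketch {
        // Current behaviour (simplified): parent is the class loader that loaded Flink itself.
        static ClassLoader current(URL[] libraryUrls) {
            return new URLClassLoader(libraryUrls, UserCodeClassLoaderSketch.class.getClassLoader());
        }

        // Suggested behaviour: respect a non-default context class loader of the calling thread.
        static ClassLoader suggested(URL[] libraryUrls) {
            return new URLClassLoader(libraryUrls, Thread.currentThread().getContextClassLoader());
        }
    }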


Jan

[1] 
https://github.com/apache/flink/blob/ce557839d762b5f1ec92aa1885fd3d2ae33d0d0b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java#L280




Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-30 Thread Becket Qin
Hi Timo,

Thanks again for the clarification. Please see a few more questions below.

Re: 1

> Please also keep in mind that Configuration must not consist of only
> strings; it manages a Map<String, Object> for efficient access. Every
> map entry can have a string representation for persistence, but in most
> cases consists of unserialized objects.


I'd like to understand this a bit more. The reason we have a Map<String, Object>
in Configuration was because that Object could be either a String,
a List or a Map, right? But they eventually always boil down to Strings, or
maybe one of the predefined types that we know how to serialize. In the
current design, can the Object also be a Configurable?
If the value in the config Map can be a Configurable object,
how do we serialize it? Calling toConfiguration() does not seem to quite work,
because there might be some other internal fields that are not part of the
configuration. The modifications to those fields will be lost if we simply
use toConfiguration(). So the impact of modifying those Configurable
objects seems a little undefined, and it would be difficult to prevent
users from doing that.
If the value in the config Map cannot be a Configurable
object, then it seems a little weird to have all the other ConfigTypes
stored in the ConfigMap in their native types and accessed via
getInteger() / getBoolean(), etc., while the ConfigurableType is
different from the others because one has to use a ConfigurableFactory to get
the configuration.

Re: 2

> I think about the withExtendedDescription as a helper getter in a
> different place, so that the option is easier to find in a different
> module from where it was defined.
> The MAX_PARALLELISM option in TableOptions would conceptually be equal to:
> public ConfigOption getMaxParallelismOption() {
> return CoreOptions.MAX_PARALLELISM;
> }

Just to make sure I understand it correctly, does that mean users will see
something like the following?
 - CoreOptions.MAX_PARALLELISM with description_1;
 - TableOptions.MAX_PARALLELISM with description_1 + description_2.
 - DataStreamOptions.MAX_PARALLELISM with description_1 + description_3.
And users will only configure exactly one MAX_PARALLELISM across the board,
so they won't be confused by setting two MAX_PARALLELISM configs for two
different modules while they are actually the same config. If that is the
case, I don't have further concerns.

Re: 3
Maybe I am thinking too much. I thought toBytes() / fromBytes() actually
restore the original Object. But fromConfiguration() and toConfiguration()
do not do that; anything not in the configuration of the original object
will be lost. So it would be good to make that clear. Maybe a clear Java
doc is sufficient.

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 30, 2019 at 4:08 PM Dawid Wysakowicz 
wrote:

> Hi,
>
> Ad. 1
>
> The advantage of our approach is that you have the type definition close
> to the option definition. The only difference is that it enables
> expressing simple POJOs with the primitives like ConfigOption,
> ConfigOption etc. Otherwise, as Timo said, you will start having
>
> the parsing logic scattered everywhere in the code base as it is now.
> The string representation in our proposal is exactly the same as you
> explained for those three options. The only difference is that you don't
> have to parse the elements of a List, Map etc. afterwards.
>
> Ad. 2
>
> I think about the withExtendedDescription as a helper getter in a
> different place, so that the option is easier to find in a different
> module from where it was defined.
>
> The MAX_PARALLELISM option in TableOptions would conceptually be equal to:
>
> public ConfigOption getMaxParallelismOption() {
>
> return CoreOptions.MAX_PARALLELISM;
>
> }
>
> This allows further clarifying the description of the option in the
> context of a different module and ending up on a separate page in the
> documentation (but with a link to the original one). In the end it is
> exactly the same option. It has exactly the same key, type, and parsing logic,
> and it is in the end forwarded to the same place.
>
> Ad. 3
>
> Not sure if I understand your concerns here. As Timo said, it is in the
> end something similar to toBytes/fromBytes, but it puts itself into a
> Configuration. Also, I just wanted to point out that we adjusted this part
> slightly and now the ConfigOption takes a ConfigurableFactory.
>
> Best,
>
> Dawid
>
>
> On 30/08/2019 09:39, Timo Walther wrote:
> > Hi Becket,
> >
> > thanks for the discussion.
> >
> > 1. ConfigOptions in their current design are bound to classes.
> > Regarding "the option is creating some Configurable objects instead
> > of defining the config to create
> > those Configurables": we just moved this logic to a factory; this
> > factory can then also be used for other purposes. However, how the
> > option and objects are serialized to Configuration is still not part
> > of the option. The option is just pure declaration.
> >
> > If we would allow only List, implementers would need to start
> > implementing own parsing 

Re: [DISCUSS] Simplify Flink's cluster level RestartStrategy configuration

2019-08-30 Thread Till Rohrmann
The current default behaviour for batch is `NoRestartStrategy` if nothing
is configured. We could say that we set the default value of
`restart-strategy` to `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
independent of the checkpointing. The only downside I could see is that
some faulty batch jobs might get stuck in a restart loop without reaching a
terminal state.

@Dawid, I don't intend to touch the ExecutionConfig. This change only
targets the cluster level configuration of the RestartStrategy.

Cheers,
Till

On Fri, Aug 30, 2019 at 3:14 PM Dawid Wysakowicz 
wrote:

> Also +1 in general.
>
> I have a few questions though:
>
> - does it only apply to the logic in
>
> org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory#createRestartStrategyFactory,
> which is only the cluster side configuration? Or do you want to change
> the logic also on the job side in ExecutionConfig?
>
> - if the latter, does that mean deprecated methods in ExecutionConfig
> like: setNumberOfExecutionRetries, setExecutionRetryDelay will have no
> effect? I think this would be a good idea, but would suggest to remove
> the corresponding fields and methods. This is not that simple though. I
> tried to do that for other parameters that have no effect already like
> codeAnalysisMode & failTaskOnCheckpointError. There are two problems:
>
> 1) setNumberOfExecutionRetries is effectively marked with the @Public
> annotation (the codeAnalysisMode & failTaskOnCheckpointError don't have
> this problem). Therefore this would be a binary incompatible change.
>
> 2) ExecutionConfig is stored in state as part of PojoSerializer in
> pre-1.7 Flink. It should not be a problem for numberOfExecutionRetries &
> executionRetryDelays as they are of primitive types. It is a problem for
> codeAnalysisMode (we cannot remove the class, as this breaks
> serialization). I wanted to mention that anyway, just to be aware of that.
>
> Best,
>
> Dawid
>
> On 30/08/2019 14:48, Stephan Ewen wrote:
> > +1 in general
> >
> > What is the default in batch, though? No restarts? I always found that
> > somewhat uncommon.
> > Should we also change that part, if we are changing the default anyways?
> >
> >
> > On Fri, Aug 30, 2019 at 2:35 PM Till Rohrmann 
> wrote:
> >
> >> Hi everyone,
> >>
> >> I wanted to discuss how to simplify Flink's cluster level
> RestartStrategy
> >> configuration [1]. Currently, Flink's behaviour with respect to
> configuring
> >> the {{RestartStrategies}} is quite complicated and convoluted. The
> reason
> >> for this is that we evolved the way it has been configured and wanted to
> >> keep it backwards compatible. Due to this, we have currently the
> following
> >> behaviour:
> >>
> >> * If the config option `restart-strategy` is configured, then Flink uses
> >> this `RestartStrategy` (so far so simple)
> >> * If the config option `restart-strategy` is not configured, then
> >> ** If `restart-strategy.fixed-delay.attempts` or
> >> `restart-strategy.fixed-delay.delay` are defined, then instantiate
> >> `FixedDelayRestartStrategy(restart-strategy.fixed-delay.attempts,
> >> restart-strategy.fixed-delay.delay)`
> >> ** If `restart-strategy.fixed-delay.attempts` and
> >> `restart-strategy.fixed-delay.delay` are not defined, then
> >> *** If checkpointing is disabled, then choose `NoRestartStrategy`
> >> *** If checkpointing is enabled, then choose
> >> `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
> >>
> >> I would like to simplify the configuration by removing the "If
> >> `restart-strategy.fixed-delay.attempts` or
> >> `restart-strategy.fixed-delay.delay`, then" condition. That way, the
> logic
> >> would be the following:
> >>
> >> * If the config option `restart-strategy` is configured, then Flink uses
> >> this `RestartStrategy`
> >> * If the config option `restart-strategy` is not configured, then
> >> ** If checkpointing is disabled, then choose `NoRestartStrategy`
> >> ** If checkpointing is enabled, then choose
> >> `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
> >>
> >> That way we retain the user friendliness that jobs restart if the user
> >> enabled checkpointing and we make it clear that any `
> >> restart-strategy.fixed-delay.xyz` setting will only be respected if
> >> `restart-strategy` has been set to `fixed-delay`.
> >>
> >> This simplification would, however, change Flink's behaviour and might
> >> break existing setups. Since we introduced `RestartStrategies` with
> Flink
> >> 1.0.0 and deprecated the prior configuration mechanism which enables
> >> restarting if either the `attempts` or the `delay` has been set, I think
> >> that the number of broken jobs should be minimal if not non-existent.
> >>
> >> I'm sure that one can simplify the way RestartStrategies are
> >> programmatically configured as well but for the sake of
> simplicity/scoping
> >> I'd like to not touch it right away.
> >>
> >> What do you think about this behaviour change?
> >>
> >> [1] 

Re: [DISCUSS] Simplify Flink's cluster level RestartStrategy configuration

2019-08-30 Thread Dawid Wysakowicz
Also +1 in general.

I have a few questions though:

- does it only apply to the logic in
org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory#createRestartStrategyFactory,
which is only the cluster side configuration? Or do you want to change
the logic also on the job side in ExecutionConfig?

- if the latter, does that mean deprecated methods in ExecutionConfig
like: setNumberOfExecutionRetries, setExecutionRetryDelay will have no
effect? I think this would be a good idea, but would suggest to remove
the corresponding fields and methods. This is not that simple though. I
tried to do that for other parameters that have no effect already like
codeAnalysisMode & failTaskOnCheckpointError. There are two problems:

    1) setNumberOfExecutionRetries is effectively marked with the @Public
annotation (the codeAnalysisMode & failTaskOnCheckpointError don't have
this problem). Therefore this would be a binary incompatible change.

    2) ExecutionConfig is stored in state as part of PojoSerializer in
pre-1.7 Flink. It should not be a problem for numberOfExecutionRetries &
executionRetryDelays as they are of primitive types. It is a problem for
codeAnalysisMode (we cannot remove the class, as this breaks
serialization). I wanted to mention that anyway, just to be aware of that.

Best,

Dawid

On 30/08/2019 14:48, Stephan Ewen wrote:
> +1 in general
>
> What is the default in batch, though? No restarts? I always found that
> somewhat uncommon.
> Should we also change that part, if we are changing the default anyways?
>
>
> On Fri, Aug 30, 2019 at 2:35 PM Till Rohrmann  wrote:
>
>> Hi everyone,
>>
>> I wanted to discuss how to simplify Flink's cluster level RestartStrategy
>> configuration [1]. Currently, Flink's behaviour with respect to configuring
>> the {{RestartStrategies}} is quite complicated and convoluted. The reason
>> for this is that we evolved the way it has been configured and wanted to
>> keep it backwards compatible. Due to this, we have currently the following
>> behaviour:
>>
>> * If the config option `restart-strategy` is configured, then Flink uses
>> this `RestartStrategy` (so far so simple)
>> * If the config option `restart-strategy` is not configured, then
>> ** If `restart-strategy.fixed-delay.attempts` or
>> `restart-strategy.fixed-delay.delay` are defined, then instantiate
>> `FixedDelayRestartStrategy(restart-strategy.fixed-delay.attempts,
>> restart-strategy.fixed-delay.delay)`
>> ** If `restart-strategy.fixed-delay.attempts` and
>> `restart-strategy.fixed-delay.delay` are not defined, then
>> *** If checkpointing is disabled, then choose `NoRestartStrategy`
>> *** If checkpointing is enabled, then choose
>> `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
>>
>> I would like to simplify the configuration by removing the "If
>> `restart-strategy.fixed-delay.attempts` or
>> `restart-strategy.fixed-delay.delay`, then" condition. That way, the logic
>> would be the following:
>>
>> * If the config option `restart-strategy` is configured, then Flink uses
>> this `RestartStrategy`
>> * If the config option `restart-strategy` is not configured, then
>> ** If checkpointing is disabled, then choose `NoRestartStrategy`
>> ** If checkpointing is enabled, then choose
>> `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
>>
>> That way we retain the user friendliness that jobs restart if the user
>> enabled checkpointing and we make it clear that any `
>> restart-strategy.fixed-delay.xyz` setting will only be respected if
>> `restart-strategy` has been set to `fixed-delay`.
>>
>> This simplification would, however, change Flink's behaviour and might
>> break existing setups. Since we introduced `RestartStrategies` with Flink
>> 1.0.0 and deprecated the prior configuration mechanism which enables
>> restarting if either the `attempts` or the `delay` has been set, I think
>> that the number of broken jobs should be minimal if not non-existent.
>>
>> I'm sure that one can simplify the way RestartStrategies are
>> programmatically configured as well but for the sake of simplicity/scoping
>> I'd like to not touch it right away.
>>
>> What do you think about this behaviour change?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13921
>>
>> Cheers,
>> Till
>>





Re: [DISCUSS] FLIP-59: Enable execution configuration from Configuration object

2019-08-30 Thread Gyula Fóra
Hi Dawid,

Sorry I misread one of the interfaces a little (Configuration instead of
ConfigurationReader), you are right.
I was referring to:


   - void StreamExecutionEnvironment.configure(ConfigurationReader)


This might be slightly orthogonal to the changes that you made here but
what I meant is that instead of adding methods to the
StreamExecutionEnvironment we could make this an external interface:

interface EnvironmentConfigurer {
  void configure(StreamExecutionEnvironment env, ConfigurationReader reader);
}

We could then have a default implementation of the EnvironmentConfigurer
that would understand the built-in options. We could also allow users to pass
custom implementations of this, which could configure the
StreamExecutionEnvironment based on user-defined config options. This is
just a rough idea for extensibility and probably out of scope at first.
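For example, a custom configurer could then look roughly like this (all names and
the chosen setting are made up purely for illustration):

    // Hypothetical user-provided configurer reacting to custom config options.
    public class MyCompanyConfigurer implements EnvironmentConfigurer {
        @Override
        public void configure(StreamExecutionEnvironment env, ConfigurationReader reader) {
            // Here one could read user-defined options from `reader` and apply them;
            // a fixed value is used only to keep the sketch short.
            env.setBufferTimeout(100);
        }
    }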

Cheers,
Gyula

On Fri, Aug 30, 2019 at 12:13 PM Dawid Wysakowicz 
wrote:

> Hi Gyula,
>
> Thank you for the support on those changes.
>
> I am not sure if I understood your idea for the "reconfiguration" logic.
>
> The configure method on those objects would take a ConfigurationReader, so a
> user can provide a thin wrapper around Configuration for e.g. filtering
> certain options, changing values based on other parameters, etc. Is that
> what you had in mind?
>
> Best,
>
> Dawid
>
> On 29/08/2019 19:21, Gyula Fóra wrote:
> > Hi!
> >
> > Huuuge +1 from me, this has been an operational pain for years.
> > This would also introduce a nice and simple way to extend it in the
> future
> > if we need.
> >
> > Ship it!
> >
> > Gyula
> >
> > On Thu, Aug 29, 2019 at 5:05 PM Dawid Wysakowicz  >
> > wrote:
> >
> >> Hi,
> >>
> >> I wanted to propose a new, additional way of configuring execution
> >> parameters that can currently be set only on objects such as
> >> ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment. This
> >> poses problems such as:
> >>
> >>- no easy way to configure those from a file
> >>- there is no easy way to pass a configuration from layers built on
> >>top of StreamExecutionEnvironment. (e.g. when we want to configure
> those
> >>options from TableEnvironment)
> >>- they are not automatically documented
> >>
> >> Note that there are a few concepts from FLIP-54[1] that this FLIP is
> based
> >> on.
> >>
> >> Would be really grateful to know if you think this would be a valuable
> >> addition and any other feedback.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> Wiki page:
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-59%3A+Enable+execution+configuration+from+Configuration+object
> >>
> >> Google doc:
> >>
> https://docs.google.com/document/d/1l8jW2NjhwHH1mVPbLvFolnL2vNvf4buUMDZWMfN_hFM/edit?usp=sharing
> >>
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> >>
> >>
> >>
>
>


Re: instable checkpointing after migration to flink 1.8

2019-08-30 Thread Stephan Ewen
Hi all!

A thought would be that this has something to do with timers. Does the task
with that behavior use timers (windows, or process function)?

If that is the case, some theories to check:
  - Could it be a timer firing storm coinciding with a checkpoint?
Currently, such a storm fires synchronously and checkpoints cannot preempt it,
which should change in 1.10 with the new mailbox model.
  - Could the timer-async checkpointing changes have something to do with
that? Does some of the usually small "preparation work" (happening
synchronously) lead to an unwanted effect?
  - Are you using TTL for state in that operator?
  - There were some changes made to support timers in RocksDB recently.
Could that have something to do with it?
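Regarding the TTL question above, "using TTL for state" refers to state
descriptors configured roughly like this (sketch, values are arbitrary):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(10))   // arbitrary example TTL
        .build();

    ValueStateDescriptor<String> descriptor =
        new ValueStateDescriptor<>("my-state", String.class);
    descriptor.enableTimeToLive(ttlConfig);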

Best,
Stephan


On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu  wrote:

> CC flink dev mail list
> Update for those who may be interested in this issue, we're still
> diagnosing this problem currently.
>
> Best,
> Congxian
>
>
> Congxian Qiu  于2019年8月29日周四 下午8:58写道:
>
> > Hi Bekir
> >
> > Currently, from what we have diagnosed, there are some tasks that complete their
> > checkpoints too late (maybe 15 mins), but we checked the kafka broker log
> > and did not find anything interesting there. Could we run another job
> > that does not commit offsets to kafka? This is to check whether it is the
> > "commit offset to kafka" step that consumes too much time.
> >
> > Best,
> > Congxian
> >
> >
> > Bekir Oguz  于2019年8月28日周三 下午4:19写道:
> >
> >> Hi Congxian,
> >> sorry for the late reply, but no progress on this issue yet. I also checked
> >> the kafka broker logs and found nothing interesting there.
> >> And we still have 15 min duration checkpoints quite often. Any more
> >> ideas on where to look?
> >>
> >> Regards,
> >> Bekir
> >>
> >> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu 
> >> wrote:
> >>
> >>> Hi Bekir
> >>>
> >>> Are you back at work now? Are there any more findings on this
> >>> problem?
> >>>
> >>> Best,
> >>> Congxian
> >>>
> >>>
> >>> Bekir Oguz  于2019年8月13日周二 下午4:39写道:
> >>>
>  Hi Congxian,
>  Thanks for following up this issue. It is still unresolved and I am on
>  vacation at the moment. Hopefully my colleagues Niels and Vlad can
> spare
>  some time to look into this.
> 
>  @Niels, @Vlad: do you guys also think that this issue might be Kafka
>  related? We could also check kafka broker logs at the time of long
>  checkpointing.
> 
>  Thanks,
>  Bekir
> 
>  Verstuurd vanaf mijn iPhone
> 
>  Op 12 aug. 2019 om 15:18 heeft Congxian Qiu 
>  het volgende geschreven:
> 
>  Hi Bekir
> 
>  Is there any progress about this problem?
> 
>  Best,
>  Congxian
> 
> 
>  Congxian Qiu  于2019年8月8日周四 下午11:17写道:
> 
> > hi Bekir
> > Thanks for the information.
> >
> > - Source's checkpoint was triggered by RPC calls, so it has the
> > "Trigger checkpoint xxx" log,
> > - other tasks' checkpoints were triggered after receiving all the barriers
> > from upstream, so they didn't log the "Trigger checkpoint XXX" :(
> >
> > Your diagnosis makes sense to me; we need to check the Kafka log.
> > I also find out that we always have a log like
> > "org.apache.kafka.clients.consumer.internals.AbstractCoordinator
> Marking
> > the coordinator 172.19.200.73:9092 (id: 2147483646 rack: null) dead
> > for group userprofileaggregator
> > 2019-08-06 13:58:49,872 DEBUG
> > org.apache.flink.streaming.runtime.tasks.StreamTask   -
> Notifica",
> >
> > I checked the Kafka docs [1] and only found that the default of
> > `transaction.max.timeout.ms` is 15 min
> >
> > Please let me know if you have any findings. Thanks.
> >
> > PS: maybe you can also check the log for task
> > `d0aa98767c852c97ae8faf70a54241e3`, as the JM received its ack message
> > late as well.
> >
> > [1] https://kafka.apache.org/documentation/
> > Best,
> > Congxian
> >
> >
> > Bekir Oguz  于2019年8月7日周三 下午6:48写道:
> >
> >> Hi Congxian,
> >> Thanks for checking the logs. What I see from the logs is:
> >>
> >> - For the tasks like "Source:
> >> profileservice-snowplow-clean-events_kafka_source -> Filter” {17,
> 27, 31,
> >> 33, 34} / 70 : We have the ’Triggering checkpoint’ and also ‘Confirm
> >> checkpoint’ log lines, with 15 mins delay in between.
> >> - For the tasks like “KeyedProcess -> (Sink:
> >> profileservice-userprofiles_kafka_sink, Sink:
> >> profileservice-userprofiles_kafka_deletion_marker, Sink:
> >> profileservice-profiledeletion_kafka_sink” {1,2,3,4,5}/70 : We DO
> NOT have
> >> the “Triggering checkpoint” log, but only the ‘Confirm checkpoint’
> lines.
> >>
> >> And as a final point, we ALWAYS have Kafka AbstractCoordinator logs
> >> about lost connection to Kafka at the same time we have the
> checkpoints
> >> confirmed. This 15 minutes delay might be because 

[jira] [Created] (FLINK-13922) Support anchors for restart strategies in Chinese documentation

2019-08-30 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-13922:
-

 Summary: Support anchors for restart strategies in Chinese 
documentation
 Key: FLINK-13922
 URL: https://issues.apache.org/jira/browse/FLINK-13922
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.10.0
Reporter: Till Rohrmann
 Fix For: 1.10.0


In order to support anchors in the documentation for the different restart 
strategies, I propose to change the Chinese headings in 
{{task_failure_recovery.zh.md}} back to their English form.

This change can be reverted once we have resolved FLINK-13909.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Simplify Flink's cluster level RestartStrategy configuration

2019-08-30 Thread Stephan Ewen
+1 in general

What is the default in batch, though? No restarts? I always found that
somewhat uncommon.
Should we also change that part, if we are changing the default anyways?


On Fri, Aug 30, 2019 at 2:35 PM Till Rohrmann  wrote:

> Hi everyone,
>
> I wanted to discuss how to simplify Flink's cluster level RestartStrategy
> configuration [1]. Currently, Flink's behaviour with respect to configuring
> the {{RestartStrategies}} is quite complicated and convoluted. The reason
> for this is that we evolved the way it has been configured and wanted to
> keep it backwards compatible. Due to this, we have currently the following
> behaviour:
>
> * If the config option `restart-strategy` is configured, then Flink uses
> this `RestartStrategy` (so far so simple)
> * If the config option `restart-strategy` is not configured, then
> ** If `restart-strategy.fixed-delay.attempts` or
> `restart-strategy.fixed-delay.delay` are defined, then instantiate
> `FixedDelayRestartStrategy(restart-strategy.fixed-delay.attempts,
> restart-strategy.fixed-delay.delay)`
> ** If `restart-strategy.fixed-delay.attempts` and
> `restart-strategy.fixed-delay.delay` are not defined, then
> *** If checkpointing is disabled, then choose `NoRestartStrategy`
> *** If checkpointing is enabled, then choose
> `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
>
> I would like to simplify the configuration by removing the "If
> `restart-strategy.fixed-delay.attempts` or
> `restart-strategy.fixed-delay.delay`, then" condition. That way, the logic
> would be the following:
>
> * If the config option `restart-strategy` is configured, then Flink uses
> this `RestartStrategy`
> * If the config option `restart-strategy` is not configured, then
> ** If checkpointing is disabled, then choose `NoRestartStrategy`
> ** If checkpointing is enabled, then choose
> `FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`
>
> That way we retain the user friendliness that jobs restart if the user
> enabled checkpointing and we make it clear that any `
> restart-strategy.fixed-delay.xyz` setting will only be respected if
> `restart-strategy` has been set to `fixed-delay`.
>
> This simplification would, however, change Flink's behaviour and might
> break existing setups. Since we introduced `RestartStrategies` with Flink
> 1.0.0 and deprecated the prior configuration mechanism which enables
> restarting if either the `attempts` or the `delay` has been set, I think
> that the number of broken jobs should be minimal if not non-existent.
>
> I'm sure that one can simplify the way RestartStrategies are
> programmatically configured as well but for the sake of simplicity/scoping
> I'd like to not touch it right away.
>
> What do you think about this behaviour change?
>
> [1] https://issues.apache.org/jira/browse/FLINK-13921
>
> Cheers,
> Till
>


Re: instable checkpointing after migration to flink 1.8

2019-08-30 Thread Congxian Qiu
CC flink dev mail list
Update for those who may be interested in this issue, we're still
diagnosing this problem currently.

Best,
Congxian
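For the test suggested in the quoted message below (a job that does not commit
offsets back to Kafka on checkpoint completion), the consumer could be set up
roughly like this (topic, servers and group id are placeholders):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
    props.setProperty("group.id", "checkpoint-debugging");   // placeholder

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);

    // Take the "commit offset to kafka" step out of the picture; offsets are then
    // only tracked in Flink's checkpointed state.
    consumer.setCommitOffsetsOnCheckpoints(false);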


Congxian Qiu  于2019年8月29日周四 下午8:58写道:

> Hi Bekir
>
> Currently, from what we have diagnosed, there are some tasks that complete their
> checkpoints too late (maybe 15 mins), but we checked the kafka broker log
> and did not find anything interesting there. Could we run another job
> that does not commit offsets to kafka? This is to check whether it is the
> "commit offset to kafka" step that consumes too much time.
>
> Best,
> Congxian
>
>
> Bekir Oguz  于2019年8月28日周三 下午4:19写道:
>
>> Hi Congxian,
>> sorry for the late reply, but no progress on this issue yet. I also checked
>> the kafka broker logs and found nothing interesting there.
>> And we still have 15 min duration checkpoints quite often. Any more ideas
>> on where to look?
>>
>> Regards,
>> Bekir
>>
>> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu 
>> wrote:
>>
>>> Hi Bekir
>>>
>>> Are you back at work now? Are there any more findings on this
>>> problem?
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Bekir Oguz  于2019年8月13日周二 下午4:39写道:
>>>
 Hi Congxian,
 Thanks for following up this issue. It is still unresolved and I am on
 vacation at the moment. Hopefully my colleagues Niels and Vlad can spare
 some time to look into this.

 @Niels, @Vlad: do you guys also think that this issue might be Kafka
 related? We could also check kafka broker logs at the time of long
 checkpointing.

 Thanks,
 Bekir

 Verstuurd vanaf mijn iPhone

 Op 12 aug. 2019 om 15:18 heeft Congxian Qiu 
 het volgende geschreven:

 Hi Bekir

 Is there any progress about this problem?

 Best,
 Congxian


 Congxian Qiu  于2019年8月8日周四 下午11:17写道:

> hi Bekir
> Thanks for the information.
>
> - Source's checkpoint was triggered by RPC calls, so it has the
> "Trigger checkpoint xxx" log,
> - other tasks' checkpoints were triggered after receiving all the barriers
> from upstream, so they didn't log the "Trigger checkpoint XXX" :(
>
> Your diagnosis makes sense to me; we need to check the Kafka log.
> I also find out that we always have a log like
> "org.apache.kafka.clients.consumer.internals.AbstractCoordinator  Marking
> the coordinator 172.19.200.73:9092 (id: 2147483646 rack: null) dead
> for group userprofileaggregator
> 2019-08-06 13:58:49,872 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Notifica",
>
> I checked the Kafka docs [1] and only found that the default of
> `transaction.max.timeout.ms` is 15 min
>
> Please let me know if you have any findings. Thanks.
>
> PS: maybe you can also check the log for task
> `d0aa98767c852c97ae8faf70a54241e3`, as the JM received its ack message late as well.
>
> [1] https://kafka.apache.org/documentation/
> Best,
> Congxian
>
>
> Bekir Oguz  于2019年8月7日周三 下午6:48写道:
>
>> Hi Congxian,
>> Thanks for checking the logs. What I see from the logs is:
>>
>> - For the tasks like "Source:
>> profileservice-snowplow-clean-events_kafka_source -> Filter” {17, 27, 31,
>> 33, 34} / 70 : We have the ’Triggering checkpoint’ and also ‘Confirm
>> checkpoint’ log lines, with 15 mins delay in between.
>> - For the tasks like “KeyedProcess -> (Sink:
>> profileservice-userprofiles_kafka_sink, Sink:
>> profileservice-userprofiles_kafka_deletion_marker, Sink:
>> profileservice-profiledeletion_kafka_sink” {1,2,3,4,5}/70 : We DO NOT 
>> have
>> the “Triggering checkpoint” log, but only the ‘Confirm checkpoint’ lines.
>>
>> And as a final point, we ALWAYS have Kafka AbstractCoordinator logs
>> about lost connection to Kafka at the same time we have the checkpoints
>> confirmed. This 15 minutes delay might be because of some timeout at the
>> Kafka client (maybe 15 mins timeout), and then marking kafka  coordinator
>> dead, and then discovering kafka coordinator again.
>>
>> If the kafka connection is IDLE during 15 mins, Flink cannot confirm
>> the checkpoints, cannot send the async offset commit request to Kafka. 
>> This
>> could be the root cause of the problem. Please see the attached logs
>> filtered on the Kafka AbstractCoordinator. Every time we have a 15 
>> minutes
>> checkpoint, we have this kafka issue. (Happened today at 9:14 and 9:52)
>>
>>
>> I will enable Kafka DEBUG logging to see more and let you know about
>> the findings.
>>
>> Thanks a lot for your support,
>> Bekir Oguz
>>
>>
>>
>> Op 7 aug. 2019, om 12:06 heeft Congxian Qiu 
>> het volgende geschreven:
>>
>> Hi
>>
>> Received all the files, as a first glance, the program uses at least
>> once checkpoint mode, from the tm log, maybe we need to check checkpoint 
>> of
>> 

State of FLIPs

2019-08-30 Thread Chesnay Schepler
The following FLIPs are marked as "Under discussion" in the wiki,
but actually seem to be in progress (i.e. have open pull requests) and
some even have code merged to master:


 * FLIP-36 (Interactive Programming)
 * FLIP-38 (Python Table API)
 * FLIP-44 (Support Local Aggregation)
 * FLIP-50 (Spill-able Heap Keyed State Backend)

I would like to find out what the _actual_ state is, and then discuss 
how we handle these FLIPs from now on (e.g., retcon history and mark 
them as accepted, freeze further development until a vote, ...).


I've cc'd all people who created the wiki pages for said FLIPs.




[DISCUSS] Simplify Flink's cluster level RestartStrategy configuration

2019-08-30 Thread Till Rohrmann
Hi everyone,

I wanted to discuss how to simplify Flink's cluster level RestartStrategy
configuration [1]. Currently, Flink's behaviour with respect to configuring
the {{RestartStrategies}} is quite complicated and convoluted. The reason
for this is that we evolved the way it has been configured and wanted to
keep it backwards compatible. Due to this, we have currently the following
behaviour:

* If the config option `restart-strategy` is configured, then Flink uses
this `RestartStrategy` (so far so simple)
* If the config option `restart-strategy` is not configured, then
** If `restart-strategy.fixed-delay.attempts` or
`restart-strategy.fixed-delay.delay` are defined, then instantiate
`FixedDelayRestartStrategy(restart-strategy.fixed-delay.attempts,
restart-strategy.fixed-delay.delay)`
** If `restart-strategy.fixed-delay.attempts` and
`restart-strategy.fixed-delay.delay` are not defined, then
*** If checkpointing is disabled, then choose `NoRestartStrategy`
*** If checkpointing is enabled, then choose
`FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`

I would like to simplify the configuration by removing the "If
`restart-strategy.fixed-delay.attempts` or
`restart-strategy.fixed-delay.delay`, then" condition. That way, the logic
would be the following:

* If the config option `restart-strategy` is configured, then Flink uses
this `RestartStrategy`
* If the config option `restart-strategy` is not configured, then
** If checkpointing is disabled, then choose `NoRestartStrategy`
** If checkpointing is enabled, then choose
`FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`

That way we retain the user friendliness that jobs restart if the user
enabled checkpointing and we make it clear that any `
restart-strategy.fixed-delay.xyz` setting will only be respected if
`restart-strategy` has been set to `fixed-delay`.
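For illustration, the resulting resolution logic would roughly be the following
sketch (the helper name is made up; this is not the actual RestartStrategyFactory
code):

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.configuration.Configuration;

    RestartStrategyConfiguration resolve(Configuration conf, boolean checkpointingEnabled) {
        if (conf.getString("restart-strategy", null) != null) {
            return fromConfiguredStrategy(conf);  // made-up helper for the configured case
        }
        return checkpointingEnabled
            ? RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.seconds(0))
            : RestartStrategies.noRestart();
    }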

This simplification would, however, change Flink's behaviour and might
break existing setups. Since we introduced `RestartStrategies` with Flink
1.0.0 and deprecated the prior configuration mechanism which enables
restarting if either the `attempts` or the `delay` has been set, I think
that the number of broken jobs should be minimal if not non-existent.

I'm sure that one can simplify the way RestartStrategies are
programmatically configured as well but for the sake of simplicity/scoping
I'd like to not touch it right away.

What do you think about this behaviour change?

[1] https://issues.apache.org/jira/browse/FLINK-13921

Cheers,
Till


Re: [DISCUSS] Flink client api enhancement for downstream project

2019-08-30 Thread Kostas Kloudas
Hi all,

I am just writing here to let you know that I am working on a POC that
tries to refactor the current state of job submission in Flink.
I want to stress that it introduces NO CHANGES to the current
behaviour of Flink. It just re-arranges things and introduces the
notion of an Executor, which is the entity responsible for taking the
user-code and submitting it for execution.
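To make the notion a bit more tangible, the rough shape is something like the
following (illustrative only, not the POC API; in the discussion above the returned
handle would rather be the JobClient, a plain JobID is used here just to keep the
sketch self-contained):

    import java.util.concurrent.CompletableFuture;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    // The entity that takes the (compiled) user program plus the effective
    // configuration and submits it for execution on some deployment target.
    public interface Executor {
        CompletableFuture<JobID> execute(JobGraph userProgram, Configuration configuration);
    }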

Given this, the discussion about the functionality that the JobClient
will expose to the user can go on independently and the same
holds for all the open questions so far.

I hope I will have some more news to share soon.

Thanks,
Kostas

On Mon, Aug 26, 2019 at 4:20 AM Yang Wang  wrote:
>
> Hi Zili,
>
> It makes sense to me that a dedicated cluster is started for a per-job
> cluster and will not accept more jobs.
> Just have a question about the command line.
>
> Currently we could use the following commands to start different clusters.
> *per-job cluster*
> ./bin/flink run -d -p 5 -ynm perjob-cluster1 -m yarn-cluster
> examples/streaming/WindowJoin.jar
> *session cluster*
> ./bin/flink run -p 5 -ynm session-cluster1 -m yarn-cluster
> examples/streaming/WindowJoin.jar
>
> What will it look like after client enhancement?
>
>
> Best,
> Yang
>
> Zili Chen  于2019年8月23日周五 下午10:46写道:
>
> > Hi Till,
> >
> > Thanks for your update. Nice to hear :-)
> >
> > Best,
> > tison.
> >
> >
> > Till Rohrmann  于2019年8月23日周五 下午10:39写道:
> >
> > > Hi Tison,
> > >
> > > just a quick comment concerning the class loading issues when using the
> > per
> > > job mode. The community wants to change it so that the
> > > StandaloneJobClusterEntryPoint actually uses the user code class loader
> > > with child first class loading [1]. Hence, I hope that this problem will
> > be
> > > resolved soon.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-13840
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Aug 23, 2019 at 2:47 PM Kostas Kloudas 
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > On the topic of web submission, I agree with Till that it only seems
> > > > to complicate things.
> > > > It is bad for security, job isolation (anybody can submit/cancel jobs),
> > > > and its
> > > > implementation complicates some parts of the code. So, if it were to
> > > > redesign the
> > > > WebUI, maybe this part could be left out. In addition, I would say
> > > > that the ability to cancel
> > > > jobs could also be left out.
> > > >
> > > > I would also be in favour of removing the "detached" mode, for
> > > > the reasons mentioned
> > > > above (i.e. because now we will have a future representing the result
> > > > on which the user
> > > > can choose to wait or not).
> > > >
> > > > Now for separating job submission and cluster creation, I am in
> > > > favour of keeping both.
> > > > Once again, the reasons are mentioned above by Stephan, Till, Aljoscha
> > > > and also Zili seems
> > > > to agree. They mainly have to do with security, isolation and ease of
> > > > resource management
> > > > for the user as he knows that "when my job is done, everything will be
> > > > cleared up". This is
> > > > also the experience you get when launching a process on your local OS.
> > > >
> > > > On excluding the per-job mode from returning a JobClient or not, I
> > > > believe that eventually
> > > > it would be nice to allow users to get back a jobClient. The reason is
> > > > that 1) I cannot
> > > > find any objective reason why the user-experience should diverge, and
> > > > 2) this will be the
> > > > way that the user will be able to interact with his running job.
> > > > Assuming that the necessary
> > > > ports are open for the REST API to work, then I think that the
> > > > JobClient can run against the
> > > > REST API without problems. If the needed ports are not open, then we
> > > > are safe to not return
> > > > a JobClient, as the user explicitly chose to close all points of
> > > > communication to his running job.
> > > >
> > > > On the topic of not hijacking the "env.execute()" in order to get the
> > > > Plan, I definitely agree but
> > > > for the proposal of having a "compile()" method in the env, I would
> > > > like to have a better look at
> > > > the existing code.
> > > >
> > > > Cheers,
> > > > Kostas
> > > >
> > > > On Fri, Aug 23, 2019 at 5:52 AM Zili Chen 
> > wrote:
> > > > >
> > > > > Hi Yang,
> > > > >
> > > > > It would be helpful if you check Stephan's last comment,
> > > > > which states that isolation is important.
> > > > >
> > > > > For per-job mode, we run a dedicated cluster(maybe it
> > > > > should have been a couple of JM and TMs during FLIP-6
> > > > > design) for a specific job. Thus the process is isolated
> > > > > from other jobs.
> > > > >
> > > > > In our case there was a time we suffered from multiple
> > > > > jobs submitted by different users that affected
> > > > > each other so that all of them ran into an error state. Also,
> > > > > run the client inside the cluster could save client
> > > > > 

[jira] [Created] (FLINK-13921) Simplify cluster level RestartStrategy configuration

2019-08-30 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-13921:
-

 Summary: Simplify cluster level RestartStrategy configuration
 Key: FLINK-13921
 URL: https://issues.apache.org/jira/browse/FLINK-13921
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Till Rohrmann
 Fix For: 1.10.0


Currently, Flink's behaviour with respect to configuring the 
{{RestartStrategies}} is quite complicated and convoluted. The reason for this 
is that we evolved the way it has been configured and wanted to keep it 
backwards compatible. Due to this, we have currently the following behaviour:

* If the config option {{restart-strategy}} is configured, then Flink uses this 
{{RestartStrategy}} (so far so simple :-)
* If the config option {{restart-strategy}} is not configured, then 
** If {{restart-strategy.fixed-delay.attempts}} or 
{{restart-strategy.fixed-delay.delay}} are defined, then instantiate 
{{FixedDelayRestartStrategy(restart-strategy.fixed-delay.attempts, 
restart-strategy.fixed-delay.delay)}}
** If {{restart-strategy.fixed-delay.attempts}} and 
{{restart-strategy.fixed-delay.delay}} are not defined, then
*** If checkpointing is disabled, then choose {{NoRestartStrategy}}
*** If checkpointing is enabled, then choose 
{{FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")}}

I would like to simplify the configuration by removing the "If 
{{restart-strategy.fixed-delay.attempts}} or 
{{restart-strategy.fixed-delay.delay}}, then" condition. That way, the logic 
would be the following:

* If the config option {{restart-strategy}} is configured, then Flink uses this 
{{RestartStrategy}} (so far so simple :-)
* If the config option {{restart-strategy}} is not configured, then 
** If checkpointing is disabled, then choose {{NoRestartStrategy}}
** If checkpointing is enabled, then choose 
{{FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")}}

That way we retain the user friendliness that their jobs restart if they enable 
checkpointing and we make it clear that any {{restart-strategy.fixed-delay}} 
setting will only be respected if {{restart-strategy}} has been set to 
{{fixed-delay}}.

This simplification would, however, change Flink's behaviour and might break 
existing setups. Since we introduced {{RestartStrategies}} with Flink {{1.0.0}} 
and deprecated the prior configuration mechanism which enables restarting if 
either the {{attempts}} or the {{delay}} has been set, I think that the number 
of broken jobs should be minimal if not non-existent.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-60: Restructure the Table API & SQL documentation

2019-08-30 Thread Dawid Wysakowicz
+1 to the idea of restructuring the docs.

My only suggestion to consider is: how about moving the
User-Defined-Extensions subpages to the corresponding broader topics?

Sources & Sinks >> Connect to external systems

Catalogs >> Connect to external systems

and then have a Functions sections with subsections:

functions

    |- built in functions

    |- user defined functions


Best,

Dawid

On 30/08/2019 10:59, Timo Walther wrote:
> Hi everyone,
>
> the Table API & SQL documentation was already in very good shape in
> Flink 1.8. However, in the past it was mostly presented as an addition
> to the DataStream API. As the Table and SQL world is growing quickly,
> stabilizing in its concepts, and considered another top-level API
> and closed ecosystem, it is time to restructure the docs a little bit
> to represent the vision of FLIP-32.
>
> Current state:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/
>
> We would like to propose the following FLIP-60 for a new structure:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=127405685
>
>
> Looking forward to feedback.
>
> Thanks,
>
> Timo
>
>
>





Re: [VOTE] FLIP-54: Evolve ConfigOption and Configuration

2019-08-30 Thread Dawid Wysakowicz
+1 to the design

On 29/08/2019 15:53, Timo Walther wrote:
> I converted the mentioned Google doc into a wiki page:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>
>
> The core semantics have not changed.
>
> Happy voting,
> Timo
>
> On 29.08.19 04:30, Zili Chen wrote:
>> The design looks good to me.
>>
>> +1 go ahead!
>>
>> Best,
>> tison.
>>
>>
>> Jark Wu  于2019年8月28日周三 下午6:08写道:
>>
>>> Hi Timo,
>>>
>>> The new changes looks good to me.
>>>
>>> +1 to the FLIP.
>>>
>>>
>>> Cheers,
>>> Jark
>>>
>>> On Wed, 28 Aug 2019 at 16:02, Timo Walther  wrote:
>>>
 Hi everyone,

 after some last minute changes yesterday, I would like to start a new
 vote on FLIP-54. The discussion seems to have reached an agreement. Of
 course this doesn't mean that we can't propose further improvements on
 ConfigOption's and Flink configuration in general in the future. It is
 just one step towards having a better unified configuration for the
 project.

 Please vote for the following design document:



>>> https://docs.google.com/document/d/1IQ7nwXqmhCy900t2vQLEL3N2HIdMg-JO8vTzo1BtyKU/edit#
>>>
 The discussion can be found at:



>>> https://lists.apache.org/thread.html/a56c6b52e5f828d4a737602b031e36b5dd6eaa97557306696a8063a9@%3Cdev.flink.apache.org%3E
>>>
 This voting will be open for at least 72 hours. I'll try to close
 it on
 2019-09-02 8:00 UTC, unless there is an objection or not enough votes.

 I will convert it to a Wiki page afterwards.

 Thanks,

 Timo


>





Re: [ANNOUNCE] Apache Flink-shaded 8.0 released

2019-08-30 Thread jincheng sun
Thanks a lot, Chesnay, and to the community for making this release possible!

Cheers,
Jincheng

Chesnay Schepler  于2019年8月30日周五 下午6:56写道:

> The Apache Flink community is very happy to announce the release of
> Apache Flink-shaded 8.0.
>
> The flink-shaded project contains a number of shaded dependencies for
> Apache Flink.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12345488
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Chesnay
>
>


[jira] [Created] (FLINK-13920) Move list of old releases into _config.yml

2019-08-30 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-13920:


 Summary: Move list of old releases into _config.yml
 Key: FLINK-13920
 URL: https://issues.apache.org/jira/browse/FLINK-13920
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


When adding new releases to the downloads page, some information must be 
updated in _config.yml (used in the upper parts of the downloads page), but the 
list of past releases must be updated separately on both download pages 
(english/chinese).

This is error-prone; we should move the list of old releases into _config.yml 
as well.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13919) Remove 1.6.4 from downloads page and SVN

2019-08-30 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-13919:


 Summary: Remove 1.6.4 from downloads page and SVN
 Key: FLINK-13919
 URL: https://issues.apache.org/jira/browse/FLINK-13919
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Reporter: Chesnay Schepler


1.6.4 is still listed on the download page, and the release artifacts are still 
present on dist.apache.org.

Given that we no longer support 1.6 and the last release (1.6.4) was 6 months 
ago I would say it is time to remove these.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Releasing Flink 1.8.2

2019-08-30 Thread Hequn Cheng
Hi Jincheng,

+1 for a 1.8.2 release.
Thanks a lot for raising the discussion. It would be nice to have these
critical fixes.

Best, Hequn


On Fri, Aug 30, 2019 at 6:31 PM Maximilian Michels  wrote:

> Hi Jincheng,
>
> +1 I would be for a 1.8.2 release such that we can fix the problems with
> the nested closure cleaner which currently block 1.8.1 users with Beam:
> https://issues.apache.org/jira/browse/FLINK-13367
>
> Thanks,
> Max
>
> On 30.08.19 11:25, jincheng sun wrote:
> > Hi Flink devs,
> >
> > It has been nearly 2 months since 1.8.1 was released. So, what do you
> think
> > about releasing Flink 1.8.2 soon?
> >
> > We already have some blocker and critical fixes in the release-1.8
> branch:
> >
> > [Blocker]
> > - FLINK-13159 java.lang.ClassNotFoundException when restore job
> > - FLINK-10368 'Kerberized YARN on Docker test' unstable
> > - FLINK-12578 Use secure URLs for Maven repositories
> >
> > [Critical]
> > - FLINK-12736 ResourceManager may release TM with allocated slots
> > - FLINK-12889 Job keeps in FAILING state
> > - FLINK-13484 ConnectedComponents end-to-end test instable with
> > NoResourceAvailableException
> > - FLINK-13508 CommonTestUtils#waitUntilCondition() may attempt to sleep
> > with negative time
> > - FLINK-13806 Metric Fetcher floods the JM log with errors when TM is
> lost
> >
> > Furthermore, I think the following one blocker issue should be merged
> > before 1.8.2 release.
> >
> > - FLINK-13897: OSS FS NOTICE file is placed in wrong directory
> >
> > It would also be great if we can have the fix of Elasticsearch6.x
> connector
> > threads leaking (FLINK-13689) in 1.8.2 release which is identified as
> major.
> >
> > Please let me know what you think?
> >
> > Cheers,
> > Jincheng
> >
>


[ANNOUNCE] Apache Flink-shaded 8.0 released

2019-08-30 Thread Chesnay Schepler
The Apache Flink community is very happy to announce the release of 
Apache Flink-shaded 8.0.


The flink-shaded project contains a number of shaded dependencies for 
Apache Flink.


Apache Flink® is an open-source stream processing framework for 
distributed, high-performing, always-available, and accurate data 
streaming applications.


The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12345488

We would like to thank all contributors of the Apache Flink community 
who made this release possible!


Regards,
Chesnay



Re: [DISCUSS] Releasing Flink 1.8.2

2019-08-30 Thread Dian Fu
Hi Jincheng,

Thanks a lot for bringing up this discussion. +1 for this release.

Regards,
Dian

> 在 2019年8月30日,下午6:31,Maximilian Michels  写道:
> 
> Hi Jincheng,
> 
> +1 I would be for a 1.8.2 release such that we can fix the problems with the 
> nested closure cleaner which currently block 1.8.1 users with Beam: 
> https://issues.apache.org/jira/browse/FLINK-13367
> 
> Thanks,
> Max
> 
> On 30.08.19 11:25, jincheng sun wrote:
>> Hi Flink devs,
>> It has been nearly 2 months since 1.8.1 was released. So, what do you think
>> about releasing Flink 1.8.2 soon?
>> We already have some blocker and critical fixes in the release-1.8 branch:
>> [Blocker]
>> - FLINK-13159 java.lang.ClassNotFoundException when restore job
>> - FLINK-10368 'Kerberized YARN on Docker test' unstable
>> - FLINK-12578 Use secure URLs for Maven repositories
>> [Critical]
>> - FLINK-12736 ResourceManager may release TM with allocated slots
>> - FLINK-12889 Job keeps in FAILING state
>> - FLINK-13484 ConnectedComponents end-to-end test instable with
>> NoResourceAvailableException
>> - FLINK-13508 CommonTestUtils#waitUntilCondition() may attempt to sleep
>> with negative time
>> - FLINK-13806 Metric Fetcher floods the JM log with errors when TM is lost
>> Furthermore, I think the following one blocker issue should be merged
>> before 1.8.2 release.
>> - FLINK-13897: OSS FS NOTICE file is placed in wrong directory
>> It would also be great if we can have the fix of Elasticsearch6.x connector
>> threads leaking (FLINK-13689) in 1.8.2 release which is identified as major.
>> Please let me know what you think?
>> Cheers,
>> Jincheng



Re: [DISCUSS] Releasing Flink 1.8.2

2019-08-30 Thread Maximilian Michels

Hi Jincheng,

+1 I would be for a 1.8.2 release such that we can fix the problems with 
the nested closure cleaner which currently block 1.8.1 users with Beam: 
https://issues.apache.org/jira/browse/FLINK-13367


Thanks,
Max

On 30.08.19 11:25, jincheng sun wrote:

Hi Flink devs,

It has been nearly 2 months since 1.8.1 was released. So, what do you think
about releasing Flink 1.8.2 soon?

We already have some blocker and critical fixes in the release-1.8 branch:

[Blocker]
- FLINK-13159 java.lang.ClassNotFoundException when restore job
- FLINK-10368 'Kerberized YARN on Docker test' unstable
- FLINK-12578 Use secure URLs for Maven repositories

[Critical]
- FLINK-12736 ResourceManager may release TM with allocated slots
- FLINK-12889 Job keeps in FAILING state
- FLINK-13484 ConnectedComponents end-to-end test instable with
NoResourceAvailableException
- FLINK-13508 CommonTestUtils#waitUntilCondition() may attempt to sleep
with negative time
- FLINK-13806 Metric Fetcher floods the JM log with errors when TM is lost

Furthermore, I think the following one blocker issue should be merged
before 1.8.2 release.

- FLINK-13897: OSS FS NOTICE file is placed in wrong directory

It would also be great if we could include the fix for the Elasticsearch 6.x connector
thread leak (FLINK-13689), which is identified as major, in the 1.8.2 release.

Please let me know what you think.

Cheers,
Jincheng



[jira] [Created] (FLINK-13918) Add class for TokenizerMapper.

2019-08-30 Thread Xu Yang (Jira)
Xu Yang created FLINK-13918:
---

 Summary: Add class for TokenizerMapper.
 Key: FLINK-13918
 URL: https://issues.apache.org/jira/browse/FLINK-13918
 Project: Flink
  Issue Type: Sub-task
  Components: Library / Machine Learning
Reporter: Xu Yang


TokenizerMapper is a transformer that transforms all words into lower case and 
splits the text by whitespace.
 * Add TokenizerMapper for the operation of the TokenizerMapper.
 * Add TokenizerMapperParams for the params of TokenizerMapper.
 * Add TokenizerMapperTest for the test example.
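As a rough illustration of the described behavior (plain Java, not the actual Flink ML mapper code):

{code:java}
import java.util.Arrays;

// Rough illustration only; not the actual TokenizerMapper implementation.
public class TokenizerSketch {
    public static void main(String[] args) {
        String text = "Apache Flink Stateful Computations";
        // lower-case the text and split it on whitespace, as described above
        String[] tokens = text.toLowerCase().split("\\s+");
        System.out.println(Arrays.toString(tokens)); // [apache, flink, stateful, computations]
    }
}
{code}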



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13917) Add class for RegexTokenizerMapper.

2019-08-30 Thread Xu Yang (Jira)
Xu Yang created FLINK-13917:
---

 Summary: Add class for RegexTokenizerMapper.
 Key: FLINK-13917
 URL: https://issues.apache.org/jira/browse/FLINK-13917
 Project: Flink
  Issue Type: Sub-task
  Components: Library / Machine Learning
Reporter: Xu Yang


RegexTokenizerMapper is a transformer to extract tokens or repeatedly 
match patterns.
 * Add RegexTokenizerMapper for the operation of the RegexTokenizerMapper.
 * Add RegexTokenizerMapperParams for the params of RegexTokenizerMapper.
 * Add RegexTokenizerMapperTest for the test example.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13916) Add class for NGramMapper.

2019-08-30 Thread Xu Yang (Jira)
Xu Yang created FLINK-13916:
---

 Summary: Add class for NGramMapper.
 Key: FLINK-13916
 URL: https://issues.apache.org/jira/browse/FLINK-13916
 Project: Flink
  Issue Type: Sub-task
  Components: Library / Machine Learning
Reporter: Xu Yang


NGramMapper is a transformer to convert a document into a new 
document composed of all its nGrams.
 * Add NGramMapper for the operation of the NGramMapper.
 * Add HasNDefaultAs2 for the params of NGramMapper.
 * Add NGramMapperParams for the params of NGramMapper.
 * Add NGramMapperTest for the test example.
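As a rough illustration of the n-gram idea for n = 2 (plain Java, not the actual NGramMapper code):

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Rough illustration only; builds the bigrams (n = 2) of a tokenized document.
public class NGramSketch {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        int n = 2;
        List<String> nGrams = new ArrayList<>();
        for (int i = 0; i + n <= words.size(); i++) {
            nGrams.add(String.join(" ", words.subList(i, i + n)));
        }
        System.out.println(nGrams); // [to be, be or, or not, not to, to be]
    }
}
{code}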



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13913) Add class for NLPConstant.

2019-08-30 Thread Xu Yang (Jira)
Xu Yang created FLINK-13913:
---

 Summary: Add class for NLPConstant.
 Key: FLINK-13913
 URL: https://issues.apache.org/jira/browse/FLINK-13913
 Project: Flink
  Issue Type: Sub-task
  Components: Library / Machine Learning
Reporter: Xu Yang


NLPConstant is a class to store the commonly used delimiters for NLP.
 * Add NLPConstant to store the commonly used delimiters.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13912) Remove ClusterClient#getClusterConnectionInfo

2019-08-30 Thread TisonKun (Jira)
TisonKun created FLINK-13912:


 Summary: Remove ClusterClient#getClusterConnectionInfo
 Key: FLINK-13912
 URL: https://issues.apache.org/jira/browse/FLINK-13912
 Project: Flink
  Issue Type: Improvement
  Components: Command Line Client, Runtime / Coordination
Affects Versions: 1.10.0
Reporter: TisonKun
 Fix For: 1.10.0


As discussed in FLINK-13750, we actually don't need this method any more. All the 
configuration needed is the WebMonitor address and port in standalone HA mode. We 
can safely remove this method and replace its usages with 
{{ClusterClient#getWebInterfaceURL}}.

cc [~till.rohrmann]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13911) Add the interface of ModelDataConverter, and several base classes that implement this interface.

2019-08-30 Thread Xu Yang (Jira)
Xu Yang created FLINK-13911:
---

 Summary: Add the interface of ModelDataConverter, and several base 
classes that implement this interface.
 Key: FLINK-13911
 URL: https://issues.apache.org/jira/browse/FLINK-13911
 Project: Flink
  Issue Type: Sub-task
  Components: Library / Machine Learning
Reporter: Xu Yang


ModelDataConverter is an interface that defines methods for converting a 
generic model data object to a collection of rows and the other way around. It 
is used when we are exporting the model data to a table, as well as loading 
model data from a table for inference.
 * Add ModelDataConverter which defines the model data conversion methods.
 * Add SimpleModelDataConverter which is a specific model data converter where 
the model data is represented as meta data and a collection of strings.
 * Add LabeledModelDataConverter which is a specific model data converter where 
the model data is represented as meta data, a collection of strings, and a 
collection of label values.
 * Add RichModelDataConverter which is a specific model data converter where 
the model data is represented as meta data, a collection of strings, and a 
collection of auxiliary rows.
 * Add ModelConverterUtils which provides utility functions for implementing 
the above model data converters.
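As a rough sketch of the converter contract described above (method names and type parameters are illustrative assumptions, not necessarily the actual interface):

{code:java}
import java.util.List;

import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Illustrative sketch only; the names and generics here are assumptions.
public interface ModelDataConverterSketch<M1, M2> {

    /** Serialize the model data into rows, e.g. when exporting the model to a table. */
    void save(M1 modelData, Collector<Row> collector);

    /** Rebuild the model data from rows, e.g. when loading the model from a table for inference. */
    M2 load(List<Row> rows);
}
{code}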



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[DISCUSS] Releasing Flink 1.8.2

2019-08-30 Thread jincheng sun
Hi Flink devs,

It has been nearly 2 months since 1.8.1 was released. So, what do you think
about releasing Flink 1.8.2 soon?

We already have some blocker and critical fixes in the release-1.8 branch:

[Blocker]
- FLINK-13159 java.lang.ClassNotFoundException when restore job
- FLINK-10368 'Kerberized YARN on Docker test' unstable
- FLINK-12578 Use secure URLs for Maven repositories

[Critical]
- FLINK-12736 ResourceManager may release TM with allocated slots
- FLINK-12889 Job keeps in FAILING state
- FLINK-13484 ConnectedComponents end-to-end test unstable with
NoResourceAvailableException
- FLINK-13508 CommonTestUtils#waitUntilCondition() may attempt to sleep
with negative time
- FLINK-13806 Metric Fetcher floods the JM log with errors when TM is lost

Furthermore, I think the following blocker issue should be merged
before the 1.8.2 release.

- FLINK-13897: OSS FS NOTICE file is placed in wrong directory

It would also be great if we could include the fix for the Elasticsearch 6.x connector
thread leak (FLINK-13689), which is identified as major, in the 1.8.2 release.

Please let me know what you think.

Cheers,
Jincheng


[DISCUSS] FLIP-60: Restructure the Table API & SQL documentation

2019-08-30 Thread Timo Walther

Hi everyone,

the Table API & SQL documentation was already in very good shape in 
Flink 1.8. However, in the past it was mostly presented as an addition 
to the DataStream API. As the Table and SQL world is growing quickly, 
stabilizing in its concepts, and is considered another top-level API 
and a closed ecosystem, it is time to restructure the docs a little bit to 
represent the vision of FLIP-32.


Current state:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/

We would like to propose the following FLIP-60 for a new structure:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=127405685

Looking forward to feedback.

Thanks,

Timo




[jira] [Created] (FLINK-13910) Many serializable classes have no explicit 'serialVersionUID'

2019-08-30 Thread Yun Tang (Jira)
Yun Tang created FLINK-13910:


 Summary: Many serializable classes have no explicit 
'serialVersionUID'
 Key: FLINK-13910
 URL: https://issues.apache.org/jira/browse/FLINK-13910
 Project: Flink
  Issue Type: Bug
Reporter: Yun Tang
 Attachments: SerializableNoSerialVersionUIDField

Currently, many serializable classes in Flink have no explicit 
'serialVersionUID'. As [official 
doc|https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization]
 said, {{Serializable classes must define a Serial Version UID}}. 

A missing 'serialVersionUID' causes compatibility problems. Take 
{{TwoPhaseCommitSinkFunction}} for example, since no explicit 
'serialVersionUID' defined, after 
[FLINK-10455|https://github.com/apache/flink/commit/489be82a6d93057ed4a3f9bf38ef50d01d11d96b]
 introduced, its default 'serialVersionUID' has changed from 
"4584405056408828651" to "4064406918549730832". In other words, if we submit a 
job from Flink-1.6.3 local home to remote Flink-1.6.2 cluster with the usage of 
{{TwoPhaseCommitSinkFunction}}, we would get exception like:
{code:java}
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException: 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; local 
class incompatible: stream classdesc serialVersionUID = 4584405056408828651, 
local class serialVersionUID = 4064406918549730832
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:537)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:524)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:512)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:473)
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
... 4 more
{code}

Similar problems exist in 
{{org.apache.flink.streaming.api.operators.SimpleOperatorFactory}}, whose 
'serialVersionUID' differs between release-1.9 and the current master branch.

IMO, we have two options to fix this bug:
# Add an explicit serialVersionUID to those classes, identical to the value in the latest 
Flink-1.9.0 release code.
# Use a mechanism similar to {{FailureTolerantObjectInputStream}} in 
{{InstantiationUtil}} to ignore serialVersionUID mismatches.

I have collected all production classes without a serialVersionUID from the latest 
master branch in the attachment; the list counts 639 classes.
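A minimal example of option 1 (the class and the pinned value below are only illustrative; in practice the UID would be set to the value implicitly computed for the 1.9.0 release):

{code:java}
import java.io.Serializable;

// Option 1 in a nutshell: declare the UID explicitly so that later, unrelated
// code changes no longer alter the implicitly computed default value.
public class MySinkFunction implements Serializable {
    private static final long serialVersionUID = 4584405056408828651L; // illustrative value
}
{code}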



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [PROPOSAL] Force rebase on master before merge

2019-08-30 Thread Chesnay Schepler
I think this is a non-issue; every committer I know checks beforehand if 
the build passes.


Piotr has provided good arguments for why this approach isn't practical.
Additionally, there are simply technical limitations that prevent this 
from working as expected.


a) we cannot attach Travis checks via CiBot due to lack of permissions
b) It is not possible AFAIK to force a PR to be up-to-date with current 
master when Travis runs. In other words, I can open a PR, travis passes, 
and so long as no new merge conflicts arise I could _still_ merge it 2 
months later.


On 30/08/2019 10:34, Piotr Nowojski wrote:

Hi,

Thanks for the proposal. I have similar concerns as Kurt.

If we enforced such rule I would be afraid that everybody would be waiting for 
tests on his PR to complete, racing other committers to be “the first guy that 
clicks the merge button”, then forcing all of the others to rebase manually and 
race again. For example it wouldn’t be possible to push a final version of the 
PR, wait for the tests to complete overnight and merge it next day. Unless we 
would allow for merging without green travis after a final rebase, but that for 
me would be almost exactly what we have now.

Is this a big issue in the first place? I don’t feel it that way, but maybe I’m 
working in not very contested parts of the code?

If it’s an issue, I would suggest to go for the merging bot, that would have a 
queue of PRs to be:
1. Automatically rebased on the latest master
2. If no conflicts in 1., run the tests
3. If no test failures merge

Piotrek


On 30 Aug 2019, at 09:38, Till Rohrmann  wrote:

Hi Tison,

thanks for starting this discussion. In general, I'm in favour of
automations which remove human mistakes out of the equation.

Do you know how these status checks work concretely? Will Github reject
commits for which there is no passed Travis run? How would hotfix commits
be distinguished from PR commits for which a Travis run should exist? So
I guess my question is how would enabling the status checks change how
committers interact with the Github repository?

Cheers,
Till

On Fri, Aug 30, 2019 at 4:46 AM Zili Chen  wrote:


Hi Kurt,

Thanks for your reply!

I find two concerns about the downside in your email. Correct
me if I misunderstand.

1. Rebase times. Typically commits are independent of one another; rebasing
just fast-forwards changes, so contributors rarely have to resolve conflicts
themselves. Reviews don't get blocked by this forced rebase if there has ever
been a green Travis report -- just require the contributor to rebase and test
again, which generally doesn't involve changes (unless resolving conflicts).
A contributor rebases his pull request when he has spare time or when required
by the reviewer/before getting merged. This should not inflict too much work.

2. Testing time. It is a separate topic that is discussed in this thread[1].
I don't think we will ultimately live with a long testing time, so it won't be
a problem when we trigger multiple tests.

To sum up: for trivial cases, the work is trivial and it prevents accidental
failures; for complicated cases, a rebase and full tests are already required.

Best,
tison.

[1]

https://lists.apache.org/x/thread.html/b90aa518fcabce94f8e1de4132f46120fae613db6e95a2705f1bd1ea@%3Cdev.flink.apache.org%3E


Kurt Young wrote on Fri, Aug 30, 2019 at 9:15 AM:


Hi Zili,

Thanks for the proposal, I had similar confusion in the past with your
point #2.
Force rebasing to master before merging can solve some problems, but it also
introduces a new problem. Given that the CI testing time is quite long (a
couple of hours) now, it's highly possible that before the test triggered by
your rebase finishes, the master will get some more new commits. This
situation will get worse if more people are doing this. One possible solution
is to let the committer decide what to do before he/she merges it. If it's a
trivial issue, just merging it when Travis passes is fine. But if it's a
rather big one, and some related code just got merged into master, I will
choose to rebase to master and push it to my own repo to trigger my personal
CI test on it, because this can guarantee the testing time.

To summarize: I think this should be decided by the committer who is
merging the PR, but not be forced.

Best,
Kurt


On Thu, Aug 29, 2019 at 11:07 PM Zili Chen  wrote:


Hi devs,

GitHub provides a mechanism which is able to require branches to be
up to date before being merged [1] (point 6). I can see several advantages
in enabling it, and thus propose that our project turn on this switch. Below
are my concerns. Looking forward to your insights.

1. Avoid CI failures in a PR that are fixed by another commit. We currently
merge a pull request even if CI fails as long as the failures are known flaky
tests. We don't resolve this by turning on the switch, but it helps to find
any other potentially valid failures.

2. Avoid CI failures in master after a pull request is merged. Actually, CI
tests exactly the branch that the pull request is bound to. Even if it was
green, it is still possible that a systematic failure is introduced because of
conflicts with another new commit merged in master but not in this branch.

Re: [PROPOSAL] Force rebase on master before merge

2019-08-30 Thread Piotr Nowojski
Hi,

Thanks for the proposal. I have similar concerns as Kurt. 

If we enforced such rule I would be afraid that everybody would be waiting for 
tests on his PR to complete, racing other committers to be “the first guy that 
clicks the merge button”, then forcing all of the others to rebase manually and 
race again. For example it wouldn’t be possible to push a final version of the 
PR, wait for the tests to complete overnight and merge it next day. Unless we 
would allow for merging without green travis after a final rebase, but that for 
me would be almost exactly what we have now.

Is this a big issue in the first place? I don’t feel it that way, but maybe I’m 
working in not very contested parts of the code?

If it’s an issue, I would suggest to go for the merging bot, that would have a 
queue of PRs to be:
1. Automatically rebased on the latest master
2. If no conflicts in 1., run the tests
3. If no test failures merge

Piotrek

> On 30 Aug 2019, at 09:38, Till Rohrmann  wrote:
> 
> Hi Tison,
> 
> thanks for starting this discussion. In general, I'm in favour of
> automations which remove human mistakes out of the equation.
> 
> Do you know how these status checks work concretely? Will Github reject
> commits for which there is no passed Travis run? How would hotfix commits
> be distinguished from PR commits for which a Travis run should exist? So
> I guess my question is how would enabling the status checks change how
> committers interact with the Github repository?
> 
> Cheers,
> Till
> 
> On Fri, Aug 30, 2019 at 4:46 AM Zili Chen  wrote:
> 
>> Hi Kurt,
>> 
>> Thanks for your reply!
>> 
>> I find two concerns about the downside in your email. Correct
>> me if I misunderstand.
>> 
>> 1. Rebase times. Typically commits are independent of one another; rebasing
>> just fast-forwards changes, so contributors rarely have to resolve conflicts
>> themselves. Reviews don't get blocked by this forced rebase if there has ever
>> been a green Travis report -- just require the contributor to rebase and test
>> again, which generally doesn't involve changes (unless resolving conflicts).
>> A contributor rebases his pull request when he has spare time or when required
>> by the reviewer/before getting merged. This should not inflict too much work.
>> 
>> 2. Testing time. It is a separate topic that is discussed in this thread[1].
>> I don't think we will ultimately live with a long testing time, so it won't be
>> a problem when we trigger multiple tests.
>> 
>> To sum up: for trivial cases, the work is trivial and it prevents accidental
>> failures; for complicated cases, a rebase and full tests are already required.
>> 
>> Best,
>> tison.
>> 
>> [1]
>> 
>> https://lists.apache.org/x/thread.html/b90aa518fcabce94f8e1de4132f46120fae613db6e95a2705f1bd1ea@%3Cdev.flink.apache.org%3E
>> 
>> 
>> Kurt Young wrote on Fri, Aug 30, 2019 at 9:15 AM:
>> 
>>> Hi Zili,
>>> 
>>> Thanks for the proposal, I had similar confusion in the past with your
>>> point #2.
>>> Force rebasing to master before merging can solve some problems, but it
>>> also introduces a new problem. Given that the CI testing time is quite
>>> long (a couple of hours) now, it's highly possible that before the test
>>> triggered by your rebase finishes, the master will get some more new
>>> commits. This situation will get worse if more people are doing this. One
>>> possible solution is to let the committer decide what to do before he/she
>>> merges it. If it's a trivial issue, just merging it when Travis passes is
>>> fine. But if it's a rather big one, and some related code just got merged
>>> into master, I will choose to rebase to master and push it to my own repo
>>> to trigger my personal CI test on it, because this can guarantee the
>>> testing time.
>>> 
>>> To summarize: I think this should be decided by the committer who is
>>> merging the PR, but not be forced.
>>> 
>>> Best,
>>> Kurt
>>> 
>>> 
>>> On Thu, Aug 29, 2019 at 11:07 PM Zili Chen  wrote:
>>> 
 Hi devs,
 
 GitHub provides a mechanism which is able to require branches to be
 up to date before being merged [1] (point 6). I can see several advantages
 in enabling it, and thus propose that our project turn on this switch. Below
 are my concerns. Looking forward to your insights.
 
 1. Avoid CI failures in a PR that are fixed by another commit. We currently
 merge a pull request even if CI fails as long as the failures are known
 flaky tests. We don't resolve this by turning on the switch, but it helps to
 find any other potentially valid failures.
 
 2. Avoid CI failures in master after a pull request is merged. Actually, CI
 tests exactly the branch that the pull request is bound to. Even if it was
 green, it is still possible that a systematic failure is introduced because
 of conflicts with another new commit merged in master but not in this branch.
 
 For the downside, it might require contributors to rebase their pull
 requests a few times before getting merged. But it should not inflict too
 much work.

[jira] [Created] (FLINK-13909) LinkElement does not support different anchors required for localization

2019-08-30 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-13909:
-

 Summary: LinkElement does not support different anchors required 
for localization
 Key: FLINK-13909
 URL: https://issues.apache.org/jira/browse/FLINK-13909
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.9.0, 1.10.0
Reporter: Till Rohrmann


While addressing FLINK-13898 we realized that the {{LinkElement}} does not 
support multiple anchors which are needed to support localisation. Due to the 
translation into Chinese the anchors are not the same across Flink's English 
and Chinese documentation.

Either we keep anchors the same in both versions or we have a way to support 
multiple anchors, one for each localisation.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13907) Master documentation cannot be built

2019-08-30 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-13907:


 Summary: Master documentation cannot be built
 Key: FLINK-13907
 URL: https://issues.apache.org/jira/browse/FLINK-13907
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.10.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.10.0


The documentation currently cannot be built on buildbot because, due to recent 
changes, we require a newer Ruby version.
While a newer ruby version is installed, I can't find a way to actually 
activate it.

For the time being I'll revert these changes:
* ef74a61f54f190926a8388f46db7919e0e94420b
* 065de4b573a05b0c3436ff2d3af3e0c16589a1a7
* f802e16b06b0c3a3682af7f9017f9c0a69e5d4de
* ac1b8dbf15c405d0646671a138a53c9953153165
* c64e167b8003b7379545c1b83e54d9491164b7a8



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13908) Broken markdown of "Breaking the lines of too long statements" section

2019-08-30 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-13908:


 Summary: Broken markdown of "Breaking the lines of too long 
statements" section
 Key: FLINK-13908
 URL: https://issues.apache.org/jira/browse/FLINK-13908
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Project Website
Reporter: Dawid Wysakowicz


The section "Breaking the lines of too long statements" in code style 
guidelines is wrongly rendered.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[DISCUSS] StreamingFile with ParquetBulkWriter bucketing limitations

2019-08-30 Thread Enrico Agnoli
StreamingFile limitations

Hi community,

I'm working on porting our code from `BucketingSink<>` to 
`StreamingFileSink`.
In this case we use the sink to write AVRO via Parquet and the suggested 
implementation of the Sink should be something like:

```
val parquetWriterFactory = ParquetAvroWriters.forSpecificRecord(mySchemaClass)
StreamingFileSink.forBulkFormat(basePath, 
parquetWriterFactory).withBucketAssigner(dataLakeBucketAssigner)
```

In this design the BucketAssigner is chained after the bulkFormat step. 
The problem I'm having with this design is that I have an object that 
contains information that should be used to construct the path, and a sub-object 
that contains the data to serialize. A simple example:

myClass
|- country
|- cityClass (extends SpecificRecordBase)

Let's say I receive myClass as a stream and I want to serialize the cityClass 
data via the logic above. The problem is that the `forBulkFormat(..)` needs to 
run on a subType of `SpecificRecordBase`, so myClass doesn't work.
If I extract cityClass from myClass then I will not have country available in 
the `withBucketAssigner(..)` to be able to store the data in the right folder...


Am I missing something, or do I have to write my own version of the 
`ParquetBulkWriter` class to be able to handle `myClass`?

Thanks for any idea and suggestion.
Enrico
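One possible direction, sketched below under the assumption that `MyClass#getCity()` returns the nested Avro record (`MyClass` and `CityRecord` stand in for the poster's types; this is only an illustration, not an official recommendation): wrap the Parquet writer factory so that the sink consumes `MyClass`, writes only the nested record, and still exposes the full `MyClass` to the `BucketAssigner`.

```java
import java.io.IOException;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;

// Sketch: a BulkWriter.Factory over MyClass that writes only the nested Avro record.
// MyClass and CityRecord are placeholders; CityRecord is the generated SpecificRecordBase
// subtype and MyClass#getCity() is assumed to return it.
public class CityExtractingWriterFactory implements BulkWriter.Factory<MyClass> {

    private final ParquetWriterFactory<CityRecord> delegate =
            ParquetAvroWriters.forSpecificRecord(CityRecord.class);

    @Override
    public BulkWriter<MyClass> create(FSDataOutputStream out) throws IOException {
        final BulkWriter<CityRecord> inner = delegate.create(out);
        return new BulkWriter<MyClass>() {
            @Override
            public void addElement(MyClass element) throws IOException {
                inner.addElement(element.getCity()); // serialize only the nested record
            }

            @Override
            public void flush() throws IOException {
                inner.flush();
            }

            @Override
            public void finish() throws IOException {
                inner.finish();
            }
        };
    }
}
```

The sink is then built on `MyClass`, so a custom `BucketAssigner<MyClass, String>` can still read `country` when computing the bucket path.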


[DISCUSS] Rework Behavior of "within" In CEP Library

2019-08-30 Thread Yufei Liu
Hi all,
I've run into several problems when using the CEP library.

1. The function "within" in the Pattern API is kind of misleading. I can set a within 
time on each part of the pattern, but only the smallest one takes effect. 
Pattern.begin("begin").where(...)
  .followedBy("middle0").where(...).within(Time.seconds(1))
  .followedBy("middle1").where(...).within(Time.seconds(2))
  .followedBy("middle2").where(...).within(Time.seconds(3))

2. "within" is valid only when there are subsequent events triggered advance 
time, it might cause state leak in some cases.

3. CEP didn't support end with "notFollowBy" because it's meaningless in 
unbounded events, but end with "notFollowBy().within()" is meaningful. eg: 
Tracing user is inactive for a period of time. 
Maybe there is a way to bypass limit, like exclude matched followBy events, but 
I think it would be better if the framework can support this feature.


Here is my opinion:
I found that the implementation of "within" is a property "windowTime" in the NFA, which 
decides whether the current partialMatches have timed out or not when 
advancing time. It looks like a state retention time to me; it is much more like a 
config of the pattern stream than a condition of the Pattern API. 
I think the real meaning of "within" is the maximum time interval between 
pattern stages, and it should be settable separately for each stage.
(Changing the meaning of the current API is not a good idea; we could use another 
keyword instead of "within".)

These are my initial ideas about the features: 
1. Implement a TimeCondition extending IterativeCondition, and treat "within" as a 
condition in state transitions. The filter behavior would be to compare the create times 
of the previous and current states. 

2. Register a timer to clean up timed-out computationStates, but this can increase the 
memory usage.

3. Create a special node if a pattern ends with notFollowedBy().within(), and if this 
node is reached, register a timer that injects an empty event when the time arrives. 

The design of CEP might have its own concerns or trade-offs. My participation 
in this project is still relatively short; these are just my personal opinions and 
some aspects may not have been considered. If we can discuss these features and you can 
give some advice, that would be great.

Best! :)

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-30 Thread Dawid Wysakowicz
Hi,

Ad. 1

The advantage of our approach is that you have the type definition close
to the option definition. The only difference is that it enables
expressing simple POJOs alongside the primitives like ConfigOption<Integer>,
ConfigOption<String>, etc. Otherwise, as Timo said, you will start having
the parsing logic scattered everywhere in the code base, as it is now.
The string representation in our proposal is exactly the same as you
explained for those three options. The only difference is that you don't
have to parse the elements of a List, Map, etc. afterwards.

Ad. 2

I think of withExtendedDescription as a helper getter in a
different place, so that the option is easier to find in a module
different from the one where it was defined.

The MAX_PARALLELISM option in TableOptions would conceptually be equal to:

public ConfigOption<Integer> getMaxParallelismOption() {

    return CoreOptions.MAX_PARALLELISM;

}

This allows us to further clarify the description of the option in the
context of a different module and have it end up on a separate page in the
documentation (but with a link to the original one). In the end it is
exactly the same option. It has exactly the same key, type and parsing logic,
and it is in the end forwarded to the same place.

Ad. 3

Not sure if I understand your concerns here. As Timo said, it is in the
end something similar to toBytes/fromBytes, but it puts itself into a
Configuration. Also, I just wanted to make sure: we adjusted this part
slightly and now the ConfigOption takes a ConfigurableFactory.

Best,

Dawid


On 30/08/2019 09:39, Timo Walther wrote:
> Hi Becket,
>
> thanks for the discussion.
>
> 1. ConfigOptions in their current design are bound to classes.
> Regarding, the option is "creating some Configurable objects instead
> of defining the config to create
> those Configurable"? We just moved this logic to a factory, this
> factory can then also be used for other purposes. However, how the
> option and objects are serialized to Configuration is still not part
> of the option. The option is just pure declaration.
>
> If we allowed only List<String>, implementers would need to start
> implementing their own parsing and validation logic all the time. We would
> like to avoid that.
>
> Please also keep in mind that Configuration must not consist of only
> strings; it manages a Map<String, Object> for efficient access. Every
> map entry can have a string representation for persistence, but in
> most cases consists of unserialized objects.
>
> 2. MAX_PARALLELISM is still defined just once. We don't overwrite
> keys, types or default values. But different layers might want to add
> helpful information. In our concrete use case for FLIP-59,
> ExecutionConfig has 50 properties and many of them are not relevant
> for the Table layer or have no effect at all. We would like to list
> and mention the most important config options again in the Table
> Configuration section, so that users are not confused, but with a
> strong link to the core option. E.g.: registered Kryo serializers are
> also important for Table users, so we would like to add the comment
> "This option allows to modify the serialization of the ANY SQL data
> type.". I think we should not spam the core configuration page with
> comments from all layers, connectors, or libraries but keep this in
> the corresponding component documentation.
>
> 3. But it is something like fromBytes() and toBytes()? It serializes
> and deserializes T from a configuration?
>
> Regards,
> Timo
>
> On 29.08.19 19:14, Becket Qin wrote:
>> Hi Timo and Stephan,
>>
>> Thanks for the detail explanation.
>>
>> 1. I agree that each config should be in a human readable format. My
>> concern is that the current List<Configurable> looks like it goes a little
>> too far
>> from what the configuration is supposed to do. They are essentially
>> creating some Configurable objects instead of defining the config to
>> create
>> those Configurable. This mixes ConfigOption and the usage of it. API
>> wise
>> it would be good to keep the configs and their usages (such as how to
>> create objects using the ConfigOption) apart from each other.
>> I am wondering if we can just make List also only take string. For
>> example,
>> is the following definition of map and list sufficient?
>>
>> A MapConfigOption is ConfigOption<Map<String, String>>. It can be
>> defined
>> as:
>> map_config_name: k1=v1, k2=v2, k3=v3, ...
>>
>> A ListConfigOption is ConfigOption<List<String>>. It can be defined as:
>> list_config_name: v1, v2, v3, ...
>>
>> A ListOfMapConfigOption is ConfigOption<List<Map<String, String>>>. It
>> can
>> be defined as:
>> list_of_map_config_name: k1=v1, k2=v2; k3=v3, k4=v4;
>>
>> All the key and values in the configuration are String. This also
>> guarantees that the configuration is always serializable.
>> If we want to do one more step, we can allow the ConfigOption to set all
>> the primitive types and parse that for the users. So something like
>> List, List> seems fine.
>>
>> The configuration class could also have util methods to create a list of
>> configurable such as:
>>  List > Class clazz).
>> But the 

Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Zhu Zhu
One optimization that we take is letting yarn to reuse the flink-dist jar
which was localized when running previous jobs.

Thanks,
Zhu Zhu

Jörn Franke wrote on Fri, Aug 30, 2019 at 4:02 PM:

> Increase replication factor and/or use HDFS cache
> https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
> Try to reduce the size of the jar, e.g. the Flink libraries do not need to
> be included.
>
> On 30.08.2019 at 01:09, Elkhan Dadashov wrote:
>
> Dear Flink developers,
>
> Having difficulty getting a Flink job started.
>
> The job's uberjar/fat jar is around 400MB, and  I need to kick 800+
> containers.
>
> The default HDFS replication is 3.
>
> *The Yarn queue is empty, and 800 containers  are allocated
> almost immediately  by Yarn  RM.*
>
> It takes a very long time until all 800 nodes (node managers) have downloaded
> the uberjar from HDFS to their local machines.
>
> *Q1:*
>
> a) Do all those 800 nodes download in batches of 3 at a time? (batch
> size = HDFS replication factor)
>
> b) Or can Flink TMs replicate from each other? Or do already started
> TMs replicate to not-yet-started nodes?
>
> The most probable answer is (a), but I want to confirm.
>
> *Q2:*
>
> What  is the recommended way of handling  400MB+ Uberjar with 800+
> containers ?
>
> Any specific params to tune?
>
> Thanks.
>
> Because downloading the uberjar takes a really long time, around 15
> minutes after the job was kicked off we face this exception:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>   at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
> Source)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>   at 
> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>   at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Jörn Franke
Increase replication factor and/or use HDFS cache 
https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
Try to reduce the size of the jar, e.g. the Flink libraries do not need to be 
included.

> On 30.08.2019 at 01:09, Elkhan Dadashov wrote:
> 
> Dear Flink developers,
> 
> Having difficulty getting a Flink job started.
> 
> The job's uberjar/fat jar is around 400MB, and  I need to kick 800+ 
> containers.  
> 
> The default HDFS replication is 3.
> 
> The Yarn queue is empty, and 800 containers  are allocated  almost 
> immediately  by Yarn  RM.
> 
> It takes a very long time until all 800 nodes (node managers) have downloaded 
> the uberjar from HDFS to their local machines.
> 
> Q1:
> 
> a) Do all those 800 nodes download in batches of 3 at a time? (batch size 
> = HDFS replication factor)
> 
> b) Or can Flink TMs replicate from each other? Or do already started 
> TMs replicate to not-yet-started nodes?
> 
> The most probable answer is (a), but I want to confirm.
> 
> Q2:
> 
> What  is the recommended way of handling  400MB+ Uberjar with 800+ containers 
> ?
> 
> Any specific params to tune?
> 
> Thanks.
> 
> Because downloading the uberjar takes a really long time, around 15 
> minutes after the job was kicked off we face this exception:
> 
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>   at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
> Source)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>   at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>   at 
> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>   at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 


[jira] [Created] (FLINK-13906) ExecutionConfigTests.test_equals_and_hash failed on Travis

2019-08-30 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-13906:
-

 Summary: ExecutionConfigTests.test_equals_and_hash failed on Travis
 Key: FLINK-13906
 URL: https://issues.apache.org/jira/browse/FLINK-13906
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.10.0
Reporter: Till Rohrmann
 Fix For: 1.10.0


The {{ExecutionConfigTests.test_equals_and_hash}} Python test failed on Travis 
with

{code}
=== FAILURES ===
__ ExecutionConfigTests.test_equals_and_hash ___

self = 

def test_equals_and_hash(self):

config1 = ExecutionEnvironment.get_execution_environment().get_config()

config2 = ExecutionEnvironment.get_execution_environment().get_config()

self.assertEqual(config1, config2)

>   self.assertEqual(hash(config1), hash(config2))
E   AssertionError: 1609772339 != -295934785

pyflink/common/tests/test_execution_config.py:277: AssertionError
 1 failed, 373 passed in 50.62 seconds =
ERROR: InvocationError for command 
/home/travis/build/flink-ci/flink/flink-python/.tox/py27/bin/pytest (exited 
with code 1)
{code}

https://api.travis-ci.com/v3/job/229361674/log.txt



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Jeff Zhang
I can think of 2 approaches:

1. Allow Flink to specify the replication of the submitted uber jar (see the
sketch below).
2. Allow Flink to specify a config flink.yarn.lib pointing to all the Flink
related jars that are hosted on HDFS. This way users don't need to build
and submit a fat uber jar every time, and the replication of those Flink jars
hosted on HDFS can also be specified separately.
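As a rough illustration of approach 1, the replication factor of an already-uploaded jar can be raised with the plain HDFS client API (the path and the factor below are placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: raise the replication factor of the staged uber jar so that hundreds of
// NodeManagers do not all fetch it from only 3 datanodes.
public class RaiseJarReplication {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf)) {
            Path uberJar = new Path("/user/flink/.flink/<application-id>/job.jar"); // placeholder path
            fs.setReplication(uberJar, (short) 20); // placeholder factor
        }
    }
}
```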



Till Rohrmann wrote on Fri, Aug 30, 2019 at 3:33 PM:

> For point 2. there exists already a JIRA issue [1] and a PR. I hope that
> we can merge it during this release cycle.
>
> [1] https://issues.apache.org/jira/browse/FLINK-13184
>
> Cheers,
> Till
>
> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang 
> wrote:
>
>> Hi Datashov,
>>
>> We faced similar problems in our production clusters.
>>
>> Now both launching and stopping of containers are performed in the main
>> thread of YarnResourceManager. As containers are launched and stopped one
>> after another, it usually takes a long time to bootstrap large jobs. Things
>> get worse when some node managers get lost. Yarn will retry many times to
>> communicate with them, leading to heartbeat timeouts of TaskManagers.
>>
>> Following are some efforts we made to help Flink deal with large jobs.
>>
>> 1. We provision some common jars in all cluster nodes and ask our users
>> not to include these jars in their uberjar. When containers bootstrap,
>> these jars are added to the classpath via JVM options. That way, we can
>> efficiently reduce the size of uberjars.
>>
>> 2. We deploy some asynchronous threads to launch and stop containers in
>> YarnResourceManager. The bootstrap time can be effectively reduced when
>> launching a large number of containers. We'd like to contribute it to the
>> community very soon.
>>
>> 3. We deploy a timeout timer for each launching container. If a task
>> manager does not register in time after its container has been launched, a
>> new container will be allocated and launched. That will lead to a certain
>> waste of resources, but can reduce the effects caused by slow or
>> problematic nodes.
>>
>> Now the community is considering the refactoring of the ResourceManager. I
>> think it will be the right time to improve its efficiency.
>>
>> Regards,
>> Xiaogang
>>
>> Elkhan Dadashov wrote on Fri, Aug 30, 2019 at 7:10 AM:
>>
>>> Dear Flink developers,
>>>
>>> Having difficulty getting a Flink job started.
>>>
>>> The job's uberjar/fat jar is around 400MB, and  I need to kick 800+
>>> containers.
>>>
>>> The default HDFS replication is 3.
>>>
>>> *The Yarn queue is empty, and 800 containers  are allocated
>>> almost immediately  by Yarn  RM.*
>>>
>>> It takes a very long time until all 800 nodes (node managers) have
>>> downloaded the uberjar from HDFS to their local machines.
>>>
>>> *Q1:*
>>>
>>> a) Do all those 800 nodes download in batches of 3 at a time? (batch
>>> size = HDFS replication factor)
>>>
>>> b) Or can Flink TMs replicate from each other? Or do already
>>> started TMs replicate to not-yet-started nodes?
>>>
>>> The most probable answer is (a), but I want to confirm.
>>>
>>> *Q2:*
>>>
>>> What  is the recommended way of handling  400MB+ Uberjar with 800+
>>> containers ?
>>>
>>> Any specific params to tune?
>>>
>>> Thanks.
>>>
>>> Because downloading the uberjar takes a really long time, around
>>> 15 minutes after the job was kicked off we face this exception:
>>>
>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
>>> start container.
>>> This token is expired. current time is 1567116179193 found 1567116001610
>>> Note: System times on machines may be out of sync. Check system time and 
>>> time zones.
>>> at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
>>> Source)
>>> at 
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> at 
>>> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>>> at 
>>> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>>> at 
>>> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>>> at 
>>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at 
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> at 

Re: Using Avro SpecficRecord serialization instead of slower ReflectDatumWriter/GenericDatumWriter

2019-08-30 Thread Till Rohrmann
Hi Roshan,

these kinds of questions should be posted to Flink's user mailing list. I've
cross-posted it now.

If you are using Flink's latest version and your type extends
`SpecificRecord`, then Flink's AvroSerializer should use the
`SpecificDatumWriter`. If this is not the case, then this sounds like a
bug. Could you maybe provide us with a bit more detail about the Flink
version you are using and the actual job you are executing? Ideally you
would link a git repo which contains an example to reproduce the problem.

Cheers,
Till
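For reference, a sketch of how to pin the Avro-specific type information explicitly and prefer Flink's Avro serializer (assuming a generated Avro class `MyRecord` that extends `SpecificRecordBase`; the class name is a placeholder):

```java
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: MyRecord stands in for an Avro-generated class extending SpecificRecordBase.
public class AvroSpecificSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // force Avro-based serialization for POJO types (instead of Flink's POJO serializer)
        env.getConfig().enableForceAvro();

        DataStream<MyRecord> stream = env
                .fromElements(new MyRecord())
                // pin the Avro-specific type information explicitly
                .returns(new AvroTypeInfo<>(MyRecord.class));

        stream.print();
        env.execute("avro-specific-sketch");
    }
}
```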

On Fri, Aug 30, 2019 at 5:55 AM Roshan Naik 
wrote:

> I am noticing that Flink takes very long inside collect(..) due to Avro
> serialization that relies on ReflectDatumWriter & GenericDatumWriter.
> The object being serialized here is an Avro object that implements
> SpecificRecordBase. It is somewhat large (~50 KB) and complex.
>
> Looking for a way to use SpecificDatumWriter for the serialization instead
> of the generic/reflection-based stuff to speed it up. But I don't see a way
> to influence that change.
>
>
>
>
>
>
>
>
>


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-30 Thread Timo Walther

Hi Becket,

thanks for the discussion.

1. ConfigOptions in their current design are bound to classes. 
Regarding, the option is "creating some Configurable objects instead of 
defining the config to create
those Configurable"? We just moved this logic to a factory, this factory 
can then also be used for other purposes. However, how the option and 
objects are serialized to Configuration is still not part of the option. 
The option is just pure declaration.


If we allowed only List<String>, implementers would need to start 
implementing their own parsing and validation logic all the time. We would 
like to avoid that.


Please also keep in mind that Configuration must not consist of only 
strings; it manages a Map<String, Object> for efficient access. Every 
map entry can have a string representation for persistence, but in most 
cases consists of unserialized objects.


2. MAX_PARALLELISM is still defined just once. We don't overwrite keys, 
types or default values. But different layers might want to add helpful 
information. In our concrete use case for FLIP-59, ExecutionConfig has 
50 properties and many of them are not relevant for the Table layer or 
have no effect at all. We would like to list and mention the most 
important config options again in the Table Configuration section, so 
that users are not confused, but with a strong link to the core option. 
E.g.: registered Kryo serializers are also important for Table 
users, so we would like to add the comment "This option allows to modify 
the serialization of the ANY SQL data type.". I think we should not spam 
the core configuration page with comments from all layers, connectors, 
or libraries but keep this in the corresponding component documentation.


3. But it is something like fromBytes() and toBytes()? It serializes and 
deserializes T from a configuration?


Regards,
Timo

On 29.08.19 19:14, Becket Qin wrote:

Hi Timo and Stephan,

Thanks for the detail explanation.

1. I agree that each config should be in a human readable format. My
concern is that the current List<Configurable> looks like it goes a little too far
from what the configuration is supposed to do. They are essentially
creating some Configurable objects instead of defining the config to create
those Configurable. This mixes ConfigOption and the usage of it. API wise
it would be good to keep the configs and their usages (such as how to
create objects using the ConfigOption) apart from each other.
I am wondering if we can just make List also only take string. For example,
is the following definition of map and list sufficient?

A MapConfigOption is ConfigOption<Map<String, String>>. It can be defined
as:
map_config_name: k1=v1, k2=v2, k3=v3, ...

A ListConfigOption is ConfigOption<List<String>>. It can be defined as:
list_config_name: v1, v2, v3, ...

A ListOfMapConfigOption is ConfigOption<List<Map<String, String>>>. It can
be defined as:
list_of_map_config_name: k1=v1, k2=v2; k3=v3, k4=v4;

All the key and values in the configuration are String. This also
guarantees that the configuration is always serializable.
If we want to do one more step, we can allow the ConfigOption to set all
the primitive types and parse that for the users. So something like
List, List> seems fine.

The configuration class could also have util methods to create a list of
configurable such as:
 List  clazz).
But the configuration class will not take arbitrary Configurable as the
value of its config.
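A small sketch of how such purely string-encoded options could be parsed (the helper names below are made up for illustration and are not part of the FLIP-54 proposal):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration of the string-only encoding discussed above; names are made up.
public class StringEncodedOptions {

    // "k1=v1, k2=v2, k3=v3" -> {k1=v1, k2=v2, k3=v3}
    static Map<String, String> parseMap(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] kv = entry.trim().split("=", 2);
            result.put(kv[0].trim(), kv[1].trim());
        }
        return result;
    }

    // "v1, v2, v3" -> [v1, v2, v3]
    static List<String> parseList(String value) {
        List<String> result = new ArrayList<>();
        for (String element : value.split(",")) {
            result.add(element.trim());
        }
        return result;
    }

    // "k1=v1, k2=v2; k3=v3, k4=v4" -> [{k1=v1, k2=v2}, {k3=v3, k4=v4}]
    static List<Map<String, String>> parseListOfMaps(String value) {
        List<Map<String, String>> result = new ArrayList<>();
        for (String map : value.split(";")) {
            result.add(parseMap(map));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseMap("k1=v1, k2=v2, k3=v3"));
        System.out.println(parseList("v1, v2, v3"));
        System.out.println(parseListOfMaps("k1=v1, k2=v2; k3=v3, k4=v4"));
    }
}
```

The Configuration itself then only ever sees flat strings, which keeps it trivially serializable, as argued above.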

2. I might have misunderstood this. But my concern on the description
extension is in the following example.

public static final ConfigOption<Integer> MAX_PARALLELISM =

CoreOptions.MAX_PARALLELISM.withExtendedDescription(
"Note: That this property means that a table program has a side-effect
XYZ.");

In this case, we will have two MAX_PARALLELISM configs now. One is
CoreOptions.MAX_PARALLELISM. The other one is defined here. I suppose users
will see both configurations. One with an extended description and one
without. Let's say there is a third component which also uses
MAX_PARALLELISM, will there be yet another MAX_PARALLELISM ConfigOption? If
so, what would that ConfigOption's description look like?

Ideally, we would want to have just one CoreOptions.MAX_PARALLELISM and the
description should clearly state all the usage of this ConfigOption.

3. I see, in that case, how about we name it something like
extractConfiguration()? I am just trying to see if we can make it clear
this is not something like fromBytes() and toBytes().

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 6:09 PM Timo Walther  wrote:


Hi Becket,

let me try to clarify some of your questions:

1. For every option, we also needed to think about how to represent it
in a human readable format. We do not want to allow arbitrary nesting
because that would easily allow to bypass the flattened hierarchy of
config options (`session.memory.min`). The current design allows to
represent every option type as a list. E.g.:

`myIntOption: 12` can be `myIntListOption: 12;12`
`myObjectOption: field=12,other=true` can be `myObjectListOption:

Re: [PROPOSAL] Force rebase on master before merge

2019-08-30 Thread Till Rohrmann
Hi Tison,

thanks for starting this discussion. In general, I'm in favour of
automations which remove human mistakes out of the equation.

Do you know how these status checks work concretely? Will Github reject
commits for which there is no passed Travis run? How would hotfix commits
be distinguished from PR commits for which a Travis run should exist? So
I guess my question is how would enabling the status checks change how
committers interact with the Github repository?

Cheers,
Till

On Fri, Aug 30, 2019 at 4:46 AM Zili Chen  wrote:

> Hi Kurt,
>
> Thanks for your reply!
>
> I find two concerns about the downside in your email. Correct
> me if I misunderstand.
>
> 1. Rebase times. Typically commits are independent of one another; rebasing
> just fast-forwards changes, so contributors rarely have to resolve conflicts
> themselves. Reviews don't get blocked by this forced rebase if there has ever
> been a green Travis report -- just require the contributor to rebase and test
> again, which generally doesn't involve changes (unless resolving conflicts).
> A contributor rebases his pull request when he has spare time or when required
> by the reviewer/before getting merged. This should not inflict too much work.
>
> 2. Testing time. It is a separate topic that is discussed in this thread[1].
> I don't think we will ultimately live with a long testing time, so it won't be
> a problem when we trigger multiple tests.
>
> To sum up: for trivial cases, the work is trivial and it prevents accidental
> failures; for complicated cases, a rebase and full tests are already required.
>
> Best,
> tison.
>
> [1]
>
> https://lists.apache.org/x/thread.html/b90aa518fcabce94f8e1de4132f46120fae613db6e95a2705f1bd1ea@%3Cdev.flink.apache.org%3E
>
>
Kurt Young wrote on Fri, Aug 30, 2019 at 9:15 AM:
>
> > Hi Zili,
> >
> > Thanks for the proposal, I had similar confusion in the past with your
> > point #2.
> > Force rebasing to master before merging can solve some problems, but it
> > also introduces a new problem. Given that the CI testing time is quite
> > long (a couple of hours) now, it's highly possible that before the test
> > triggered by your rebase finishes, the master will get some more new
> > commits. This situation will get worse if more people are doing this.
> > One possible solution is to let the committer decide what to do before
> > he/she merges it. If it's a trivial issue, just merging it when Travis
> > passes is fine. But if it's a rather big one, and some related code just
> > got merged into master, I will choose to rebase to master and push it to
> > my own repo to trigger my personal CI test on it, because this can
> > guarantee the testing time.
> >
> > To summarize: I think this should be decided by the committer who is
> > merging the PR, but not be forced.
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Aug 29, 2019 at 11:07 PM Zili Chen  wrote:
> >
> > > Hi devs,
> > >
> > > GitHub provides a mechanism which is able to require branches to be
> > > up to date before being merged [1] (point 6). I can see several
> > > advantages in enabling it, and thus propose that our project turn on
> > > this switch. Below are my concerns. Looking forward to your insights.
> > >
> > > 1. Avoid CI failures in a PR that are fixed by another commit. We
> > > currently merge a pull request even if CI fails as long as the failures
> > > are known flaky tests. We don't resolve this by turning on the switch,
> > > but it helps to find any other potentially valid failures.
> > >
> > > 2. Avoid CI failures in master after a pull request is merged. Actually,
> > > CI tests exactly the branch that the pull request is bound to. Even if
> > > it was green, it is still possible that a systematic failure is
> > > introduced because of conflicts with another new commit merged in master
> > > but not in this branch.
> > >
> > > For the downside, it might require contributors to rebase their pull
> > > requests a few times before getting merged. But it should not inflict
> > > too much work.
> > >
> > > Best,
> > > tison.
> > >
> > > [1]
> > > https://help.github.com/en/articles/enabling-required-status-checks
> > >
> >
>


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Till Rohrmann
For point 2. there exists already a JIRA issue [1] and a PR. I hope that we
can merge it during this release cycle.

[1] https://issues.apache.org/jira/browse/FLINK-13184

Cheers,
Till

On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang  wrote:

> Hi Dadashov,
>
> We faced similar problems in our production clusters.
>
> Currently both the launching and the stopping of containers are performed
> in the main thread of YarnResourceManager. As containers are launched and
> stopped one after another, it usually takes a long time to bootstrap large
> jobs. Things get worse when some node managers get lost: Yarn will retry
> many times to communicate with them, leading to heartbeat timeouts of
> TaskManagers.
>
> Following are some efforts we made to help Flink deal with large jobs.
>
> 1. We provision some common jars on all cluster nodes and ask our users
> not to include these jars in their uberjars. When containers bootstrap,
> these jars are added to the classpath via JVM options. That way, we can
> effectively reduce the size of the uberjars.
>
> 2. We deploy some asynchronous threads to launch and stop containers in
> YarnResourceManager. The bootstrap time can be greatly reduced when
> launching a large number of containers. We'd like to contribute this to
> the community very soon.
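>
> As a rough illustration of the idea (the class and method names below are
> placeholders, not the actual patch), the blocking NMClient call is simply
> handed over to a small launcher pool instead of running in the main thread:
>
>     // illustrative sketch only, assuming the usual YARN client types
>     private final ExecutorService containerLauncher =
>             Executors.newFixedThreadPool(16);
>
>     private void launchContainerAsync(
>             Container container, ContainerLaunchContext launchContext) {
>         containerLauncher.execute(() -> {
>             try {
>                 // blocking YARN call, now off the RM main thread
>                 nmClient.startContainer(container, launchContext);
>             } catch (Exception e) {
>                 // hand the failure back to the main thread
>                 // (onContainerLaunchFailed is a hypothetical handler)
>                 runAsync(() -> onContainerLaunchFailed(container, e));
>             }
>         });
>     }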
>
> 3. We deploy a timeout timer for each launching container. If a task
> manager does not register in time after its container has been launched, a
> new container will be allocated and launched. This leads to a certain
> waste of resources, but reduces the effects caused by slow or problematic
> nodes.
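>
> A minimal sketch of such a timeout (the helper names are hypothetical):
>
>     // illustrative sketch only: if the TaskManager running in this container
>     // does not register within the timeout, release it and request a new one
>     private void scheduleRegistrationTimeout(ContainerId containerId) {
>         scheduleRunAsync(() -> {
>             if (!registeredContainerIds.contains(containerId)) {
>                 resourceManagerClient.releaseAssignedContainer(containerId);
>                 requestYarnContainer(); // allocate and launch a replacement
>             }
>         }, Time.minutes(5));
>     }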
>
> Now that the community is considering a refactoring of the
> ResourceManager, I think it will be the right time to improve its
> efficiency.
>
> Regards,
> Xiaogang
>
> Elkhan Dadashov  于2019年8月30日周五 上午7:10写道:
>
>> Dear Flink developers,
>>
>> I'm having difficulty getting a Flink job started.
>>
>> The job's uberjar/fat jar is around 400MB, and I need to launch 800+
>> containers.
>>
>> The default HDFS replication is 3.
>>
>> *The Yarn queue is empty, and 800 containers are allocated
>> almost immediately by the Yarn RM.*
>>
>> It takes a very long time until all 800 nodes (node managers) have
>> downloaded the uberjar from HDFS to their local machines.
>>
>> *Q1:*
>>
>> a) Do all those 800 nodes download in batches of 3 at a time? (batch
>> size = HDFS replication factor)
>>
>> b) Or can Flink TMs replicate from each other, i.e. already started TMs
>> replicate to not-yet-started nodes?
>>
>> Most probably the answer is (a), but I want to confirm.
>>
>> *Q2:*
>>
>> What is the recommended way of handling a 400MB+ uberjar with 800+
>> containers?
>>
>> Any specific params to tune?
>>
>> Thanks.
>>
>> Because downloading the uberjar takes a really long time, around 15
>> minutes after the job is kicked off I am facing this exception:
>>
>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
>> start container.
>> This token is expired. current time is 1567116179193 found 1567116001610
>> Note: System times on machines may be out of sync. Check system time and 
>> time zones.
>>  at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
>> Source)
>>  at 
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>  at 
>> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>>  at 
>> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>>  at 
>> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>>  at 
>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>  at 
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>  at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>  at 
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>  at 
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

Re: [DISCUSS] FLIP-50: Spill-able Heap Keyed State Backend

2019-08-30 Thread Yu Li
Done. Thanks for the reminder Tison!

Best Regards,
Yu


On Thu, 29 Aug 2019 at 21:03, Zili Chen  wrote:

> Hi Yu,
>
> Notice that the wiki page is still marked as being in the "*Under Discussion*" state.
>
> I think you can update it accordingly.
>
> Best,
> tison.
>
>
> Yu Li  于2019年8月20日周二 下午10:28写道:
>
> > Sorry for the lag, but since we reached a consensus days ago, I started a
> > vote thread which will have a result by EOD; thus I'm closing this
> > discussion thread. Thanks all for the participation and
> > comments/suggestions!
> >
> > Best Regards,
> > Yu
> >
> >
> > On Fri, 16 Aug 2019 at 09:09, Till Rohrmann 
> wrote:
> >
> > > +1 for this FLIP and the feature. I think this feature will be super
> > > helpful for many Flink users.
> > >
> > > Once the SpillableHeapKeyedStateBackend has proven to be superior to the
> > > HeapKeyedStateBackend, we should think about removing the latter
> > > completely to reduce the maintenance burden.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Aug 16, 2019 at 4:06 AM Congxian Qiu 
> > > wrote:
> > >
> > > > Big +1 for this feature.
> > > >
> > > > This FLIP helps improve at least the following two scenarios:
> > > > - Temporary data peaks when using the Heap StateBackend.
> > > > - The Heap StateBackend has better performance than the
> > > > RocksDBStateBackend, especially on SATA disks. Some users have told me
> > > > that they increased the parallelism of operators (and used the
> > > > HeapStateBackend) rather than use the RocksDBStateBackend to get better
> > > > performance. But increasing the parallelism brings some other problems;
> > > > after this FLIP, we can run a Flink job with the same parallelism as
> > > > with the RocksDBStateBackend and still get better performance.
> > > >
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > Yu Li  于2019年8月16日周五 上午12:14写道:
> > > >
> > > > > Thanks all for the reviews and comments!
> > > > >
> > > > > bq. From the implementation plan, it looks like this exists purely
> > > > > in a new module and does not require any changes in other parts of
> > > > > Flink's code. Can you confirm that?
> > > > > Confirmed, thanks!
> > > > >
> > > > > Best Regards,
> > > > > Yu
> > > > >
> > > > >
> > > > > On Thu, 15 Aug 2019 at 18:04, Tzu-Li (Gordon) Tai <
> > tzuli...@apache.org
> > > >
> > > > > wrote:
> > > > >
> > > > > > +1 to start a VOTE for this FLIP.
> > > > > >
> > > > > > Given the properties of this new state backend and that it will
> > > > > > exist as a new module without touching the original heap backend,
> > > > > > I don't see a harm in including this.
> > > > > > Regarding design of the feature, I've already mentioned my comments
> > > > > > in the original discussion thread.
> > > > > >
> > > > > > Cheers,
> > > > > > Gordon
> > > > > >
> > > > > > On Thu, Aug 15, 2019 at 5:53 PM Yun Tang 
> wrote:
> > > > > >
> > > > > > > Big +1 for this feature.
> > > > > > >
> > > > > > > Our customers, including myself, have faced the dilemma where we
> > > > > > > have to use windows to aggregate events in applications like
> > > > > > > real-time monitoring. The larger the timer and window state, the
> > > > > > > poorer the performance of RocksDB. However, switching to the
> > > > > > > FsStateBackend would always make me fear OOM errors.
> > > > > > >
> > > > > > > Looking forward to more powerful enrichment of the state backends,
> > > > > > > helping Flink achieve better performance together.
> > > > > > >
> > > > > > > Best
> > > > > > > Yun Tang
> > > > > > > 
> > > > > > > From: Stephan Ewen 
> > > > > > > Sent: Thursday, August 15, 2019 23:07
> > > > > > > To: dev 
> > > > > > > Subject: Re: [DISCUSS] FLIP-50: Spill-able Heap Keyed State
> > Backend
> > > > > > >
> > > > > > > +1 for this feature. I think this will be appreciated by users,
> > > > > > > as a way to use the HeapStateBackend with a safety-net against
> > > > > > > OOM errors. And having had major production exposure is great.
> > > > > > >
> > > > > > > From the implementation plan, it looks like this exists purely in
> > > > > > > a new module and does not require any changes in other parts of
> > > > > > > Flink's code. Can you confirm that?
> > > > > > >
> > > > > > > Other than that, I have no further questions and we could proceed
> > > > > > > to vote on this FLIP, from my side.
> > > > > > >
> > > > > > > Best,
> > > > > > > Stephan
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Aug 13, 2019 at 10:00 PM Yu Li 
> wrote:
> > > > > > >
> > > > > > > > Sorry for forgetting to give the link of the FLIP, here it
> is:
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 

[jira] [Created] (FLINK-13905) Separate checkpoint triggering into stages

2019-08-30 Thread Biao Liu (Jira)
Biao Liu created FLINK-13905:


 Summary: Separate checkpoint triggering into stages
 Key: FLINK-13905
 URL: https://issues.apache.org/jira/browse/FLINK-13905
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.10.0


Currently {{CheckpointCoordinator#triggerCheckpoint}} includes some heavy IO 
operations. We plan to separate the triggering into different stages: the IO 
operations are executed in IO threads, while the other, on-memory operations are not.

This is a preparation for making all on-memory operations of 
{{CheckpointCoordinator}} single-threaded (in the main thread).
Note that we cannot move the on-memory operations of triggering into the main 
thread directly yet, because there are still some operations guarded by a heavy, 
coordinator-wide lock.
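
For illustration, a simplified shape of the staged triggering could look like the 
following (plain JDK types; the class and method names are made up and do not 
reflect the actual implementation):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Illustrative sketch only: heavy IO runs in the IO executor,
// on-memory bookkeeping runs in the coordinator's main-thread executor.
class StagedCheckpointTrigger {
    private final Executor ioExecutor;          // pool for blocking IO
    private final Executor mainThreadExecutor;  // coordinator main thread

    StagedCheckpointTrigger(Executor ioExecutor, Executor mainThreadExecutor) {
        this.ioExecutor = ioExecutor;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    CompletableFuture<Void> triggerCheckpoint(long timestamp) {
        return CompletableFuture
            // stage 1 (IO thread): e.g. initialize the checkpoint storage location
            .supplyAsync(this::initializeCheckpointLocation, ioExecutor)
            // stage 2 (main thread): register the pending checkpoint, send trigger RPCs
            .thenAcceptAsync(location -> startCheckpoint(timestamp, location), mainThreadExecutor);
    }

    private String initializeCheckpointLocation() {
        return "hdfs://.../chk-42"; // placeholder for the created storage location
    }

    private void startCheckpoint(long timestamp, String location) {
        // on-memory bookkeeping only, no blocking IO here
    }
}
{code}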



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Best practice to run flink on kubernetes

2019-08-30 Thread Yang Wang
Hi Zhenghua,

You are right. For a per-job cluster, the taskmanagers are allocated
dynamically by the KubernetesResourceManager. For a session cluster, we hope
taskmanagers can be pre-allocated, even though that does not work yet.
Please see the doc[1] for more details.


Hi Thomas,

We have no doubt that Flink only needs to support #1 and #3. For #1,
we need external deployment management tools to make it work in production.
I also think a kubernetes operator is a good choice. It makes managing
multiple Flink jobs and long-running streaming applications easier.

Also, some companies have their own Flink job management platform.
Platform users submit Flink jobs through a web UI, update the Flink
configuration and restart the job.

For #3, we just want to make it possible to start a Flink job cluster or
session cluster through the CLI. Users who are used to running Flink
workloads on YARN can migrate to a kubernetes cluster very conveniently.
Compared to #1, dynamic resource allocation is an important advantage.
Maybe it could also be introduced to #1 in the future in some way.


[1].
https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing

Thomas Weise  于2019年8月29日周四 下午10:24写道:

> Till had already summed it up, but I want to emphasize that Flink as
> project only needs to provide #1 (reactive mode) and #3 (active mode, which
> necessarily is tied to the cluster manager of choice). The latter would be
> needed for Flink jobs to be elastic (in the future), although we may want
> to discuss how such capability can be made easier with #1 as well.
>
> For users #1 alone is of little value, since they need to solve their
> deployment problem. So it will be good to list options such as the Lyft
> Flink k8s operator on the ecosystem page and possibly point to that from
> the Flink documentation as well.
>
> I also want to point out that #3, while it looks easy to start with, has an
> important limitation when it comes to manage long running streaming
> applications. Such application essentially will be a sequence of jobs that
> come and go across stateful upgrades or rollbacks. Any solution that is
> designed to manage a single Flink job instance can't address that need.
> That is why the k8s operator was created. It specifically understands the
> concept of an application.
>
> Thomas
>
>
> On Wed, Aug 28, 2019 at 7:56 PM Zhenghua Gao  wrote:
>
> > Thanks Yang for bringing this up. I think option 1 is very useful for
> > early adopters. People who do not know much about k8s can easily set it
> > up on minikube to have a taste.
> >
> > Between option 2 and option 3, I prefer option 3 because I am familiar
> > with Yarn and don't know many k8s concepts.
> > And I have some doubt about starting a session cluster in option 3:
> >
> > > ./bin/kubernetes-session.sh -d -n 2 -tm 512 -s 4 -nm flink-session-example
> > > -i flink:latest -kD kubernetes.service.exposed.type=NODE_PORT
> >
> > Does the -n option mean the number of TaskManagers?
> > Do we pre-run taskmanager pods, or request and launch taskmanager pods
> > dynamically?
> >
> > *Best Regards,*
> > *Zhenghua Gao*
> >
> >
> > On Fri, Aug 9, 2019 at 9:12 PM Yang Wang  wrote:
> >
> > > Hi all,
> > >
> > > Currently cloud-native architectures have been adopted by many
> > > companies in production. They use kubernetes to run deep learning, web
> > > servers, etc. If we could deploy per-job/session Flink clusters on
> > > kubernetes so they run mixed with other workloads, the cluster resource
> > > utilization would be better. It would also be much easier for many
> > > kubernetes users to give Flink a try.
> > >
> > > As of now we have three options to run Flink jobs on k8s.
> > >
> > > [1]. Create the jm/tm/service yaml files and apply them; then you get a
> > > standalone Flink cluster on k8s. Use flink run to submit jobs to the
> > > existing Flink cluster. Some companies may have their own deployment
> > > system to manage the Flink cluster.
> > >
> > > [2]. Use a flink-k8s-operator to manage multiple Flink clusters,
> > > including session and per-job clusters. It can manage the complete
> > > deployment lifecycle of the application. I think this option is really
> > > easy to use for k8s users. They are familiar with k8s operators, kubectl
> > > and other k8s tools, and they can debug and run the Flink cluster just
> > > like other k8s applications.
> > >
> > > [3]. Native integration with k8s: use flink run or
> > > kubernetes-session.sh to start a Flink cluster. It is very similar to
> > > submitting a Flink cluster to Yarn. The KubernetesClusterDescriptor
> > > talks to the k8s api server to start a Flink master deployment of 1.
> > > The KubernetesResourceManager dynamically allocates resources from k8s
> > > to start task managers on demand. This option is very easy for Flink
> > > users to get started with. In the simplest case, we just need to update
> > > the '-m yarn-cluster' to -m '-m 

[jira] [Created] (FLINK-13904) Avoid competition between different rounds of checkpoint triggering

2019-08-30 Thread Biao Liu (Jira)
Biao Liu created FLINK-13904:


 Summary: Avoid competition between different rounds of checkpoint 
triggering
 Key: FLINK-13904
 URL: https://issues.apache.org/jira/browse/FLINK-13904
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Biao Liu
 Fix For: 1.10.0


As a part of the {{CheckpointCoordinator}} refactoring, I'd like to simplify the 
concurrent triggering logic.
The different rounds of checkpoint triggering would be processed sequentially. 
The final target is getting rid of the timer thread and {{triggerLock}}.

Note that we can't avoid all competition in triggering for now. There is still 
competition between normal checkpoint triggering and savepoint triggering. We 
could avoid this competition by executing the triggering in the main thread, but 
that cannot be achieved until all blocking operations are handled properly in IO 
threads.
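
To illustrate the direction only (not the actual implementation), the different 
rounds could go through a single queue that is drained one request at a time in 
the main thread, which would make {{triggerLock}} unnecessary:

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch only: all trigger requests (periodic checkpoints and
// savepoints) are queued and processed strictly one after another.
// All methods are assumed to be called from the coordinator's main thread.
class SequentialTriggerQueue {
    private final Queue<Runnable> pendingTriggers = new ArrayDeque<>();
    private boolean triggerInProgress;

    void submit(Runnable triggerRequest) {
        pendingTriggers.add(triggerRequest);
        maybeTriggerNext();
    }

    // called when the previous round of triggering has completed or failed
    void onTriggerFinished() {
        triggerInProgress = false;
        maybeTriggerNext();
    }

    private void maybeTriggerNext() {
        if (!triggerInProgress && !pendingTriggers.isEmpty()) {
            triggerInProgress = true;
            pendingTriggers.poll().run();
        }
    }
}
{code}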



--
This message was sent by Atlassian Jira
(v8.3.2#803003)