Re: How do I run SQL query on a dataStream that generates my custom type.

2018-02-15 Thread Timo Walther

Hi,

In your case you don't have to convert to Row if you don't want to. The 
Table API will do the conversion automatically once the stream of Event is 
converted into a table. However, this only works if Event is a POJO.


If you want to specify your own type information, your MapFunction can 
implement the ResultTypeQueryable interface. In its getProducedType 
method you can return your row information, e.g. Types.ROW_NAMED(...).


Hope this helps.

Regards,
Timo

On 2/14/18 at 10:35 PM, nikhilsimha wrote:

I have a stream of events of a custom type. I want to know how to convert
these events into Rows that can be queried. More specifically how do I
attach type information to the stream of rows that is generated?

I have the following code

```
 val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val tableEnv: StreamTableEnvironment =
TableEnvironment.getTableEnvironment(execEnv)

 val eventStream : DataStream[Event] = ...
 val rowStream: DataStream[Row] = eventStream.map({e: Event => toRow(e)})
 tableEnv.registerDataStream("some_name", rowStream)

 tableEnv.sql("select bedrooms from some_name")
```

This code compiles, but clearly wouldn't work because I didn't specify the
type or position of `bedrooms`.


So what API do I use to specify the typeInformation of my rowStream?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: How do I run SQL query on a dataStream that generates my custom type.

2018-02-15 Thread Timo Walther

Or even easier:

You can specify the type after the map call:

eventStream.map({e: Event => toRow(e)})(Types.ROW_NAMED(...))
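
Fleshed out slightly, assuming the eventStream, Event and toRow(...) from the
quoted question below (the field names and types are placeholders for the
actual schema):

```
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

// The explicit second argument list supplies the row's TypeInformation,
// so no ResultTypeQueryable implementation is needed.
val rowStream: DataStream[Row] =
  eventStream.map({ e: Event => toRow(e) })(
    Types.ROW_NAMED(Array("bedrooms", "price"), Types.INT, Types.DOUBLE))

tableEnv.registerDataStream("some_name", rowStream)
```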

Regards,
Timo


On 2/15/18 at 9:55 AM, Timo Walther wrote:

Hi,

In your case you don't have to convert to Row if you don't want to. 
The Table API will do the conversion automatically once the stream of Event is 
converted into a table. However, this only works if Event is a POJO.


If you want to specify your own type information, your MapFunction can 
implement the ResultTypeQueryable interface. In its getProducedType 
method you can return your row information, e.g. Types.ROW_NAMED(...).


Hope this helps.

Regards,
Timo

On 2/14/18 at 10:35 PM, nikhilsimha wrote:
I have a stream of events of a custom type. I want to know how to convert
these events into Rows that can be queried. More specifically how do I
attach type information to the stream of rows that is generated?

I have the following code

```
 val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val tableEnv: StreamTableEnvironment =
TableEnvironment.getTableEnvironment(execEnv)

 val eventStream : DataStream[Event] = ...
 val rowStream: DataStream[Row] = eventStream.map({e: Event => toRow(e)})
 tableEnv.registerDataStream("some_name", rowStream)

 tableEnv.sql("select bedrooms from some_name")
```

This code compiles, but clearly wouldn't work because I didn't specify the
type or position of `bedrooms`.


So what API do I use to specify the typeInformation of my rowStream?



--
Sent from: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/






Re: Deploying Flink with JobManager HA on Docker Swarm/Kubernetes

2018-02-15 Thread Aljoscha Krettek
Hi,

AFAIK, the JobGraph itself is not stored in ZK but in HDFS. ZK only stores a 
handle to the serialised JobGraph.

Best,
Aljoscha

> On 15. Feb 2018, at 04:59, Chirag Dewan  wrote:
> 
> Thanks a lot Aljoscha.
> 
> I was making a silly mistake. TaskManagers can now register with the JobManager.
> 
> One more thing, does Flink now store Job Graphs on ZK too?
> 
> Regards,
> 
> Chirag
> 
> On Wednesday, 14 February, 2018, 8:06:14 PM IST, Aljoscha Krettek 
>  wrote:
> 
> 
> It should be roughly the same settings that you use in your JobManager. They 
> are described here: 
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#zookeeper-based-ha-mode
>  
> 
> 
>> On 14. Feb 2018, at 15:32, Chirag Dewan wrote:
>> 
>> Thanks Aljoscha.
>> 
>> I haven't checked that bit. Is there any configuration for TaskManagers to 
>> find ZK?
>> 
>> Regards,
>> 
>> Chirag
>> 
>> Sent from Yahoo Mail on Android 
>> 
>> On Wed, 14 Feb 2018 at 7:43 PM, Aljoscha Krettek wrote:
>> Do you see in the logs whether the TaskManager correctly connect to 
>> ZooKeeper as well? They need this in order to find the JobManager leader.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 14. Feb 2018, at 06:12, Chirag Dewan wrote:
>>> 
>>> Hi,
>>> 
>>> I am trying to deploy a Flink cluster (1 JM, 2TM) on a Docker Swarm. For 
>>> JobManager HA, I have started a 3 node zookeeper service on the same swarm 
>>> network and configured Flink's zookeeper quorum with zookeeper service 
>>> instances. 
>>> 
>>> JobManager gets started with the LeaderElectionService and gets assigned a 
>>> LeaderSessionID too, which I can see from the following log 
>>> statements (attaching only related logs):
>>> 
>>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
>>> Starting ZooKeeperLeaderElectionService   
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
>>> Starting ZooKeeperLeaderRetrievalService.
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
>>> Starting ZooKeeperLeaderRetrievalService.
>>> JobManager akka.tcp://flink@jobmanager:6123/user/jobmanager was granted 
>>> leadership with leader session ID 
>>> Some(1f3b2ec6-77b6-4532-928f-ad8befd5202f).
>>> Trying to associate with JobManager leader 
>>> akka.tcp://flink@jobmanager:6123/user/jobmanager
>>> Resource Manager associating with leading JobManager 
>>> Actor[akka://flink/user/jobmanager#590681231] - leader session 
>>> 1f3b2ec6-77b6-4532-928f-ad8befd5202f
>>> 
>>> 
>>> But the TaskManagers are not able to register with the JobManager and give the 
>>> following error:
>>> 
>>> Discard message 
>>> LeaderSessionMessage(----,RegisterTaskManager(4fc8aceeae1e27e42b9f16df6c0cf5e3,4fc8aceeae1e27e42b9f16df6c0cf5e3
>>>  @ a118cdf39114 (dataPort=43017),cores=1, physMem=1044111360, 
>>> heap=536870912, managed=324208384,1)) because the expected leader session 
>>> ID 1f3b2ec6-77b6-4532-928f-ad8befd5202f did not equal the received leader 
>>> session ID ----.
>>> 
>>> Seems like the ResourceManager was not able to retrieve the LeaderSessionID 
>>> and passed the all-zeros session ID instead. 
>>> 
>>> One interesting thing I observed was a ZK version log:
>>> 
>>> The version of ZooKeeper being used doesn't support Container nodes. 
>>> CreateMode.PERSISTENT will be used instead.
>>> 
>>> Is this a ZK version problem? Should I be using ZK 3.4.6?
>>> 
>>> My configuration:
>>> 
>>> Flink Version : 1.4.0
>>> ZK version : 3.4.11 (I just pulled the latest image)
>>> 
>>> Thanks in advance. 
>>> 
>>> Chirag
>>> 
>> 
> 
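
For reference, the ZooKeeper HA options from the page linked above look roughly
like this in flink-conf.yaml (hostnames and paths are placeholders; the
TaskManagers must see the same settings in order to find the JobManager leader):

```
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/ha/
```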



Retrieving name of last external checkpoint directory

2018-02-15 Thread Dawid Wysakowicz
Hi,

We are running a few jobs on YARN, and in case of some failure (that the job could 
not recover from on its own) we want to manually restore the job from the last 
successful external checkpoint. The problem is that
${state.checkpoints.dir} contains checkpoint directories for all jobs that we 
are running. How can we find out the last successful external checkpoint for 
one particular job? I will be grateful for any pointers.

Regards,
Dawid
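
One low-tech starting point (a sketch only: it sorts the checkpoint directory
listing by modification time, with ${state.checkpoints.dir} standing for the
configured directory, but it does not by itself tell you which job an entry
belongs to, which is the crux of the question):

```
hdfs dfs -ls ${state.checkpoints.dir} | sort -k6,7 | tail -n 5
```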




Re: Flink + Consul as HA backend. What do you think?

2018-02-15 Thread Krzysztof Białek
Alright, I just came across the first real-life problem with my Consul HA
implementation.
In the Consul KV store there is a limit of 512kB per node, and the JobGraph of one
of my apps exceeded it.
In ZK there seems to be a similar zNode limit of 1MB.
How did you work around it? Or maybe I am serializing the JobGraph wrong?

On Thu, Feb 15, 2018 at 8:47 AM, Krzysztof Białek  wrote:

> I have very little experience with ZK and cannot explain the differences
> between ZK and Consul by myself. However, there are some comparisons
> available:
> * https://www.consul.io/intro/vs/zookeeper.html - done by Consul so may
> be biased
> * https://www.slideshare.net/IvanGlushkov/zookeeper-vs-consul-41882991
> * https://jakon.me/2017/01/consul-deployment-orchestration/
>
> Regarding testing - I did basic failover scenarios on my workstation with
> 2 JobManagers, 2 TaskManagers and WindowJoin example app with checkpointing
> and restarting turned on.
> I ran the cluster for no longer than a few hours.
>
> For now I'd like to open Flink for alternative HA backends (
> https://issues.apache.org/jira/browse/FLINK-8660)
>
>
> On Wed, Feb 14, 2018 at 1:47 PM, Chesnay Schepler 
> wrote:
>
>> Hello,
>>
>> I don't know anything about Consul but the prospect of having other
>> options beside Zookeeper is very interesting. It's rather surprising how
>> little you had to modify existing classes to get this to work.
>>
>> It may take a bit until someone provides proper feedback as the community
>> is currently prepping 2 releases (1.4.1 and 1.5), please don't be
>> discouraged by this.
>>
>> I saw that your branch was based on the 1.4 version. In 1.5 we reworked
>> the distributed architecture of Flink (in an initiative commonly referred
>> to as FLIP-6) which may affect your work.
>>
>> 2 things to note from my side:
>> It would also be helpful if you could explain the differences between ZK
>> and Consul and how they stack up in terms of guarantees etc.
>> How did you test your solution so far? (Like how long was a cluster
>> running, what failure scenarios)
>>
>>
>> On 13.02.2018 21:38, Krzysztof Białek wrote:
>>
>> I'd like to get your opinion about this idea. I found the related JIRA issue
>> FLINK-2366, but it seems to be dead. To attract your attention I am copying my
>> comment here.
>>
>> As an experiment I've implemented Flink HA on top of Consul. The
>> implementation is working fine in the "lab" but is not battle tested yet.
>> The source code is available at https://github.com/kbialek/
>> flink/tree/feature/consul (flink-runtime package
>> org.apache.flink.runtime.consul)
>>
>> Why? Generally I'd like to keep as few moving parts as possible. We do
>> not have Zookeeper running, but Consul is already in place. And in the end
>> freedom of choice is a good thing.
>>
>> It would be great to see built-in Consul support in Flink someday, but if
>> it is not expected then I suggest a little refactoring to open possibility
>> to configure HighAvailabilityServicesFactory. As far as I can see this
>> should be enough to inject any HA implementation.
>>
>> Regards,
>> Krzysztof
>>
>>
>>
>


Re: flink read hdfs file error

2018-02-15 Thread Or Sher
Hi,
Did you ever get to solve this issue? I'm getting the same error.

On 1.3.2 I used to run the fat jar standalone, without any job
submission, and it worked just fine.
It looked like it used an embedded MiniCluster.

After just changing the dependencies to 1.4.0 we started getting these
errors.

Should this execution mode also be supported in 1.4.0?
I couldn't find anything in the docs about it.
The docs always say to run the job using "flink run", which depends on an already
running cluster.
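
For what it's worth, a common remedy for the reference.conf clash described in
the quoted reply below, sketched under the assumption that the fat jar is built
with the Maven Shade plugin, is to merge all reference.conf files instead of
letting one overwrite the others:

```
<!-- Sketch: append all reference.conf files when shading, so Akka's
     defaults (e.g. akka.stream.materializer) survive in the fat jar. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <transformers>
      <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
        <resource>reference.conf</resource>
      </transformer>
    </transformers>
  </configuration>
</plugin>
```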

On Fri, Jan 26, 2018 at 12:59 PM Aljoscha Krettek 
wrote:

> Hi,
>
> It seems you are using Akka in your user program and the reference.conf in
> your Jar is clashing with the same File from Flink. What is the reason for
> having Akka (or reference.conf) in your user jar?
>
> Best,
> Aljoscha
>
>
> On 22. Jan 2018, at 11:09, 韩宁宁 <453673...@qq.com> wrote:
>
> Dear All,
> I have a question about Flink & Hadoop.
> I want to read files on HDFS with Flink, but I encountered the
> error below. Can you please advise on a solution to this problem?
> It will be much appreciated:
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> Exception in thread "main"
> com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf
> @
> jar:file:/data/home/fbi/hanningning/flink-hdfs/target/flink-hdfs.jar!/reference.conf:
> 804: Could not resolve substitution to a value: ${akka.stream.materializer}
> at
> com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)
> at
> com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
> at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
> at
> com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
> at
> com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
> at
> com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
> at
> com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
> at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
> at
> com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
> at
> com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
> at
> com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
> at
> com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
> at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
> at
> com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
> at
> com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
> at
> com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
> at
> com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
> at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
> at
> com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
> at
> com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
> at
> com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
> at
> com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
> at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
> at
> com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
> .
>
>
> === my code ===
>
> public class App {
>
> public static void main(String[] args) throws Exception {
>
> final String inputPath = args[0]; // hdfs file path
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
>
> HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
> new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class,
> Text.class, new JobConf());
> TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new
> Path(inputPath));
>
> DataSet<Tuple2<LongWritable, Text>> text =
> env.createInput(hadoopInputFormat);
>
> text.print();
>
> env.execute("read hdfs by flink test");
> }
>
> }
>
> === maven dependencies ===
>
> <properties>
>     <flink.version>1.4.0</flink.version>
>     <log4j.version>1.2.17</log4j.version>
>     <akka.version>2.4.20</akka.version>
> </properties>
>
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-clients_2.11</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-hadoo

Re: Optimizing multiple aggregate queries on a CEP using Flink

2018-02-15 Thread Sahil Arora
Hi Timo,
Thanks a lot for the help. I look forward to a reply from Kostas
for more clarity on this.


On Mon, 12 Feb 2018, 10:01 pm Timo Walther,  wrote:

> Hi Sahil,
>
> I'm not a CEP expert but I will loop in Kostas (in CC). In general, the
> example that you described can easily be done with a ProcessFunction [1]. A
> process function not only allows you to keep state (like a count) but also
> allows you to set timers flexibly for specific use cases so that
> aggregations can be triggered/reused. So in general I would say that
> implementing and testing such an algorithm is possible. How easily it can be
> integrated into the CEP API, I don't know.
>
> Regards,
> Timo
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
>
> On 2/9/18 at 11:28 PM, Sahil Arora wrote:
>
> Hi there,
> We have been working on a project with the title "Optimizing Multiple
> Aggregate Queries over a Complex Event Processing Engine". The aim is to
> optimize a group of queries. For example, "how many cars passed the post
> in the past 1 minute" and "how many cars passed the post in the past 2
> minutes" are 2 queries, and the naive and inefficient method to answer
> both the queries is to independently solve both of these queries one by one
> and find the answer. However, the optimum way would be to minimize the
> computation by using the answer given by query 1 and using it in query 2.
> This is basically what our aim is, to minimize computation cost when we
> have multiple aggregate queries in a CEP.
>
> We have been searching for some platform which supports CEP, and Flink is
> probably one of them. Hence, it would be very helpful if we could get some
> answers to the following questions:
>
> 1. Does flink already have some method of optimizing multiple aggregate
> queries?
> 2. Is it possible for us to implement / test such an algorithm in flink
> which considers multiple queries in a CEP, like having a database of SQL
> queries and testing an algorithm of our choice?
>
> Any other inputs which may help us with solving the problem would be
> highly welcome.
>
> Thanks a lot.
> --
> Sahil Arora
> Final year B.Tech Undergrad | Indian Institute of Technology Mandi
> Web: https://sahilarora535.github.io
> LinkedIn: sahilarora535 
> Ph: +91-8130506047 <+91%2081305%2006047>
>
>
> --
Sahil Arora
Final year B.Tech Undergrad | Indian Institute of Technology Mandi
Web: https://sahilarora535.github.io
LinkedIn: sahilarora535 
Ph: +91-8130506047 <+91%2081305%2006047>


Re: Flink + Consul as HA backend. What do you think?

2018-02-15 Thread Fabian Hueske
Hi,

all data is stored in a distributed file system or object store (HDFS, S3,
Ceph, ...) and ZooKeeper only stores pointers to that data.

Cheers, Fabian

2018-02-15 11:08 GMT+01:00 Krzysztof Białek :

> Alright, just came across the first real-life problem with my Consul HA
> implementation.
> In Consul KV store there is a limit of 512kB per node and JobGraph of one
> of my apps exceeded it.
> In ZK there seems to be a similar zNode limit of 1MB.
> How did you work around it? Or maybe I am serializing the JobGraph wrong?
>
> On Thu, Feb 15, 2018 at 8:47 AM, Krzysztof Białek <
> krzysiek.bia...@gmail.com> wrote:
>
>> I have very little experience with ZK and cannot explain the differences
>> between ZK and Consul by myself. However, there are some comparisons
>> available:
>> * https://www.consul.io/intro/vs/zookeeper.html - done by Consul so may
>> be biased
>> * https://www.slideshare.net/IvanGlushkov/zookeeper-vs-consul-41882991
>> * https://jakon.me/2017/01/consul-deployment-orchestration/
>>
>> Regarding testing - I did basic failover scenarios on my workstation with
>> 2 JobManagers, 2 TaskManagers and WindowJoin example app with checkpointing
>> and restarting turned on.
>> I ran the cluster for no longer than a few hours.
>>
>> For now I'd like to open Flink for alternative HA backends (
>> https://issues.apache.org/jira/browse/FLINK-8660)
>>
>>
>> On Wed, Feb 14, 2018 at 1:47 PM, Chesnay Schepler 
>> wrote:
>>
>>> Hello,
>>>
>>> I don't know anything about Consul but the prospect of having other
>>> options beside Zookeeper is very interesting. It's rather surprising how
>>> little you had to modify existing classes to get this to work.
>>>
>>> It may take a bit until someone provides proper feedback as the
>>> community is currently prepping 2 releases (1.4.1 and 1.5), please don't be
>>> discouraged by this.
>>>
>>> I saw that your branch was based on the 1.4 version. In 1.5 we reworked
>>> the distributed architecture of Flink (in an initiative commonly referred
>>> to as FLIP-6) which may affect your work.
>>>
>>> 2 things to note from my side:
>>> It would also be helpful if you could explain the differences between ZK
>>> and Consul and how they stack up in terms of guarantees etc. .
>>> How did you test your solution so far? (Like how long was a cluster
>>> running, what failure scenarios)
>>>
>>>
>>> On 13.02.2018 21:38, Krzysztof Białek wrote:
>>>
>>> I'd like to get your opinion about this idea. I found related JIRA issue
>>>  FLINK-2366, but it seems to be dead. To attract your attention I copy
>>> my comment here.
>>>
>>> As an experiment I've implemented Flink HA on top of Consul. The
>>> implementation is working fine in the "lab" but is not battle tested yet.
>>> The source code is available at https://github.com/kbialek/
>>> flink/tree/feature/consul (flink-runtime package
>>> org.apache.flink.runtime.consul)
>>>
>>> Why? Generally I'd like to keep as few moving parts as possible. We do
>>> not have Zookeeper running, but Consul is already in place. And in the end
>>> freedom of choice is a good thing.
>>>
>>> It would be great to see built-in Consul support in Flink someday, but
>>> if it is not expected then I suggest a little refactoring to open
>>> possibility to configure HighAvailabilityServicesFactory. As far as I
>>> can see this should be enough to inject any HA implementation.
>>>
>>> Regards,
>>> Krzysztof
>>>
>>>
>>>
>>
>


Re: Flink + Consul as HA backend. What do you think?

2018-02-15 Thread Krzysztof Białek
Alright, I have checkpoint saving implemented that way. I will apply the
same pattern to JobGraphs.
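
A minimal sketch of that pointer pattern (all names here are hypothetical, and a
real implementation would write through Flink's FileSystem abstraction to
HDFS/S3 rather than java.nio):

```
import java.nio.file.{Files, Paths}

// Hypothetical KV abstraction over Consul's API.
trait KvStore { def put(key: String, value: Array[Byte]): Unit }

def storeJobGraph(kv: KvStore, storageDir: String, jobId: String,
                  serialized: Array[Byte]): Unit = {
  val blobPath = s"$storageDir/jobgraphs/$jobId"
  // The large blob goes to the file system, bypassing the KV size limit...
  Files.write(Paths.get(blobPath), serialized)
  // ...and only the small pointer is stored in the KV store.
  kv.put(s"flink/jobgraphs/$jobId", blobPath.getBytes("UTF-8"))
}
```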


On Thu, Feb 15, 2018 at 11:13 AM, Fabian Hueske  wrote:

> Hi,
>
> all data is stored in a distributed file system or object store (HDFS, S3,
> Ceph, ...) and ZooKeeper only stores pointers to that data.
>
> Cheers, Fabian
>
> 2018-02-15 11:08 GMT+01:00 Krzysztof Białek :
>
>> Alright, just came across the first real-life problem with my Consul HA
>> implementation.
>> In Consul KV store there is a limit of 512kB per node and JobGraph of one
>> of my apps exceeded it.
>> In ZK there seems to be a similar zNode limit of 1MB.
>> How did you work around it? Or maybe I am serializing the JobGraph wrong?
>>
>> On Thu, Feb 15, 2018 at 8:47 AM, Krzysztof Białek <
>> krzysiek.bia...@gmail.com> wrote:
>>
>>> I have very little experience with ZK and cannot explain the differences
>>> between ZK and Consul by myself. However, there are some comparisons
>>> available:
>>> * https://www.consul.io/intro/vs/zookeeper.html - done by Consul so may
>>> be biased
>>> * https://www.slideshare.net/IvanGlushkov/zookeeper-vs-consul-41882991
>>> * https://jakon.me/2017/01/consul-deployment-orchestration/
>>>
>>> Regarding testing - I did basic failover scenarios on my workstation
>>> with 2 JobManagers, 2 TaskManagers and WindowJoin example app with
>>> checkpointing and restarting turned on.
>>> I ran the cluster for no longer than a few hours.
>>>
>>> For now I'd like to open Flink for alternative HA backends (
>>> https://issues.apache.org/jira/browse/FLINK-8660)
>>>
>>>
>>> On Wed, Feb 14, 2018 at 1:47 PM, Chesnay Schepler 
>>> wrote:
>>>
 Hello,

 I don't know anything about Consul but the prospect of having other
 options beside Zookeeper is very interesting. It's rather surprising how
 little you had to modify existing classes to get this to work.

 It may take a bit until someone provides proper feedback as the
 community is currently prepping 2 releases (1.4.1 and 1.5), please don't be
 discouraged by this.

 I saw that your branch was based on the 1.4 version. In 1.5 we reworked
 the distributed architecture of Flink (in an initiative commonly referred
 to as FLIP-6) which may affect your work.

 2 things to note from my side:
 It would also be helpful if you could explain the differences between
 ZK and Consul and how they stack up in terms of guarantees etc. .
 How did you test your solution so far? (Like how long was a cluster
 running, what failure scenarios)


 On 13.02.2018 21:38, Krzysztof Białek wrote:

 I'd like to get your opinion about this idea. I found related JIRA issue
  FLINK-2366, but it seems to be dead. To attract your attention I copy
 my comment here.

 As an experiment I've implemented Flink HA on top of Consul. The
 implementation is working fine in the "lab" but is not battle tested yet.
 The source code is available at https://github.com/kbialek/
 flink/tree/feature/consul (flink-runtime package
 org.apache.flink.runtime.consul)

 Why? Generally I'd like to keep as few moving parts as possible. We
 do not have Zookeeper running, but Consul is already in place. And in the
 end freedom of choice is a good thing.

 It would be great to see built-in Consul support in Flink someday, but
 if it is not expected then I suggest a little refactoring to open
 possibility to configure HighAvailabilityServicesFactory. As far as I
 can see this should be enough to inject any HA implementation.

 Regards,
 Krzysztof



>>>
>>
>


Re: Optimizing multiple aggregate queries on a CEP using Flink

2018-02-15 Thread Kostas Kloudas
Hi Sahil,

Currently CEP does not support multi-query optimizations out-of-the-box.
In some cases you can do manual optimizations to your code, but there is 
no optimizer involved.
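
One example of such a manual optimization, sketched for the car-counting
queries from the quoted thread (the CarEvent type and the source are
hypothetical), is to compute the per-minute counts once and feed them into the
two-minute query instead of re-aggregating the raw events:

```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class CarEvent(postId: String, ts: Long) // hypothetical event type

val env = StreamExecutionEnvironment.getExecutionEnvironment
val cars: DataStream[CarEvent] = ??? // e.g. env.addSource(...), application-specific

// Query 1: cars in the last minute (processing-time windows for brevity).
val perMinute: DataStream[Long] =
  cars.map(_ => 1L).timeWindowAll(Time.minutes(1)).reduce(_ + _)

// Query 2 reuses the per-minute partial counts instead of recounting raw events.
val perTwoMinutes: DataStream[Long] =
  perMinute.timeWindowAll(Time.minutes(2)).reduce(_ + _)
```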

Cheers,
Kostas

> On Feb 15, 2018, at 11:12 AM, Sahil Arora  wrote:
> 
> Hi Timo,
> Thanks a lot for the help. I will be looking forward to a reply from Kostas 
> to be clearer on this.
>  
> 
> On Mon, 12 Feb 2018, 10:01 pm Timo Walther wrote:
> Hi Sahil,
> 
> I'm not a CEP expert but I will loop in Kostas (in CC). In general, the 
> example that you described can be easily done with a ProcessFunction [1]. A 
> process function not only allows to keep state (like a count) but also allows 
> you to set timers flexibly for specific use cases such that aggregations can 
> be triggered/reused. So in general I would say that implementing and testing 
> such an algorithm is possible. How easily it can be integrated into the CEP 
> API, I don't know.
> 
> Regards,
> Timo
> 
> 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
>  
> 
> 
> On 2/9/18 at 11:28 PM, Sahil Arora wrote:
>> Hi there,
>> We have been working on a project with the title "Optimizing Multiple 
>> Aggregate Queries over a Complex Event Processing Engine". The aim is to 
>> optimize a group of queries. For example, "how many cars passed the post in 
>> the past 1 minute" and "how many cars passed the post in the past 2 minutes" 
>> are 2 queries, and the naive and inefficient method to answer both the 
>> queries is to independently solve both of these queries one by one and find 
>> the answer. However, the optimum way would be to minimize the computation by 
>> using the answer given by query 1 and using it in query 2. This is basically 
>> what our aim is, to minimize computation cost when we have multiple 
>> aggregate queries in a CEP.
>> 
>> We have been searching for some platform which supports CEP, and Flink is 
>> probably one of them. Hence, it would be very helpful if we could get some 
>> answers to the following questions:
>> 
>> 1. Does flink already have some method of optimizing multiple aggregate 
>> queries?
>> 2. Is it possible for us to implement / test such an algorithm in flink 
>> which considers multiple queries in a CEP, like having a database of SQL 
>> queries and testing an algorithm of our choice? 
>> 
>> Any other inputs which may help us with solving the problem would be highly 
>> welcome.
>> 
>> Thanks a lot.
>> -- 
>> Sahil Arora
>> Final year B.Tech Undergrad | Indian Institute of Technology Mandi
>> Web: https://sahilarora535.github.io 
>> LinkedIn: sahilarora535 
>> Ph: +91-8130506047 
> -- 
> Sahil Arora
> Final year B.Tech Undergrad | Indian Institute of Technology Mandi
> Web: https://sahilarora535.github.io 
> LinkedIn: sahilarora535 
> Ph: +91-8130506047 


Manipulating Processing elements of Network Buffers

2018-02-15 Thread m@xi
Hello Flinkers!

I know that one should set the number of Network Buffers (NB) appropriately
for one's Flink deployment. Apart from that, I am wondering whether one
might change/manipulate the specific sequence of data records in the NB in
order to optimize the performance of an application.

For instance, let's assume that a NB currently holds 3 elements {a,b,c} in this
specific order. The data is going to be shipped to a taskmanager(s) for further
processing. But maybe if the aforementioned elements were to be
shipped in another order, e.g. {b,c,a}, then a specific task would run
faster.

Is there any way to manipulate the ordering in the NB, or the ordering
of the arrival of tuples at the input of an operator?

Thanks in advance.

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Java heap size in YARN

2018-02-15 Thread Pawel Bartoszek
Hi,

I have a question regarding configuration of task manager heap size when
running YARN session on EMR.

I am running 2 task managers on m4.4xlarge (64GB RAM). I would like to use
as much as possible of that memory for the task manager heap.

However, when requesting 56000 MB when starting the YARN session, only around
42GB is actually assigned to the TM. Do you know how I can increase that?


This is how I start YARN session:
/usr/lib/flink/bin/yarn-session.sh --container 2 --taskManagerMemory 56000
--slots 16 --detached -Dparallelism.default=32
-Dtaskmanager.network.numberOfBuffers=20480 ...


This is the output of ps aux on the TM box:

yarn  42843 1030 67.7 46394740 44688084 ?   Sl   15:27 175:56
/usr/lib/jvm/java-openjdk/bin/java -Xms42000m -Xmx42000m 

yarn  42837  0.0  0.0 113104  2684 ?Ss   15:27   0:00 /bin/bash
-c /usr/lib/jvm/java-openjdk/bin/java -Xms42000m -Xmx42000m 


I would expect around 56GB set as max heap size for TM.

some settings from yarn-site.xml that might be of interest:


<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>57344</value>
</property>

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>57344</value>
</property>

<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>5</value>
</property>


Cheers,
Pawel


Re: Java heap size in YARN

2018-02-15 Thread Pawel Bartoszek
I also tried setting taskmanager.memory.off-heap to true.

I still get around 42GB (Heap + DirectMemory)

yarn  56827  837 16.6 16495964 10953748 ?   Sl   16:53  34:10
/usr/lib/jvm/java-openjdk/bin/java -Xms12409m -Xmx12409m
-XX:MaxDirectMemorySize=29591m

Cheers,
Pawel


On 15 February 2018 at 16:03, Pawel Bartoszek 
wrote:

> Hi,
>
> I have a question regarding configuration of task manager heap size when
> running YARN session on EMR.
>
> I am running 2 task managers on m4.4xlarge (64GB RAM). I would like to use
> as much as possible of that memory for the task manager heap.
>
> However, when requesting 56000 MB when starting the YARN session, only around
> 42GB is assigned to TM. Do you know how I can increase that?
>
>
> This is how I start YARN session:
> /usr/lib/flink/bin/yarn-session.sh --container 2 --taskManagerMemory
> 56000 --slots 16 --detached -Dparallelism.default=32 
> -Dtaskmanager.network.numberOfBuffers=20480
> ...
>
>
> This is the output of *ps aux *on TM box
>
> yarn  42843 1030 67.7 46394740 44688084 ?   Sl   15:27 175:56
> /usr/lib/jvm/java-openjdk/bin/java -Xms42000m -Xmx42000m 
>
> yarn  42837  0.0  0.0 113104  2684 ?Ss   15:27   0:00
> /bin/bash -c /usr/lib/jvm/java-openjdk/bin/java -Xms42000m -Xmx42000m 
>
>
> I would expect around 56GB set as max heap size for TM.
>
> some settings from yarn-site.xml that might be of interest:
>
> 
> yarn.scheduler.maximum-allocation-mb
> 57344
>   
>
> 
> yarn.nodemanager.resource.memory-mb
> 57344
>   
>
> 
> yarn.nodemanager.vmem-pmem-ratio
> 5
>   
>
>
> Cheers,
> Pawel
>
>
>


Re: Java heap size in YARN

2018-02-15 Thread Kien Truong

Hi,

The relevant settings is:

containerized.heap-cutoff-ratio (default 0.25): percentage of heap 
space to remove from containers started by YARN. When a user requests a 
certain amount of memory for each TaskManager container (for example 4 
GB), we cannot pass this amount as the maximum heap space for the JVM 
(-Xmx argument) because the JVM also allocates memory outside the 
heap. YARN is very strict about killing containers which use more 
memory than requested. Therefore, we remove this fraction of the memory 
from the requested heap as a safety margin and add it to the memory used 
off-heap.



You can reduce this in order to get a bigger JVM heap size, or increase 
it in order to reserve more memory for off-heap usage (for jobs with 
large RocksDB state), but I suggest not changing this setting without 
careful consideration.
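
As a sanity check against the numbers above: with the default ratio,
56000 MB * (1 - 0.25) = 42000 MB, which matches the -Xmx42000m in the quoted
ps output; with taskmanager.memory.off-heap enabled, the same 42000 MB budget
shows up as 12409 MB of heap plus a 29591 MB direct memory limit
(12409 + 29591 = 42000). Lowering the cutoff when starting the session, e.g.
with -Dcontainerized.heap-cutoff-ratio=0.15, would raise the heap accordingly
(an illustration, not a recommendation).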


Regards,

Kien


On 2/16/2018 12:01 AM, Pawel Bartoszek wrote:

I also tried setting taskmanager.memory.off-heap to true.

I still get around 42GB (Heap + DirectMemory)

yarn      56827  837 16.6 16495964 10953748 ?   Sl  16:53  34:10 
/usr/lib/jvm/java-openjdk/bin/java -Xms12409m -Xmx12409m 
-XX:MaxDirectMemorySize=29591m


Cheers,
Pawel


On 15 February 2018 at 16:03, Pawel Bartoszek wrote:


Hi,

I have a question regarding configuration of task manager heap
size when running YARN session on EMR.

I am running 2 task managers on m4.4xlarge (64GB RAM). I would
like to use as much as possible of that memory for the task
manager heap.

However, when requesting 56000 MB when starting the YARN session, only
around 42GB is assigned to TM. Do you know how I can increase that?


This is how I start YARN session:
/usr/lib/flink/bin/yarn-session.sh --container 2
--taskManagerMemory 56000 --slots 16 --detached
-Dparallelism.default=32
-Dtaskmanager.network.numberOfBuffers=20480 ...


This is the output of /ps aux /on TM box

yarn      42843 1030 67.7 46394740 44688084 ?   Sl  15:27 175:56
/usr/lib/jvm/java-openjdk/bin/java -Xms42000m -Xmx42000m 

yarn     42837  0.0  0.0 113104  2684 ?        Ss  15:27   0:00
/bin/bash -c /usr/lib/jvm/java-openjdk/bin/java -Xms42000m
-Xmx42000m 


I would expect around 56GB set as max heap size for TM.

some settings from yarn-site.xml that might be of interest:


    yarn.scheduler.maximum-allocation-mb
    57344
  


    yarn.nodemanager.resource.memory-mb
    57344
  


    yarn.nodemanager.vmem-pmem-ratio
    5
  


Cheers,
Pawel




Re: Java heap size in YARN

2018-02-15 Thread Pawel Bartoszek
Thanks Kien. I will at least play with the setting :) We use Hadoop (S3) as
a checkpoint store. In our case off-heap memory is around 300MB, as reported
on the task manager statistics page.

On 15 Feb 2018 at 17:24, "Kien Truong" wrote:

> Hi,
>
> The relevant settings is:
>
> containerized.heap-cutoff-ratio: (Default 0.25) Percentage of heap space
> to remove from containers started by YARN. When a user requests a certain
> amount of memory for each TaskManager container (for example 4 GB), we can
> not pass this amount as the maximum heap space for the JVM (-Xmx
> argument) because the JVM is also allocating memory outside the heap. YARN
> is very strict with killing containers which are using more memory than
> requested. Therefore, we remove this fraction of the memory from the
> requested heap as a safety margin and add it to the memory used off-heap.
>
>
> You can reduce this in order to get a bigger JVM heap size or increase it
> in order to reserve more memory for off-heap usage (for jobs with large
> rocksdb state),
>
> but I suggest you not changing this setting without careful consideration.
>
>
> Regards,
>
> Kien
>
>
> On 2/16/2018 12:01 AM, Pawel Bartoszek wrote:
>
> I tried also setting  taskmanager.memory.off-heap to true
>
> I still get around 42GB (Heap + DirectMemory)
>
> yarn  56827  837 16.6 16495964 10953748 ?   Sl   16:53  34:10
> /usr/lib/jvm/java-openjdk/bin/java -Xms12409m -Xmx12409m
> -XX:MaxDirectMemorySize=29591m
>
> Cheers,
> Pawel
>
>
> On 15 February 2018 at 16:03, Pawel Bartoszek 
> wrote:
>
>> Hi,
>>
>> I have a question regarding configuration of task manager heap size when
>> running YARN session on EMR.
>>
>> I am running 2 task managers on m4.4xlarge (64GB RAM). I would like to
>> use as much as possible of that memory for the task manager heap.
>>
>> However, when requesting 56000 MB when starting the YARN session, only around
>> 42GB is assigned to TM. Do you know how I can increase that?
>>
>>
>> This is how I start YARN session:
>> /usr/lib/flink/bin/yarn-session.sh --container 2 --taskManagerMemory
>> 56000 --slots 16 --detached -Dparallelism.default=32
>> -Dtaskmanager.network.numberOfBuffers=20480 ...
>>
>>
>> This is the output of *ps aux *on TM box
>>
>> yarn  42843 1030 67.7 46394740 44688084 ?   Sl   15:27 175:56
>> /usr/lib/jvm/java-openjdk/bin/java -Xms42000m -Xmx42000m 
>>
>> yarn  42837  0.0  0.0 113104  2684 ?Ss   15:27   0:00
>> /bin/bash -c /usr/lib/jvm/java-openjdk/bin/java -Xms42000m -Xmx42000m
>> 
>>
>>
>> I would expect around 56GB set as max heap size for TM.
>>
>> some settings from yarn-site.xml that might be of interest:
>>
>> 
>> yarn.scheduler.maximum-allocation-mb
>> 57344
>>   
>>
>> 
>> yarn.nodemanager.resource.memory-mb
>> 57344
>>   
>>
>> 
>> yarn.nodemanager.vmem-pmem-ratio
>> 5
>>   
>>
>>
>> Cheers,
>> Pawel
>>
>>
>>
>


Re: Java heap size in YARN

2018-02-15 Thread Kien Truong
The off-heap usage reported in the task manager UI can be misleading, because 
it does not include the memory used by native libraries like RocksDB, which can 
be huge if you have a large stateful job.

Regards,
Kien

Sent from TypeApp

On Feb 16, 2018, at 00:33, Pawel Bartoszek wrote:
>Thanks Kien. I will at least play with the setting :) We use hadoop
>(s3) as
>a chekpoint store. In our case off heap memory is around 300MB as
>reported
>on task manager statistic page.
>
>15 lut 2018 17:24 "Kien Truong"  napisał(a):
>
>> Hi,
>>
>> The relevant settings is:
>>
>> containerized.heap-cutoff-ratio: (Default 0.25) Percentage of heap
>space
>> to remove from containers started by YARN. When a user requests a
>certain
>> amount of memory for each TaskManager container (for example 4 GB),
>we can
>> not pass this amount as the maximum heap space for the JVM (-Xmx
>> argument) because the JVM is also allocating memory outside the heap.
>YARN
>> is very strict with killing containers which are using more memory
>than
>> requested. Therefore, we remove this fraction of the memory from the
>> requested heap as a safety margin and add it to the memory used
>off-heap.
>>
>>
>> You can reduce this in order to get a bigger JVM heap size or
>increase it
>> in order to reserve more memory for off-heap usage (for jobs with
>large
>> rocksdb state),
>>
>> but I suggest you not changing this setting without careful
>consideration.
>>
>>
>> Regards,
>>
>> Kien
>>
>>
>> On 2/16/2018 12:01 AM, Pawel Bartoszek wrote:
>>
>> I tried also setting  taskmanager.memory.off-heap to true
>>
>> I still get around 42GB (Heap + DirectMemory)
>>
>> yarn  56827  837 16.6 16495964 10953748 ?   Sl   16:53  34:10
>> /usr/lib/jvm/java-openjdk/bin/java -Xms12409m -Xmx12409m
>> -XX:MaxDirectMemorySize=29591m
>>
>> Cheers,
>> Pawel
>>
>>
>> On 15 February 2018 at 16:03, Pawel Bartoszek
>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a question regarding configuration of task manager heap size
>when
>>> running YARN session on EMR.
>>>
>>> I am running 2 task managers on m4.4xlarge (64GB RAM). I would like to
>>> use as much as possible of that memory for the task manager heap.
>>>
>>> However, when requesting 56000 MB when starting the YARN session, only around
>>> 42GB is assigned to TM. Do you know how I can increase that?
>>>
>>>
>>> This is how I start YARN session:
>>> /usr/lib/flink/bin/yarn-session.sh --container 2 --taskManagerMemory
>>> 56000 --slots 16 --detached -Dparallelism.default=32
>>> -Dtaskmanager.network.numberOfBuffers=20480 ...
>>>
>>>
>>> This is the output of *ps aux *on TM box
>>>
>>> yarn  42843 1030 67.7 46394740 44688084 ?   Sl   15:27 175:56
>>> /usr/lib/jvm/java-openjdk/bin/java -Xms42000m -Xmx42000m 
>>>
>>> yarn  42837  0.0  0.0 113104  2684 ?Ss   15:27   0:00
>>> /bin/bash -c /usr/lib/jvm/java-openjdk/bin/java -Xms42000m -Xmx42000m
>>> 
>>>
>>>
>>> I would expect around 56GB set as max heap size for TM.
>>>
>>> some settings from yarn-site.xml that might be of interest:
>>>
>>> 
>>> yarn.scheduler.maximum-allocation-mb
>>> 57344
>>>   
>>>
>>> 
>>> yarn.nodemanager.resource.memory-mb
>>> 57344
>>>   
>>>
>>> 
>>> yarn.nodemanager.vmem-pmem-ratio
>>> 5
>>>   
>>>
>>>
>>> Cheers,
>>> Pawel
>>>
>>>
>>>
>>


[ANNOUNCE] Apache Flink 1.4.1 released

2018-02-15 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache 
Flink 1.4.1, which is the first bugfix release for the Apache Flink 1.4 series.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this bugfix release:
https://flink.apache.org/news/2018/02/15/release-1.4.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12342212

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Cheers,
Gordon



Re: Optimizing multiple aggregate queries on a CEP using Flink

2018-02-15 Thread Sahil Arora
Thank you Kostas for your inputs. We will try to integrate an optimizer
into Flink and will get back in case we get stuck.

Regards.

On Thu, 15 Feb 2018 at 19:11 Kostas Kloudas 
wrote:

> Hi Sahil,
>
> Currently CEP does not support multi-query optimizations out-of-the-box.
> In some cases you can do manual optimizations to your code, but there is
> no optimizer involved.
>
> Cheers,
> Kostas
>
>
> On Feb 15, 2018, at 11:12 AM, Sahil Arora 
> wrote:
>
> Hi Timo,
> Thanks a lot for the help. I will be looking forward to a reply from
> Kostas to be clearer on this.
>
>
> On Mon, 12 Feb 2018, 10:01 pm Timo Walther,  wrote:
>
>> Hi Sahil,
>>
>> I'm not a CEP expert but I will loop in Kostas (in CC). In general, the
>> example that you described can be easily done with a ProcessFunction [1]. A
>> process function not only allows to keep state (like a count) but also
>> allows you to set timers flexibly for specific use cases such that
>> aggregations can be triggered/reused. So in general I would say that
>> implementing and testing such an algorithm is possible. How easily it can be
>> integrated into the CEP API, I don't know.
>>
>> Regards,
>> Timo
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
>>
>> On 2/9/18 at 11:28 PM, Sahil Arora wrote:
>>
>> Hi there,
>> We have been working on a project with the title "Optimizing Multiple
>> Aggregate Queries over a Complex Event Processing Engine". The aim is to
>> optimize a group of queries. For example, "how many cars passed the
>> post in the past 1 minute" and "how many cars passed the post in the
>> past 2 minutes" are 2 queries, and the naive and inefficient method to
>> answer both the queries is to independently solve both of these queries one
>> by one and find the answer. However, the optimum way would be to minimize
>> the computation by using the answer given by query 1 and using it in query
>> 2. This is basically what our aim is, to minimize computation cost when we
>> have multiple aggregate queries in a CEP.
>>
>> We have been searching for some platform which supports CEP, and Flink is
>> probably one of them. Hence, it would be very helpful if we could get some
>> answers to the following questions:
>>
>> 1. Does flink already have some method of optimizing multiple aggregate
>> queries?
>> 2. Is it possible for us to implement / test such an algorithm in flink
>> which considers multiple queries in a CEP, like having a database of SQL
>> queries and testing an algorithm of our choice?
>>
>> Any other inputs which may help us with solving the problem would be
>> highly welcome.
>>
>> Thanks a lot.
>> --
>> Sahil Arora
>> Final year B.Tech Undergrad | Indian Institute of Technology Mandi
>> Web: https://sahilarora535.github.io
>> LinkedIn: sahilarora535 
>> Ph: +91-8130506047 <+91%2081305%2006047>
>>
>>
>> --
> Sahil Arora
> Final year B.Tech Undergrad | Indian Institute of Technology Mandi
> Web: https://sahilarora535.github.io
> LinkedIn: sahilarora535 
> Ph: +91-8130506047 <+91%2081305%2006047>
>
>
> --
Sahil Arora
Final year B.Tech Undergrad | Indian Institute of Technology Mandi
Web: https://sahilarora535.github.io
LinkedIn: sahilarora535 
Ph: +91-8130506047 <+91%2081305%2006047>


Re: [ANNOUNCE] Apache Flink 1.4.1 released

2018-02-15 Thread Bowen Li
Congratulations everyone!

On Thu, Feb 15, 2018 at 10:04 AM, Tzu-Li (Gordon) Tai 
wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.4.1, which is the first bugfix release for the Apache Flink 1.4
> series.
>
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
>
> The release is available for download at:
>
> https://flink.apache.org/downloads.html
>
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
>
> https://flink.apache.org/news/2018/02/15/release-1.4.1.html
>
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?
> projectId=12315522&version=12342212
>
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
>
> Cheers,
>
> Gordon
>
>


Re: [ANNOUNCE] Apache Flink 1.4.1 released

2018-02-15 Thread Hao Sun
This is great!

On Thu, Feb 15, 2018 at 2:50 PM Bowen Li  wrote:

> Congratulations everyone!
>
> On Thu, Feb 15, 2018 at 10:04 AM, Tzu-Li (Gordon) Tai wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.4.1, which is the first bugfix release for the Apache Flink
>> 1.4 series.
>>
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>>
>> The release is available for download at:
>>
>> https://flink.apache.org/downloads.html
>> 
>>
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>>
>> https://flink.apache.org/news/2018/02/15/release-1.4.1.html
>> 
>>
>>
>> The full release notes are available in Jira:
>>
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12342212
>> 
>>
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>>
>> Cheers,
>>
>> Gordon
>>
>>
>


Re: Regarding BucketingSink

2018-02-15 Thread Mu Kong
Hi Vishal,

I have the same concern about savepointing in BucketingSink.
As for your question, I think the pending files are finalized in
notifyCheckpointComplete before they get cleared in handleRestoredBucketState:

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628

I'm looking into this part of the source code now, since we are
experiencing some unclosed files after checkpointing.
It would be great if you could share anything new you find about
your problem; it might help with ours.

Best regards,
Mu
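
For reference, the --allowNonRestoredState flag discussed in the quoted thread
below is passed on resume roughly like this (a sketch; the savepoint path and
jar are placeholders). It lets the job start even when some savepoint state
cannot be mapped to an operator of the new program, and that unmapped state is
then skipped:

```
flink run -s hdfs:///savepoints/savepoint-abc123 --allowNonRestoredState my-job.jar
```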

On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi  wrote:

> -rw-r--r--   3 root hadoop 11 2018-02-14 18:48
> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>
> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15
> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>
> -rw-r--r--   3 root hadoop 11 2018-02-14 21:17
> /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>
>
> This is strange: we had a few retries because of an OOM on one of the TMs
> and we see this situation. Two files (on either side) were dealt
> with fine, but there is a dangling .pending file. I am sure this is not what
> is meant to happen. I think we have an edge condition, and looking at the
> code it is not obvious. Maybe someone who wrote the code can shed some light
> on how this can happen.
>
>
>
>
> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi wrote:
>
>> without --allowNonRestoredState, on a suspend/resume we do see the
>> length file along with the finalized file ( finalized during resume )
>>
>> -rw-r--r--   3 root hadoop 10 2018-02-09 13:57
>> /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>
>> that does make much more sense.
>>
>> I guess we should document --allowNonRestoredState better? It seems it
>> actually drops state?
>>
>>
>>
>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> This is 1.4 BTW.  I am not sure that I am reading this correctly but the
>>> lifecycle of cancel/resume is 2 steps
>>>
>>>
>>>
>>> 1. Cancel job with SP
>>>
>>>
>>> closeCurrentPartFile
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>
>>> is called from close()
>>>
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>
>>>
>>> and that moves files to pending state.  That I would presume is called
>>> when one does a cancel.
>>>
>>>
>>>
>>> 2. The restore on resume
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>
>>> calls
>>>
>>> handleRestoredBucketState
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>
>>> clears the pending files from state without finalizing them?
>>>
>>>
>>>
>>> That does not seem to be right. I must be reading the code totally wrong
>>> ?
>>>
>>> I am also not sure whether --allowNonRestoredState is skipping getting
>>> the state. At least
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>>> is not exactly clear about what it does if we add an operator ( GDF I think
>>> will add a new operator in the DAG without state even if stateful; in my
>>> case the Map operator is not even stateful )
>>>
>>>
>>> Thanks and please bear with me if this is all something pretty simple.
>>>
>>> Vishal
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 What should be the behavior of BucketingSink vis-a-vis state ( pending,
 in-progress and finalization ) when we suspend and resume?

 So I did this

 * I had a pipe writing to hdfs suspend and resume using

 --allowNonRestoredState as in I had added a harmless MapOperator (
 stateless ).


 * I see that a file on hdfs, the file being written to ( before the
 cancel with save point )  go into a pending state  _part-0-21.pending


 * I see a new file being written to in the resumed pipe
 _part-0-22.in-progress.


 What I do not see is the file in _part-0-21.pending being finalized
 ( as in renamed to just part-0-21 ). I would have assumed that would be the
 case in this controlled suspend/resume circumstance. Further, it is a rename,
 and hdfs mv is not an expensive operation.



 Am I understand

Concurrent modification Exception when submitting multiple jobs

2018-02-15 Thread Vinay Patil
Hi,

I am submitting jobs to the cluster (using the remote execution environment) from
multiple threads, and I am getting the following exception:


java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:128)
at
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:121)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1526)
at
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
at
com.test.executors.FlinkExecutor.submitJobToCluster(FlinkExecutor.java:67)


I am using Flink 1.3.2, and I am making sure that the job name is different
for each job. 
Can you please let me know if I am doing something wrong?

Regards,
Vinay Patil
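
A possible workaround, sketched under the assumption that a single environment
object is being shared across the submitting threads (the environment keeps a
mutable list of transformations, which would explain the failing iteration in
StreamGraphGenerator), is to give each thread its own remote environment; host,
port and jar path below are placeholders:

```
import org.apache.flink.streaming.api.scala._

// Sketch: build and submit each job on its own environment instead of
// sharing one StreamExecutionEnvironment across threads.
def submitJob(jobName: String): Unit = {
  val env = StreamExecutionEnvironment.createRemoteEnvironment(
    "jobmanager-host", 6123, "/path/to/job.jar")
  // ... build this job's topology on this thread-local env ...
  env.execute(jobName)
}
```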



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/