Re: 答复: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-12 Thread Fabian Hueske
I have another bugfix for 1.2:

https://issues.apache.org/jira/browse/FLINK-2662 (pending PR)

2017-01-10 15:16 GMT+01:00 Robert Metzger :

> Hi,
>
> this depends a lot on the number of issues we find during the testing.
>
>
> These are the issues I found so far:
>
> https://issues.apache.org/jira/browse/FLINK-5379 (unresolved)
> https://issues.apache.org/jira/browse/FLINK-5383 (resolved)
> https://issues.apache.org/jira/browse/FLINK-5382 (resolved)
> https://issues.apache.org/jira/browse/FLINK-5381 (resolved)
> https://issues.apache.org/jira/browse/FLINK-5380 (pending PR)
>
>
>
>
> On Tue, Jan 10, 2017 at 11:58 AM, shijinkui  wrote:
>
> > Do we have a probable time of 1.2 release? This month or Next month?
> >
> > -----Original Message-----
> > From: Robert Metzger [mailto:rmetz...@apache.org]
> > Sent: January 3, 2017 20:44
> > To: d...@flink.apache.org
> > Cc: user@flink.apache.org
> > Subject: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release
> candidate)
> >
> > Hi,
> >
> > First of all, I wish everybody a happy new year 2017.
> >
> > I've set user@flink in CC so that users who are interested in helping
> > with the testing get notified. Please respond only to the dev@ list to
> > keep the discussion there!
> >
> > According to the 1.2 release discussion thread, I've created a first
> > release candidate for Flink 1.2.
> > The release candidate will not be the final release, because I'm certain
> > that we'll find at least one blocking issue in the candidate :)
> >
> > Therefore, the RC is meant as a testing only release candidate.
> > Please report every issue we need to fix before the next RC in this
> thread
> > so that we have a good overview.
> >
> > The release artifacts are located here:
> > http://people.apache.org/~rmetzger/flink-1.2.0-rc0/
> >
> > The maven staging repository is located here:
> > https://repository.apache.org/content/repositories/orgapacheflink-
> >
> > The release commit (in branch "release-1.2.0-rc0"):
> > http://git-wip-us.apache.org/repos/asf/flink/commit/f3c59ced
> >
> >
> > Happy testing!
> >
>


RE: Making batches of small messages

2017-01-12 Thread Gwenhael Pasquiers
Thanks,

We are waiting for the 1.2 release eagerly ☺


From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, January 11, 2017 18:32
To: user@flink.apache.org
Subject: Re: Making batches of small messages

Hi,
I think this is a case for the ProcessFunction that was recently added and will 
be included in Flink 1.2.
ProcessFunction lets you register timers (so the 5 secs timeout can be 
addressed). You can maintain the fault-tolerance guarantees if you collect the 
records in managed state. That way they will be included in checkpoints and 
restored in case of a failure.
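For illustration, a rough sketch of what such a batching function could look like on a keyed stream. The class name, batch size and timeout are made up for the example, and note that in Flink 1.2 the state-accessing variant is called RichProcessFunction (in later versions ProcessFunction itself gives access to state):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Collects incoming records per key in managed ListState (so they are part of
// checkpoints) and emits them as a batch when either 1000 records have been
// buffered or 5 seconds have passed since the last record.
public class BatchingFunction extends ProcessFunction<String, List<String>> {

    private static final int BATCH_SIZE = 1000;
    private static final long TIMEOUT_MS = 5000L;

    private transient ListState<String> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<List<String>> out)
            throws Exception {
        buffer.add(value);
        List<String> batch = drain();
        if (batch.size() >= BATCH_SIZE) {
            out.collect(batch);
            buffer.clear();
        } else {
            // flush at the latest TIMEOUT_MS after this element
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + TIMEOUT_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<String>> out)
            throws Exception {
        List<String> batch = drain();
        if (!batch.isEmpty()) {
            out.collect(batch);
            buffer.clear();
        }
    }

    // copies the buffered elements out of the list state
    private List<String> drain() throws Exception {
        List<String> batch = new ArrayList<>();
        Iterable<String> contents = buffer.get();
        if (contents != null) {
            for (String s : contents) {
                batch.add(s);
            }
        }
        return batch;
    }
}

It would be applied after a keyBy(), e.g. keyedStream.process(new BatchingFunction()).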
If you are on Flink 1.1.x, you will need to implement a custom operator, which 
is a much lower-level interface.
Best, Fabian

2017-01-11 17:16 GMT+01:00 Gwenhael Pasquiers 
mailto:gwenhael.pasqui...@ericsson.com>>:
Hi,

Sorry if this was already asked.

For performance reasons (streaming as well as batch) I’d like to “group” 
messages (let’s say in batches of 1000) before sending them to my sink (Kafka, 
but mainly ES) so that I have a smaller overhead.

I’ve seen the “countWindow” operation, but if I’m not wrong the parallelism of 
such an operation is 1. Moreover, I’d need some “timeout” (send the current 
batch to the next operator after 5s if it did not reach 1000 messages before that).

I could also create a flatMap “String to List<String>” that accumulates messages 
until it reaches 1000 and then sends them to the output; however, that does not 
solve the timeout issue (not sure I could call out.collect() from a Timer 
thread), and even more importantly I’m afraid that it would screw up the 
exactly-once policy (Flink could not know that I was stacking messages, I could 
very well be filtering them) in case of a crash.

My Sink could also create the chunks, with its own timer / counter, but I’m 
also afraid that it would break the exactly-once guarantee since, in case of a 
crash, there is no way Flink would know whether the message was really sent or 
still stacked …

Is there a proper way to do what I want?

Thanks in advance,

Gwenhaël PASQUIERS



Re: Making batches of small messages

2017-01-12 Thread Kostas Kloudas
Hi,

Fabian is right. 

The only thing I have to add is that if you have parallelism > 1, then each task 
only knows the local “count” of messages it has buffered. In other words, with a 
parallelism of 2 and a batching threshold of 1000 messages, each one of the 
parallel tasks will have to reach this threshold before flushing to your sink. 
If task 0 has 501 messages and task 1 has 600, they will still not flush.

This can be resolved with your timeout, but it may be worth adjusting your 
threshold according to the parallelism of your job, to avoid memory issues that 
may arise (depending on the state backend you are using).
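Purely as an illustration of that sizing consideration (not from the thread), splitting a global batch budget across the parallel subtasks could look like this:

// Derives a per-subtask flush threshold from a global batch-size budget, so
// that parallelism x localThreshold stays close to the intended total.
public final class BatchSizing {

    private BatchSizing() {
    }

    public static int localThreshold(int globalBatchSize, int parallelism) {
        return Math.max(1, globalBatchSize / parallelism);
    }

    public static void main(String[] args) {
        // a 1000-message budget spread over 2 parallel tasks -> each flushes at 500
        System.out.println(localThreshold(1000, 2));
    }
}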

Kostas

> On Jan 12, 2017, at 10:09 AM, Gwenhael Pasquiers 
>  wrote:
> 
> Thanks,
>  
> We are waiting for the 1.2 release eagerly ☺
>  
>  
> From: Fabian Hueske [mailto:fhue...@gmail.com] 
> Sent: Wednesday, January 11, 2017 18:32
> To: user@flink.apache.org
> Subject: Re: Making batches of small messages
>  
> Hi,
> 
> I think this is a case for the ProcessFunction that was recently added and 
> will be included in Flink 1.2.
> ProcessFunction lets you register timers (so the 5 secs timeout can be 
> addressed). You can maintain the fault tolerance guarantees if you collect 
> the records in managed state. That way they will be included in checkpoints 
> and restored in case of a failure.
> 
> If you are on Flink 1.1.x, you will need to implement a custom operator which 
> is a much more low-level interface.
> 
> Best, Fabian
>  
> 2017-01-11 17:16 GMT+01:00 Gwenhael Pasquiers 
> mailto:gwenhael.pasqui...@ericsson.com>>:
> Hi,
>  
> Sorry if this was already asked.
>  
> For performance reasons (streaming as well as batch) I’d like to “group” 
> messages (let’s say in batches of 1000) before sending them to my sink 
> (Kafka, but mainly ES) so that I have a smaller overhead.
>  
> I’ve seen the “countWindow” operation, but if I’m not wrong the parallelism of 
> such an operation is 1. Moreover, I’d need some “timeout” (send the current 
> batch to the next operator after 5s if it did not reach 1000 messages before 
> that).
>  
> I could also create a flatMap “String to List<String>” that accumulates 
> messages until it reaches 1000 and then sends them to the output; however, that 
> does not solve the timeout issue (not sure I could call out.collect() from a 
> Timer thread), and even more importantly I’m afraid that it would screw up 
> the exactly-once policy (Flink could not know that I was stacking messages, I 
> could very well be filtering them) in case of a crash.
>  
> My Sink could also create the chunks, with its own timer / counter, but I’m 
> also afraid that it would break the exactly-once guarantee since, in case of a 
> crash, there is no way Flink would know whether the message was really sent or 
> still stacked …
>  
> Is there a proper way to do what I want?
>  
> Thanks in advance,
>  
> Gwenhaël PASQUIERS



Re: About delta awareness caches

2017-01-12 Thread Xingcan
Hi, Aljoscha

Thanks for your explanation.

About the Storm windows simulation, we had tried your suggestion but gave
up due to its complexity and the feeling of "reinventing the wheel". Performance
aside, most of our business-logic code has already been transformed to the
"Flink style".

I am glad to hear that adding the accumulator is already in progress. As far
as I can see, the operations it supplies will adequately meet our demands.
I will stay focused on this topic.

Best,
Xingcan

On Wed, Jan 11, 2017 at 7:28 PM, Aljoscha Krettek 
wrote:

> Hi,
> (I'm just getting back from holidays, therefore the slow response. Sorry
> for that.)
>
> I think you can simulate the way Storm windows work by using a
> GlobalWindows assigner and having a custom Trigger and/or Evictor and also
> some special logic in your WindowFunction.
>
> About mergeable state, we're actually in the process of adding something
> like this that would be a generalisation of reduce and fold: you can call
> it combine or aggregate. The idea is to have these operations:
>
> - create accumulator
> - add value to accumulator
> - merge accumulators
> - extract output from accumulator
>
> You have three types: IN for incoming values, ACC for accumulators and OUT
> as the result of extracting output from an accumulator. This should cover
> most cases.
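As an illustration of those four operations, here is a sketch of what such an interface could look like, together with an average whose IN, ACC and OUT types all differ. The interface and class names are hypothetical, not the final Flink API:

// Hypothetical shape of the proposed aggregation; names are illustrative only.
public interface Aggregator<IN, ACC, OUT> {
    ACC createAccumulator();                 // create accumulator
    ACC add(IN value, ACC accumulator);      // add value to accumulator
    ACC merge(ACC a, ACC b);                 // merge accumulators
    OUT getResult(ACC accumulator);          // extract output from accumulator
}

// Example: an average where IN = Long, ACC = {sum, count}, OUT = Double.
public class AverageAggregator implements Aggregator<Long, long[], Double> {

    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L}; // {sum, count}
    }

    @Override
    public long[] add(Long value, long[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        a[0] += b[0];
        a[1] += b[1];
        return a;
    }

    @Override
    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }
}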
>
> What do you think?
>
> Cheers,
> Aljoscha
>
> On Thu, 22 Dec 2016 at 07:13 xingcan  wrote:
>
> Hi Aljoscha,
>
> First of all, sorry for that I missed your prompt reply : (
>
> In these days, I've been learning the implementation mechanism of window
> in Flink.
>
> I think the main difference between windows in Storm and Flink (at the API
> level) is that Storm maintains only one window while Flink maintains several
> isolated windows. Because of that, Storm users can observe the changes (tuple
> add and expire) of a window and take action on each window modification
> (sliding window forwarding), while Flink users can only implement functions
> on one complete window after another, as if they were independent of each
> other (actually they may have quite a few tuples in common).
>
> Objectively speaking, the window API provided by Flink is more formalized
> and easier to use. However, for a sliding window with high capacity and a short
> interval (e.g. 5m and 1s), each tuple will be calculated redundantly (maybe
> 300 times in the example?). Though it provides the pane optimization, I
> think that is far from enough, as the optimization can only be applied to
> reduce functions, which restrict the input and output data types to be the
> same. Other functions that should also avoid redundant calculations, e.g. a
> MaxAndMin function which takes numbers as input and outputs a max&min pair,
> or an Average function, cannot be expressed that way.
>
> Actually, I am just wondering whether a "mergeable fold function" could be added
> (just *like* this: https://en.wikipedia.org/wiki/Mergeable_heap). I know
> it may violate some principles of Flink (probably about state), but I
> maintain that unnecessary calculations should be avoided in stream processing.
>
> So, could you give some advice? I am all ears : ). Or, if you think that
> is feasible, I'll think it through carefully and try to implement it.
>
> Thank you and merry Christmas.
>
> Best,
>
> - Xingcan
>
> On Thu, Dec 1, 2016 at 7:56 PM, Aljoscha Krettek 
> wrote:
>
> I'm not aware of how windows work in Storm. If you could maybe give some
> details on your use case we could figure out together how that would map to
> Flink windows.
>
> Cheers,
> Aljoscha
>
> On Tue, 29 Nov 2016 at 15:47 xingcan  wrote:
>
> Hi all,
>
> Recently I tried to port some old applications from Storm to Flink.
> In Storm, the window implementation (TupleWindow) has two methods named
> getNew() and getExpired() which supply the delta information of a window,
> and therefore we wrote some stateful caches that rely on them.
> However, it seems that Flink deals with windows in a different way and
> supplies more "formalized" APIs.
> So, any tips on how to adapt these delta-awareness caches to Flink, or how to
> refactor them to make them fit?
>
> Thanks.
>
> Best,
> Xingcan
>
>
>


Re: Sliding Event Time Window Processing: Window Function inconsistent behavior

2017-01-12 Thread Aljoscha Krettek
Great! Thanks for letting us know.

On Wed, 11 Jan 2017 at 12:44 Sujit Sakre 
wrote:

> Hi Aljoscha,
>
> I have realized that the output stream is not defined separately in the
> code below, and hence the input values are getting into the sink. After
> defining a separate output stream, it works.
>
> We have now confirmed that the windows are processed separately as per the
> groupings.
>
> Thanks.
>
>
> *Sujit Sakre*
>
>
> On 10 January 2017 at 22:10, Sujit Sakre 
> wrote:
>
> Hi Aljoscha,
>
> Thanks.
>
> I have used the following code for testing:
>
> main
>
> keyedStream.keyBy(4)
> .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))   //
> 6 min window with 2 min sliding window
> .apply(new CustomSlidingWindowFunction());
>
> keyedStream.addSink(new SinkFunction<Tuple5<String, String, Float, Float, String>>() {
>
> private static final long serialVersionUID = 1L;
>
> public void invoke(Tuple5<String, String, Float, Float, String> value) {
> System.out.println(value.f1.toString().trim() + ", " +
> value.f0 + ", " + value.f2 + ", " + value.f3);
> }
> });
>
>
> in WindowFunction apply
>
> ...
>
> // Condition for selecting a window
> if(d.after(x) && d.before(y)){
>
> for (Tuple5<String, String, Float, Float, String> tr : input) {
> // Write the window to Collector
> out.collect(new Tuple5<>(tr.f0, tr.f1, tr.f2, tr.f3, tr.f4));
> }
>
> I am getting all input records instead of those windows selected by
> the condition. Is there something I am doing wrong? Does this need to be
> done in a different way?
>
> Please let me know.
>
> Thanks.
>
>
>
> *Sujit Sakre*
>
>
> On 10 January 2017 at 20:24, Aljoscha Krettek  wrote:
>
> Hi,
> instead of writing to files, could you please simply output a value using
> the Collector and then write the result stream of the window operation to a
> sink (such as a file sink) to see how many windows are being processed.
> Having side effects (especially output) in user functions can lead to
> programs with quite unexpected behaviour and I would highly discourage
> doing that.
>
> Cheers,
> Aljoscha
>
> On Tue, 10 Jan 2017 at 13:44 Sujit Sakre 
> wrote:
>
> Hi,
>
> In the link (
> http://stackoverflow.com/questions/41143518/sliding-processing-time-window-computes-inconsistent-results),
> Fabian has mentioned that if Event Time is used, consistent results are
> possible.
>
> However, that's not the case with us. We are getting very random results.
>
> Please suggest.
>
>
> *Sujit Sakre*
>
>
> On 9 January 2017 at 22:27, Sujit Sakre 
> wrote:
>
> Hi,
>
> We are using Sliding Event Time Window with Kafka Consumer. The window
> size is 6 minutes, and slide is 2 minutes. We have written a window
> function to select a particular window out of multiple windows for a keyed
> stream, e.g. we select about 16 windows out of multiple windows for the
> keyed stream based on a particular condition.
>
> Upon a normal execution, we get 16 windows for processing inside the
> condition (in window function mentioned). These windows we are putting in
> different files, named after window start and end times.
>
> the code is as below:
>
> Calling code
>
>
> public class RealTimeProcessingSlidingWindow{
>
> public static void main(String[] args) throws Exception {
>
> // set up the execution environment
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> // configure the Kafka consumer
> Properties kafkaProps = new Properties();
> kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
> kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
> kafkaProps.setProperty("group.id", DEMO_GROUP);
> // always read the Kafka topic from the start
> kafkaProps.setProperty("auto.offset.reset" ,"earliest");
>
> FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
> new FlinkKafkaConsumer09<>(
> "test",  // kafka topic name
> new dataSchema(),
> kafkaProps);
> DataStream<Tuple5<String, String, Float, Float, String>>
> stream1 = env.addSource(consumer);
> DataStream<Tuple5<String, String, Float, Float, String>>
> keyedStream = stream1.assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessGenerator2());
>
> keyedStream.keyBy(4)
> .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))   //
> 6 min window with 2 min sliding window
> .apply(new CustomSlidingWindowFunction());
>
> env.execute("Sliding Event Time Window Processing");
>
>}
> }
>
>
> public static class CustomSlidingWindowFunction implements
> WindowFunction<Tuple5<String, String, Float, Float, String>, Tuple5<String,
> String, Float, Float, String>, Tuple, TimeWindow> {
>
> @Override
> public void apply(Tuple key, TimeWindow window, Iterable<Tuple5<String,
> String, Float, Float, String>> input,
> Collector<Tuple5<String, String, Float, Float, String>> out) throws
> Exception {
>
> HashMap<String, Tuple5<String, String, Float, Float, String>> windowMap =
> new HashMap<String, Tuple5<String, String, Float, Float, String>>();
> for (Tuple5<String, String, Float, Float, String> wr : input) {
> windowMap.put(wr.f1.toString().trim(), wr);
> }
>
> ...
>
> SimpleDateFormat sf = new SimpleDateFormat(IndianDateTimeFormat);

Re: 答复: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-12 Thread Robert Metzger
I also found a bunch of issues

https://issues.apache.org/jira/browse/FLINK-5465
https://issues.apache.org/jira/browse/FLINK-5462
https://issues.apache.org/jira/browse/FLINK-5464
https://issues.apache.org/jira/browse/FLINK-5463


On Thu, Jan 12, 2017 at 9:56 AM, Fabian Hueske  wrote:

> I have another bugfix for 1.2.:
>
> https://issues.apache.org/jira/browse/FLINK-2662 (pending PR)
>
> 2017-01-10 15:16 GMT+01:00 Robert Metzger :
>
> > Hi,
> >
> > this depends a lot on the number of issues we find during the testing.
> >
> >
> > These are the issues I found so far:
> >
> > https://issues.apache.org/jira/browse/FLINK-5379 (unresolved)
> > https://issues.apache.org/jira/browse/FLINK-5383 (resolved)
> > https://issues.apache.org/jira/browse/FLINK-5382 (resolved)
> > https://issues.apache.org/jira/browse/FLINK-5381 (resolved)
> > https://issues.apache.org/jira/browse/FLINK-5380 (pending PR)
> >
> >
> >
> >
> > On Tue, Jan 10, 2017 at 11:58 AM, shijinkui 
> wrote:
> >
> > > Do we have a probable time of 1.2 release? This month or Next month?
> > >
> > > -----Original Message-----
> > > From: Robert Metzger [mailto:rmetz...@apache.org]
> > > Sent: January 3, 2017 20:44
> > > To: d...@flink.apache.org
> > > Cc: user@flink.apache.org
> > > Subject: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release
> > candidate)
> > >
> > > Hi,
> > >
> > > First of all, I wish everybody a happy new year 2017.
> > >
> > > I've set user@flink in CC so that users who are interested in helping
> > > with the testing get notified. Please respond only to the dev@ list to
> > > keep the discussion there!
> > >
> > > According to the 1.2 release discussion thread, I've created a first
> > > release candidate for Flink 1.2.
> > > The release candidate will not be the final release, because I'm
> certain
> > > that we'll find at least one blocking issue in the candidate :)
> > >
> > > Therefore, the RC is meant as a testing only release candidate.
> > > Please report every issue we need to fix before the next RC in this
> > thread
> > > so that we have a good overview.
> > >
> > > The release artifacts are located here:
> > > http://people.apache.org/~rmetzger/flink-1.2.0-rc0/
> > >
> > > The maven staging repository is located here:
> > > https://repository.apache.org/content/repositories/orgapacheflink-
> > >
> > > The release commit (in branch "release-1.2.0-rc0"):
> > > http://git-wip-us.apache.org/repos/asf/flink/commit/f3c59ced
> > >
> > >
> > > Happy testing!
> > >
> >
>


Re: Increasing parallelism skews/increases overall job processing time linearly

2017-01-12 Thread Chakravarthy varaga
Hi Till,

Thanks for your response.
The results are the same.
4 CPUs (8 cores in total)
kafka partitions = 4 per topic
parallelism for job = 3
task.slot / TM = 4

Basically this Flink application consumes (Kafka source) from 2 topics
and produces (Kafka sink) onto 1 topic. On one consumer topic, the event load
is 100K/sec, while the other source has 1 event per hour ...
I'm wondering whether parallelism is applied across multiple sources
irrespective of the partition count.

What I did was enable 1 partition for the 2nd topic (1 event/hour)
and 4 partitions for the 100K-events topic, then deploy the job with
parallelism 3, and the results are the same...

Best Regards
CVP

On Wed, Jan 11, 2017 at 1:11 PM, Till Rohrmann  wrote:

> Hi CVP,
>
> changing the parallelism from 1 to 2 with every TM having only one slot
> will inevitably introduce another network shuffle operation between the
> sources and the keyed co flat map. This might be the source of your slow
> down, because before everything was running on one machine without any
> network communication (apart from reading from Kafka).
>
> Do you also observe a further degradation when increasing the parallelism
> from 2 to 4, for example (given that you've increased the number of topic
> partitions to at least the maximum parallelism in your topology)?
>
> Cheers,
> Till
>
> On Tue, Jan 10, 2017 at 11:37 AM, Chakravarthy varaga <
> chakravarth...@gmail.com> wrote:
>
>> Hi Guys,
>>
>> I understand that you are extremely busy but any pointers here is
>> highly appreciated. I can proceed forward towards concluding the activity !
>>
>> Best Regards
>> CVP
>>
>> On Mon, Jan 9, 2017 at 11:43 AM, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>> Anything that I could check or collect for you for investigation ?
>>>
>>> On Sat, Jan 7, 2017 at 1:35 PM, Chakravarthy varaga <
>>> chakravarth...@gmail.com> wrote:
>>>
 Hi Stephan

 . Kafka version is 0.9.0.1; the connector is FlinkKafkaConsumer09
 . The flatMap and coFlatMap are connected by keyBy
 . No data is broadcasted and the data is not exploded based on the
 parallelism

 Cvp

 On 6 Jan 2017 20:16, "Stephan Ewen"  wrote:

> Hi!
>
> You are right, parallelism 2 should be faster than parallelism 1 ;-)
> As ChenQin pointed out, having only 2 Kafka Partitions may prevent further
> scaleout.
>
> Few things to check:
>   - How are you connecting the FlatMap and CoFlatMap? Default, keyBy,
> broadcast?
>   - Broadcast for example would multiply the data based on
> parallelism, can lead to slowdown when saturating the network.
>   - Are you using the standard Kafka Source (which Kafka version)?
>   - Is there any part in the program that multiplies data/effort with
> higher parallelism (does the FlatMap explode data based on parallelism)?
>
> Stephan
>
>
> On Fri, Jan 6, 2017 at 7:27 PM, Chen Qin  wrote:
>
>> Just noticed there are only two partitions per topic. Regardless of
>> how large parallelism set. Only two of those will get partition assigned 
>> at
>> most.
>>
>> Sent from my iPhone
>>
>> On Jan 6, 2017, at 02:40, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>> Hi All,
>>
>> Any updates on this?
>>
>> Best Regards
>> CVP
>>
>> On Thu, Jan 5, 2017 at 1:21 PM, Chakravarthy varaga <
>> chakravarth...@gmail.com> wrote:
>>
>>>
>>> Hi All,
>>>
>>> I have a job as attached.
>>>
>>> I have a 16 Core blade running RHEL 7. The taskmanager default
>>> number of slots is set to 1. The source is a kafka stream and each of 
>>> the 2
>>> sources(topic) have 2 partitions each.
>>>
>>>
>>> *What I notice is that when I deploy a job to run with
>>> #parallelism=2 the total processing time doubles the time it took when 
>>> the
>>> same job was deployed with #parallelism=1. It linearly increases with 
>>> the
>>> parallelism.*
>>> Since the number of slots is set to 1 per TM, I would assume that the
>>> job would be processed in parallel in 2 different TMs and that each
>>> consumer in each TM is connected to 1 partition of the topic. This
>>> therefore should have kept the overall processing time the same or less 
>>> !!!
>>>
>>> The co-flatmap connects the 2 streams & uses ValueState
>>> (checkpointed in FS). I think this is distributed among the TMs. My
>>> understanding is that the lookup of value state could be costly between
>>> TMs.  Do you sense something wrong here?
>>>
>>> Best Regards
>>> CVP
>>>
>>>
>>>
>>>
>>>
>>
>
>>>
>>
>


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-12 Thread Ufuk Celebi
Hey Shannon!

Is this always reproducible and how long does it take to reproduce it?

I've not seen this error before but as you say it indicates that some
streams are not closed.

Did the jobs do any restarts before this happened? Flink 1.1.4
contains fixes for more robust releasing of resources in failure
scenarios. Is trying 1.1.4 an option?

– Ufuk

On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey  wrote:
> I'm having pretty frequent issues with the exception below. It basically
> always ends up killing my cluster after forcing a large number of job
> restarts. I just can't keep Flink up & running.
>
> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
> AWS support told me the name of the config option. However, that hasn't
> fixed the problem. Assuming that increasing the maxConnections again doesn't
> fix the problem, is there anything else I can do? Is anyone else having this
> problem? Is it possible that the state backend isn't properly calling
> close() on its filesystem objects? Or is there a large number of concurrent
> open filesystem objects for some reason? I am using the default
> checkpointing settings with one checkpoint at a time, checkpointing every 10
> minutes. If I am reading the metrics correctly, the checkpoint duration is
> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
> Any help is appreciated.
>
> java.lang.RuntimeException: Could not initialize state backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
> at java.lang.Thread.run(Thread.java:745)
> Caused by:
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
> Unable to execute HTTP request: Timeout waiting for connection from pool
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
> at
> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
> at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy34.retrieveMetadata(U

Flink on YARN: Cannot connect to JobManager

2017-01-12 Thread Malte Schwarzer

Hi all,

I'm trying to run a Flink job on YARN via "$/bin/flink run -m yarn-cluster 
-yn 2 ..." with two nodes. But only one JobManager seems to be connected.


Flink hangs at this stage (the lookup message repeats every second):

2017-01-11 15:12:13,653 DEBUG org.apache.flink.yarn.YarnClusterClient 
  - Looking up JobManager
2017-01-11 15:12:13,678 INFO org.apache.flink.yarn.YarnClusterClient 
  - TaskManager status (1/2)

TaskManager status (1/2)
2017-01-11 15:12:13,929 DEBUG org.apache.flink.yarn.YarnClusterClient 
   - Looking up JobManager
2017-01-11 15:12:14,197 DEBUG org.apache.flink.yarn.YarnClusterClient 
   - Looking up JobManager
2017-01-11 15:12:14,451 DEBUG org.apache.hadoop.ipc.Client 
   - IPC Client (20529812) connection to /10.68.17.206:8032 from user sending #104
2017-01-11 15:12:14,452 DEBUG org.apache.hadoop.ipc.Client 
   - IPC Client (20529812) connection to ___:8032 from 
user got value #104
2017-01-11 15:12:14,452 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine 
   - Call: getApplicationReport took 1ms
2017-01-11 15:12:14,462 DEBUG org.apache.flink.yarn.YarnClusterClient 
   - Looking up JobManager
2017-01-11 15:12:14,745 DEBUG org.apache.flink.yarn.YarnClusterClient 
   - Looking up JobManager
2017-01-11 15:12:15,014 DEBUG org.apache.flink.yarn.YarnClusterClient 
   - Looking up JobManager
2017-01-11 15:12:15,276 DEBUG org.apache.flink.yarn.YarnClusterClient 
   - Looking up JobManager
2017-01-11 15:12:15,322 DEBUG org.apache.hadoop.ipc.Client 
   - IPC Client (20529812) connection to ___:8020 from 
user: closed

...

Any suggestions what can cause this?

Standard MapReduce jobs work without any problem on YARN.

Best regards,
Malte


Re: 答复: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-12 Thread Till Rohrmann
I also found an issue:

https://issues.apache.org/jira/browse/FLINK-5470

I also noticed that Flink's webserver does not support https requests. It
might be worthwhile to add it, though.

https://issues.apache.org/jira/browse/FLINK-5472

On Thu, Jan 12, 2017 at 11:24 AM, Robert Metzger 
wrote:

> I also found a bunch of issues
>
> https://issues.apache.org/jira/browse/FLINK-5465
> https://issues.apache.org/jira/browse/FLINK-5462
> https://issues.apache.org/jira/browse/FLINK-5464
> https://issues.apache.org/jira/browse/FLINK-5463
>
>
> On Thu, Jan 12, 2017 at 9:56 AM, Fabian Hueske  wrote:
>
> > I have another bugfix for 1.2.:
> >
> > https://issues.apache.org/jira/browse/FLINK-2662 (pending PR)
> >
> > 2017-01-10 15:16 GMT+01:00 Robert Metzger :
> >
> > > Hi,
> > >
> > > this depends a lot on the number of issues we find during the testing.
> > >
> > >
> > > These are the issues I found so far:
> > >
> > > https://issues.apache.org/jira/browse/FLINK-5379 (unresolved)
> > > https://issues.apache.org/jira/browse/FLINK-5383 (resolved)
> > > https://issues.apache.org/jira/browse/FLINK-5382 (resolved)
> > > https://issues.apache.org/jira/browse/FLINK-5381 (resolved)
> > > https://issues.apache.org/jira/browse/FLINK-5380 (pending PR)
> > >
> > >
> > >
> > >
> > > On Tue, Jan 10, 2017 at 11:58 AM, shijinkui 
> > wrote:
> > >
> > > > Do we have a probable time of 1.2 release? This month or Next month?
> > > >
> > > > -----Original Message-----
> > > > From: Robert Metzger [mailto:rmetz...@apache.org]
> > > > Sent: January 3, 2017 20:44
> > > > To: d...@flink.apache.org
> > > > Cc: user@flink.apache.org
> > > > Subject: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release
> > > candidate)
> > > >
> > > > Hi,
> > > >
> > > > First of all, I wish everybody a happy new year 2017.
> > > >
> > > > I've set user@flink in CC so that users who are interested in
> helping
> > > > with the testing get notified. Please respond only to the dev@ list
> to
> > > > keep the discussion there!
> > > >
> > > > According to the 1.2 release discussion thread, I've created a first
> > > > release candidate for Flink 1.2.
> > > > The release candidate will not be the final release, because I'm
> > certain
> > > > that we'll find at least one blocking issue in the candidate :)
> > > >
> > > > Therefore, the RC is meant as a testing only release candidate.
> > > > Please report every issue we need to fix before the next RC in this
> > > thread
> > > > so that we have a good overview.
> > > >
> > > > The release artifacts are located here:
> > > > http://people.apache.org/~rmetzger/flink-1.2.0-rc0/
> > > >
> > > > The maven staging repository is located here:
> > > > https://repository.apache.org/content/repositories/
> orgapacheflink-
> > > >
> > > > The release commit (in branch "release-1.2.0-rc0"):
> > > > http://git-wip-us.apache.org/repos/asf/flink/commit/f3c59ced
> > > >
> > > >
> > > > Happy testing!
> > > >
> > >
> >
>


Re: 答复: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-12 Thread Chesnay Schepler

FLINK-5470 is a duplicate of FLINK-5298 for which there is also an open PR.

FLINK-5472 is imo invalid since the webserver does support https, you 
just have to enable it as per the security documentation.


On 12.01.2017 16:20, Till Rohrmann wrote:

I also found an issue:

https://issues.apache.org/jira/browse/FLINK-5470

I also noticed that Flink's webserver does not support https requests. 
It might be worthwhile to add it, though.


https://issues.apache.org/jira/browse/FLINK-5472

On Thu, Jan 12, 2017 at 11:24 AM, Robert Metzger wrote:


I also found a bunch of issues

https://issues.apache.org/jira/browse/FLINK-5465

https://issues.apache.org/jira/browse/FLINK-5462

https://issues.apache.org/jira/browse/FLINK-5464

https://issues.apache.org/jira/browse/FLINK-5463



On Thu, Jan 12, 2017 at 9:56 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> I have another bugfix for 1.2.:
>
> https://issues.apache.org/jira/browse/FLINK-2662 (pending PR)
>
> 2017-01-10 15:16 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>
> > Hi,
> >
> > this depends a lot on the number of issues we find during the
testing.
> >
> >
> > These are the issues I found so far:
> >
> > https://issues.apache.org/jira/browse/FLINK-5379 (unresolved)
> > https://issues.apache.org/jira/browse/FLINK-5383 (resolved)
> > https://issues.apache.org/jira/browse/FLINK-5382 (resolved)
> > https://issues.apache.org/jira/browse/FLINK-5381 (resolved)
> > https://issues.apache.org/jira/browse/FLINK-5380 (pending PR)
> >
> >
> >
> >
> > > On Tue, Jan 10, 2017 at 11:58 AM, shijinkui <shijin...@huawei.com>
> wrote:
> >
> > > Do we have a probable time of 1.2 release? This month or
Next month?
> > >
> > > -----Original Message-----
> > > From: Robert Metzger [mailto:rmetz...@apache.org]
> > > Sent: January 3, 2017 20:44
> > > To: d...@flink.apache.org
> > > Cc: user@flink.apache.org
> > > Subject: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing
release
> > candidate)
> > >
> > > Hi,
> > >
> > > First of all, I wish everybody a happy new year 2017.
> > >
> > > I've set user@flink in CC so that users who are interested
in helping
> > > with the testing get notified. Please respond only to the
dev@ list to
> > > keep the discussion there!
> > >
> > > According to the 1.2 release discussion thread, I've created
a first
> > > release candidate for Flink 1.2.
> > > The release candidate will not be the final release, because I'm
> certain
> > > that we'll find at least one blocking issue in the candidate :)
> > >
> > > Therefore, the RC is meant as a testing only release candidate.
> > > Please report every issue we need to fix before the next RC
in this
> > thread
> > > so that we have a good overview.
> > >
> > > The release artifacts are located here:
> > > http://people.apache.org/~rmetzger/flink-1.2.0-rc0/

> > >
> > > The maven staging repository is located here:
> > >
https://repository.apache.org/content/repositories/orgapacheflink-

> > >
> > > The release commit (in branch "release-1.2.0-rc0"):
> > > http://git-wip-us.apache.org/repos/asf/flink/commit/f3c59ced

> > >
> > >
> > > Happy testing!
> > >
> >
>






Kafka topic partition skewness causes watermark not being emitted

2017-01-12 Thread tao xiao
Hi team,

I have a topic with 2 partitions in Kafka. I produced all data to partition
0 and no data to partition 1. I created a Flink job with parallelism 1
that consumes that topic and counts the events with a session event window (5
seconds gap). It turned out that the session event window was never closed,
even when I sent a message with a 10-minute gap. After digging into the source
code, AbstractFetcher[1], which is responsible for emitting watermarks
downstream, calculates the min watermark of all partitions. Since there is no
data in partition 1, the watermark returned from partition 1 is always
Long.MIN_VALUE, and therefore AbstractFetcher never fires the watermark
downstream.

I want to know whether this is expected behavior or a bug. If this is expected
behavior, how do I avoid the delay of watermark firing when data is not
evenly distributed to all partitions?

This is the timestamp extractor I used

public class ExactTimestampExtractor implements
        AssignerWithPeriodicWatermarks<SessionEvent> {

    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE
                : currentMaxTimestamp - 1);
    }

    @Override
    public long extractTimestamp(SessionEvent element, long previousElementTimestamp) {
        long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
        if (eventStartTime > currentMaxTimestamp) {
            currentMaxTimestamp = eventStartTime;
        }

        return eventStartTime;
    }
}

and this is the Flink topo

// get input data
FlinkKafkaConsumer010<SessionEvent> consumer = new
        FlinkKafkaConsumer010<>("topic4",
                new MyOwnSchema(),
                props); // consumer Properties (not shown in the original message)
consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
DataStream<SessionEvent> input = env.addSource(consumer);

input.
        keyBy("id").
        window(EventTimeSessionWindows.withGap(Time.seconds(5))).
        reduce(new Reducer(), new WindowFunction()).
        print();

// execute program
env.execute("a job");

I used the latest code in github

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539


Re: 答复: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-12 Thread Till Rohrmann
I'm wondering whether we should decouple the webserver encryption from the
global encryption activation and instead activate it by default.

On Thu, Jan 12, 2017 at 4:54 PM, Chesnay Schepler 
wrote:

> FLINK-5470 is a duplicate of FLINK-5298 for which there is also an open PR.
>
> FLINK-5472 is imo invalid since the webserver does support https, you just
> have to enable it as per the security documentation.
>
>
> On 12.01.2017 16:20, Till Rohrmann wrote:
>
> I also found an issue:
>
> https://issues.apache.org/jira/browse/FLINK-5470
>
> I also noticed that Flink's webserver does not support https requests. It
> might be worthwhile to add it, though.
>
> https://issues.apache.org/jira/browse/FLINK-5472
>
> On Thu, Jan 12, 2017 at 11:24 AM, Robert Metzger 
> wrote:
>
>> I also found a bunch of issues
>>
>> https://issues.apache.org/jira/browse/FLINK-5465
>> https://issues.apache.org/jira/browse/FLINK-5462
>> https://issues.apache.org/jira/browse/FLINK-5464
>> https://issues.apache.org/jira/browse/FLINK-5463
>>
>>
>> On Thu, Jan 12, 2017 at 9:56 AM, Fabian Hueske < 
>> fhue...@gmail.com> wrote:
>>
>> > I have another bugfix for 1.2.:
>> >
>> > https://issues.apache.org/jira/browse/FLINK-2662 (pending PR)
>> >
>> > 2017-01-10 15:16 GMT+01:00 Robert Metzger < 
>> rmetz...@apache.org>:
>> >
>> > > Hi,
>> > >
>> > > this depends a lot on the number of issues we find during the testing.
>> > >
>> > >
>> > > These are the issues I found so far:
>> > >
>> > > https://issues.apache.org/jira/browse/FLINK-5379 (unresolved)
>> > > https://issues.apache.org/jira/browse/FLINK-5383 (resolved)
>> > > https://issues.apache.org/jira/browse/FLINK-5382 (resolved)
>> > > https://issues.apache.org/jira/browse/FLINK-5381 (resolved)
>> > > https://issues.apache.org/jira/browse/FLINK-5380 (pending PR)
>> > >
>> > >
>> > >
>> > >
>> > > On Tue, Jan 10, 2017 at 11:58 AM, shijinkui 
>> > wrote:
>> > >
>> > > > Do we have a probable time of 1.2 release? This month or Next month?
>> > > >
>> > > > -----Original Message-----
>> > > > From: Robert Metzger [mailto:rmetz...@apache.org]
>> > > > Sent: January 3, 2017 20:44
>> > > > To: d...@flink.apache.org
>> > > > Cc: user@flink.apache.org
>> > > > Subject: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release
>> > > candidate)
>> > > >
>> > > > Hi,
>> > > >
>> > > > First of all, I wish everybody a happy new year 2017.
>> > > >
>> > > > I've set user@flink in CC so that users who are interested in
>> helping
>> > > > with the testing get notified. Please respond only to the dev@
>> list to
>> > > > keep the discussion there!
>> > > >
>> > > > According to the 1.2 release discussion thread, I've created a first
>> > > > release candidate for Flink 1.2.
>> > > > The release candidate will not be the final release, because I'm
>> > certain
>> > > > that we'll find at least one blocking issue in the candidate :)
>> > > >
>> > > > Therefore, the RC is meant as a testing only release candidate.
>> > > > Please report every issue we need to fix before the next RC in this
>> > > thread
>> > > > so that we have a good overview.
>> > > >
>> > > > The release artifacts are located here:
>> > > > http://people.apache.org/~rmetzger/flink-1.2.0-rc0/
>> > > >
>> > > > The maven staging repository is located here:
>> > > > https://repository.apache.org/content/repositories/orgapache
>> flink-
>> > > >
>> > > > The release commit (in branch "release-1.2.0-rc0"):
>> > > > http://git-wip-us.apache.org/repos/asf/flink/commit/f3c59ced
>> > > >
>> > > >
>> > > > Happy testing!
>> > > >
>> > >
>> >
>>
>
>
>


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-12 Thread Chen Qin
We have seen this issue as far back as Flink 1.0. Our finding back then was traffic 
congestion to AWS on the internal network. Many teams depend heavily on S3 and the 
bandwidth is shared, which causes traffic congestion from time to time.

Hope it helps!

Thanks
Chen

> On Jan 12, 2017, at 03:30, Ufuk Celebi  wrote:
> 
> Hey Shannon!
> 
> Is this always reproducible and how long does it take to reproduce it?
> 
> I've not seen this error before but as you say it indicates that some
> streams are not closed.
> 
> Did the jobs do any restarts before this happened? Flink 1.1.4
> contains fixes for more robust releasing of resources in failure
> scenarios. Is trying 1.1.4 an option?
> 
> – Ufuk
> 
>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey  wrote:
>> I'm having pretty frequent issues with the exception below. It basically
>> always ends up killing my cluster after forcing a large number of job
>> restarts. I just can't keep Flink up & running.
>> 
>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>> AWS support told me the name of the config option. However, that hasn't
>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>> fix the problem, is there anything else I can do? Is anyone else having this
>> problem? Is it possible that the state backend isn't properly calling
>> close() on its filesystem objects? Or is there a large number of concurrent
>> open filesystem objects for some reason? I am using the default
>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>> Any help is appreciated.
>> 
>> java.lang.RuntimeException: Could not initialize state backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by:
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>> Unable to execute HTTP request: Timeout waiting for connection from pool
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
>> at sun.reflect.GeneratedMethodAccessor22.invoke(Unkn

Re: How to get help on ClassCastException when re-submitting a job

2017-01-12 Thread Yury Ruchin
Hi,

I'd like to chime in since I've faced the same issue running Flink 1.1.4. I
have a long-running YARN session which I use to run multiple streaming jobs
concurrently. Once after cancelling and resubmitting the job I saw the "X
cannot be cast to X" ClassCastException exception in logs. I restarted YARN
session, then the problem disappeared.

The class that failed to be cast was autogenerated by the Avro compiler. I know
that Avro's Java binding caches schemas in some static WeakHashMap.
I'm wondering whether that may get in the way of Flink's classloading design.

Anyway, I would be interested in watching the issue in Flink JIRA.

Giuliano, could you provide the issue number?

Thanks,
Yury

2017-01-11 14:11 GMT+03:00 Fabian Hueske :

> Hi Guiliano,
>
> thanks for bringing up this issue.
> A "ClassCastException: X cannot be cast to X" often points to a
> classloader issue.
> So it might actually be a bug in Flink.
>
> I assume you submit the same application (same jar file) with the same
> command right?
> Did you cancel the job before resubmitting?
>
> Can you create a JIRA issue [1] for this bug (hit the red CREATE button
> on top) and include the commit hash from which you built Flink?
> It would be great if you could provide a short example program and
> instructions how to reproduce the problem.
>
> Thank you very much,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK
>
>
>
> 2017-01-11 1:22 GMT+01:00 Giuliano Caliari :
>
>> Hello,
>>
>>
>>
>> I need some guidance on how to report a bug.
>>
>>
>>
>> I’m testing version 1.2 on my local cluster and the first time I submit
>> the job everything works but whenever I re-submit the same job it fails
>> with
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Job execution failed.
>>
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:427)
>>
>> at org.apache.flink.client.program.StandaloneClusterClient.subm
>> itJob(StandaloneClusterClient.java:101)
>>
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:400)
>>
>> at org.apache.flink.streaming.api.environment.StreamContextEnvi
>> ronment.execute(StreamContextEnvironment.java:66)
>>
>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironm
>> ent.execute(StreamExecutionEnvironment.scala:634)
>>
>> at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>>
>> at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoi
>> nt$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>>
>> at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$
>> body.apply(TraitorApp.scala:21)
>>
>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>
>> at scala.runtime.AbstractFunction0.apply$mcV$sp(
>> AbstractFunction0.scala:12)
>>
>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>
>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>>
>> at scala.collection.immutable.List.foreach(List.scala:381)
>>
>> at scala.collection.generic.TraversableForwarder$class.foreach(
>> TraversableForwarder.scala:35)
>>
>> at scala.App$class.main(App.scala:76)
>>
>> at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorA
>> pp.scala:21)
>>
>> at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>>
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at org.apache.flink.client.program.PackagedProgram.callMainMeth
>> od(PackagedProgram.java:528)
>>
>> at org.apache.flink.client.program.PackagedProgram.invokeIntera
>> ctiveModeForExecution(PackagedProgram.java:419)
>>
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:339)
>>
>> at org.apache.flink.client.CliFrontend.executeProgram(CliFronte
>> nd.java:831)
>>
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>>
>> at org.apache.flink.client.CliFrontend.parseParameters(CliFront
>> end.java:1073)
>>
>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>>
>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>>
>> at org.apache.flink.runtime.security.NoOpSecurityContext.runSec
>> ured(NoOpSecurityContext.java:29)
>>
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>>
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$6.a

Getting key from keyed stream

2017-01-12 Thread Paul Joireman
Hi all,


Is there a simple way to read the key from a KeyedStream? Very simply, I'm 
trying to read a message from Kafka, separate the incoming messages by a field 
in the message, and write the original message back to Kafka using that field as 
a new topic. I chose to partition the incoming stream by creating a 
KeyedStream and using the field from the message as the key. The only thing 
left is to write the message to Kafka with a producer, but I need to know the 
topic to write to, and for that I need to be able to read the key. Is there a 
way to do this?


Is there a better way to do this, rather than using a KeyedStream?


Paul


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-12 Thread Shannon Carey
I can't predict when it will occur, but usually it's after Flink has been 
running for at least a week.

Yes, I do believe we had several job restarts due to an exception due to a 
Cassandra node being down for maintenance and therefore a query failing to meet 
the QUORUM consistency level requested. I'm fixing the retry consistency logic 
there, but I'm sure we'll run into failing jobs again eventually.

I'm upgrading to 1.1.4 now, hopefully it will help.


-Shannon

On 1/12/17, 5:30 AM, "Ufuk Celebi"  wrote:

>Hey Shannon!
>
>Is this always reproducible and how long does it take to reproduce it?
>
>I've not seen this error before but as you say it indicates that some
>streams are not closed.
>
>Did the jobs do any restarts before this happened? Flink 1.1.4
>contains fixes for more robust releasing of resources in failure
>scenarios. Is trying 1.1.4 an option?
>
>– Ufuk
>
>On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey  wrote:
>> I'm having pretty frequent issues with the exception below. It basically
>> always ends up killing my cluster after forcing a large number of job
>> restarts. I just can't keep Flink up & running.
>>
>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>> AWS support told me the name of the config option. However, that hasn't
>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>> fix the problem, is there anything else I can do? Is anyone else having this
>> problem? Is it possible that the state backend isn't properly calling
>> close() on its filesystem objects? Or is there a large number of concurrent
>> open filesystem objects for some reason? I am using the default
>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>> Any help is appreciated.
>>
>> java.lang.RuntimeException: Could not initialize state backend.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by:
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>> Unable to execute HTTP request: Timeout waiting for connection from pool
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>> at
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
>> at
>> com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractA

Re: Getting key from keyed stream

2017-01-12 Thread Jamie Grier
A simpler and more efficient approach would simply be the following:

val stream = env.addSource(new FlinkKafkaConsumer(...))

stream
  .addSink(new FlinkKafkaProducer(new MyKeyedSerializationSchema(...)))

env.execute()

In MyKeyedSerializationSchema just override the getTargetTopic() method.
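
For example, a rough sketch, assuming the messages are plain Strings and the
target topic is the first comma-separated field of each message (adapt the
parsing to however your routing field is actually encoded):

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema

class MyKeyedSerializationSchema extends KeyedSerializationSchema[String] {

  // no Kafka key is needed for this use case
  override def serializeKey(element: String): Array[Byte] = null

  // forward the original message unchanged
  override def serializeValue(element: String): Array[Byte] =
    element.getBytes("UTF-8")

  // route each record to the topic named by its first field
  // (placeholder parsing, use whatever identifies the topic in your messages)
  override def getTargetTopic(element: String): String =
    element.split(",")(0)
}

If I remember correctly, the topic you pass to the producer's constructor is
then only used as a fallback when getTargetTopic() returns null, so something
like new FlinkKafkaProducer09[String]("fallback-topic", new
MyKeyedSerializationSchema, props) should route every message to the topic
returned by the schema.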

That should do it :)

-Jamie

On Thu, Jan 12, 2017 at 12:53 PM, Paul Joireman 
wrote:

Hi all,
>
>
> Is there a simple way to read the key from a KeyedStream?   Very simply
> I'm trying to read a message from Kafka, separate the incoming messages by
> a field in the message, and write the original message back to Kafka using
> that field as a new topic.  I chose to partition the incoming stream by
> creating a KeyedStream and using the field from the message as the key.
>  The only thing left is to write the message to Kafka with a producer, but I
> need to know the topic to write to, and for that I need to be able to read
> the key.   Is there a way to do this?
>
>
> Is there a better way to do this, rather than using a KeyedStream?
>
>
> Paul
>
-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Getting key from keyed stream

2017-01-12 Thread Paul Joireman
Thanks Jamie,


Just figured that out after some digging and a little trial and error; it works 
great.


Paul


From: Jamie Grier 
Sent: Thursday, January 12, 2017 4:59:43 PM
To: user@flink.apache.org
Subject: Re: Getting key from keyed stream


A simpler and more efficient approach would simply be the following:

val stream = env.addSource(new FlinkKafkaConsumer(...))

stream
  .addSink(new FlinkKafkaProducer(new MyKeyedSerializationSchema(...)))

env.execute()


In MyKeyedSerializationSchema just override the getTargetTopic() method.

That should do it :)

-Jamie

On Thu, Jan 12, 2017 at 12:53 PM, Paul Joireman 
<paul.joire...@physiq.com> wrote:

Hi all,


Is there a simple way to read the key from a KeyedStream?   Very simply I'm 
trying to read a message from Kafka, separate the incoming messages by a field 
in the message, and write the original message back to Kafka using that field as 
a new topic.  I chose to partition the incoming stream by creating a 
KeyedStream and using the field from the message as the key.  The only thing 
left is to write the message to Kafka with a producer, but I need to know the 
topic to write to, and for that I need to be able to read the key.   Is there a 
way to do this?


Is there a better way to do this, rather than using a KeyedStream?


Paul

--

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier
ja...@data-artisans.com



1.1.4 on YARN - vcores change?

2017-01-12 Thread Shannon Carey
Did anything change in 1.1.4 with regard to YARN & vcores?

I'm getting this error when deploying 1.1.4 to my test cluster. Only the Flink 
version changed.

java.lang.RuntimeException: Couldn't deploy Yarn cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:384)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:591)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:465)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: 
The number of virtual cores per node were configured with 8 but Yarn only has 4 
virtual cores available. Please note that the number of virtual cores is set to 
the number of task slots by default unless configured in the Flink config with 
'yarn.containers.vcores.'
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:273)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:393)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:381)
... 2 more

When I run: ./bin/yarn-session.sh -q
It shows 8 vCores on each machine:


NodeManagers in the ClusterClient 3
|Property     |Value
+---+
|NodeID       |ip-10-2-…:8041
|Memory       |12288 MB
|vCores       |8
|HealthReport |
|Containers   |0
+---+
|NodeID       |ip-10-2-…:8041
|Memory       |12288 MB
|vCores       |8
|HealthReport |
|Containers   |0
+---+
|NodeID       |ip-10-2-…:8041
|Memory       |12288 MB
|vCores       |8
|HealthReport |
|Containers   |0
+---+

Summary: totalMemory 36864 totalCores 24

Queue: default, Current Capacity: 0.0 Max Capacity: 1.0 Applications: 0

I'm running:
./bin/yarn-session.sh -n 3 --jobManagerMemory 1504 --taskManagerMemory 10764 
--slots 8 --detached

I have not specified any value for "yarn.containers.vcores" in my config.
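
If I'm reading that error message right, I could presumably also pin the value 
explicitly in flink-conf.yaml, e.g. (just a guess at the value, I haven't tried 
this yet):

yarn.containers.vcores: 4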

I switched to -n 5 and --slots 4, and halved the taskManagerMemory, which 
allowed the cluster to start.

However, in the YARN "Nodes" UI I see "VCores Used: 2" and "VCores Avail: 6" on 
all three nodes. And if I look at one of the Containers, it says, "Resource: 
5408 Memory, 1 VCores". I don't understand what's happening here.

Thanks…


Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

2017-01-12 Thread Shannon Carey
Good to know someone else has had the same problem... What did you do about it? 
Did it resolve on its own?

-Shannon




On 1/12/17, 11:55 AM, "Chen Qin"  wrote:

>We have seen this issue going back to Flink 1.0. Our finding back then was traffic 
>congestion to AWS on the internal network. Many teams depend heavily on S3 and the 
>bandwidth is shared, which causes congestion from time to time.
>
>Hope it helps!
>
>Thanks
>Chen
>
>> On Jan 12, 2017, at 03:30, Ufuk Celebi  wrote:
>> 
>> Hey Shannon!
>> 
>> Is this always reproducible and how long does it take to reproduce it?
>> 
>> I've not seen this error before but as you say it indicates that some
>> streams are not closed.
>> 
>> Did the jobs do any restarts before this happened? Flink 1.1.4
>> contains fixes for more robust releasing of resources in failure
>> scenarios. Is trying 1.1.4 an option?
>> 
>> – Ufuk
>> 
>>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey  wrote:
>>> I'm having pretty frequent issues with the exception below. It basically
>>> always ends up killing my cluster after forcing a large number of job
>>> restarts. I just can't keep Flink up & running.
>>> 
>>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
>>> AWS support told me the name of the config option. However, that hasn't
>>> fixed the problem. Assuming that increasing the maxConnections again doesn't
>>> fix the problem, is there anything else I can do? Is anyone else having this
>>> problem? Is it possible that the state backend isn't properly calling
>>> close() on its filesystem objects? Or is there a large number of concurrent
>>> open filesystem objects for some reason? I am using the default
>>> checkpointing settings with one checkpoint at a time, checkpointing every 10
>>> minutes. If I am reading the metrics correctly, the checkpoint duration is
>>> between 12s and 3 minutes on one of the jobs, and 5s or less on the other 3.
>>> Any help is appreciated.
>>> 
>>> java.lang.RuntimeException: Could not initialize state backend.
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by:
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException:
>>> Unable to execute HTTP request: Timeout waiting for connection from pool
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>>> at
>>> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
>>> at
>>> com.amazon.ws.emr.hadoop

Objects accessible from all Flink nodes

2017-01-12 Thread Matt
Hello,

I have a stream of objects which I use to update the model of a
classification algorithm and another stream with the objects I need to
classify in real time.

The problem is that the instances for training and evaluation are processed
on potentially different Flink nodes, but the classifier should be applied
to all instances no matter on which node they were generated (i.e., the
classifier should be accessible from any Flink node).

Just to make it clearer, here is what would NOT work since these sink
functions are not serializable:
https://gist.github.com/b979bf742b0d2f3da8cc8e5e91207151
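
(Roughly, and much simplified, the kind of code I mean looks like the sketch
below; MyClassifier and Instance are just stand-ins for my real classes, and
the line numbers I mention further down refer to the gist itself, not to this
snippet.)

import org.apache.flink.streaming.api.scala._

// stand-in for a labeled/unlabeled data point
case class Instance(features: Array[Double])

// stand-in for my real model, note that it is NOT serializable
class MyClassifier {
  def train(i: Instance): Unit = ???
  def classify(i: Instance): String = ???
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

// the classifier lives in the driver program, not on the Flink nodes
val classifier = new MyClassifier()

// training stream: update the model with every incoming instance
env.socketTextStream("localhost", 9001)
  .map(line => Instance(line.split(",").map(_.toDouble)))
  .addSink(i => classifier.train(i)) // fails, the closure captures a non-serializable object

// evaluation stream: classify incoming instances with the same model
env.socketTextStream("localhost", 9002)
  .map(line => Instance(line.split(",").map(_.toDouble)))
  .map(i => classifier.classify(i))  // same problem
  .print()

env.execute()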

Two questions here:

*1. How can an instance be accessed by any Flink node like this (lines 11
and 19)? Maybe there's a better approach to this problem.*

*2. In the example the second stream (line 15) is started right away, but at
startup the classifier is not ready to use until it has been trained with
enough instances. Is it possible to hold the second stream back until the
classifier is ready? If I'm not wrong, env.execute (line 24) can be used only once.*

Regards,
Matt