Re: Session Based Windows

2015-11-18 Thread Vladimir Stoyak
We were also trying to address session windowing but took a slightly different 
approach as to which window we place the event into. 

We did not want the "triggering event" to be purged as part of the window it 
triggered, but instead to create a new window for it and have the old window 
fire and purge on event-time timeout.

Take a look and see if it will be useful - 
https://bitbucket.org/snippets/vstoyak/o9Rqp
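
Roughly, the idea is to fire from the watermark (an event-time timer) rather than 
from the element that reveals the gap, so that this element is not purged together 
with the old window. A minimal sketch of that idea, written against the newer 
Trigger interface (the 0.10-era API differs slightly) and not taken from the 
snippet above, could look like this:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class SessionTimeoutTrigger extends Trigger<Object, GlobalWindow> {

    private final long sessionGap;
    private final ValueStateDescriptor<Long> lastSeen =
            new ValueStateDescriptor<>("lastSeen", Long.class);

    public SessionTimeoutTrigger(long sessionGap) {
        this.sessionGap = sessionGap;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> state = ctx.getPartitionedState(lastSeen);
        Long last = state.value();
        // remember the latest event time seen for this key (elements may arrive out of order)
        state.update(last == null ? timestamp : Math.max(last, timestamp));
        // fire later, from the watermark, so this element is not purged with the old window
        ctx.registerEventTimeTimer(timestamp + sessionGap);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window,
                                     TriggerContext ctx) throws Exception {
        Long last = ctx.getPartitionedState(lastSeen).value();
        // only fire if no newer element has extended the session in the meantime
        if (last != null && time >= last + sessionGap) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(lastSeen).clear();
    }
}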

Vladimir



On Tuesday, November 17, 2015 11:25 PM, Konstantin Knauf 
 wrote:
Hi Aljoscha,

sorry to bother you again (this time with this old thread), just a short
question about the caveat you mention in your answer. You wrote that
events of different sessions cannot arrive intermingled. Isn't the idea of the
keyBy expression below exactly not to have intermingled sessions by
first grouping by session-ids?

Cheers and thank you,

Konstantin

On 17.10.2015 14:39, Aljoscha Krettek wrote:
> Hi Paul,
> it’s good to see people interested in this. I sketched a Trigger that should 
> fit your requirements: https://gist.github.com/aljoscha/a7c6f22548e7d24bc4ac
> 
> You can use it like this:
> 
> DataStream<> input = …
> DataStream<> result = input
>   .keyBy(“session-id”)
>   .window(GlobalWindows.create())
>   .trigger(new SessionTrigger(timeout, maxElements))
>   .apply(new MyWindowFunction())
> 
> The Trigger uses the new state API that I’m currently introducing in a new 
> Pull Request. It should be merged very soon, before the 0.10 release.
> 
> This implementation has one caveat, though. It cannot deal with elements that 
> belong to different sessions that arrive intermingled with other sessions. 
> The reason is that Flink does not yet support merging the windows that the 
> WindowAssigner assigns as, for example, the Cloud Dataflow API supports. This 
> means that elements cannot be assigned to session windows, instead the 
> workaround with the GlobalWindow has to be used. I want to tackle this for 
> the release after 0.10, however.
> 
> Please let us know if you need more information. I’m always happy to help in 
> these interesting cases at the bleeding edge of what is possible. :-)
> 
> Cheers,
> Aljoscha
> 
>> On 16 Oct 2015, at 19:36, Hamilton, Paul  wrote:
>>
>> Hi,
>>
>> I am attempting to make use of the new window APIs in streaming to
>> implement a session based window and am not sure if the currently provided
>> functionality handles my use case.  Specifically what I want to do is
>> something conceptually similar to a "Sessions.withGapDuration(…)" window
>> in Google DataFlow.
>>
>> Assuming the events are keyed by session id.  I would like to use the
>> event time and the watermarking functionality to trigger a window after
>> the "end of a session" (no events for a given session received within x
>> amount of time).  With watermarking this would mean trigger when a
>> watermark is seen that is > (the time of the last event + session
>> timeout). Also I want to perform an early triggering of the window after a
>> given number of events have been received.
>>
>> Is it currently possible to do this with the current combination of window
>> assigners and triggers?  I am happy to write custom triggers etc, but
>> wanted to make sure it wasn't already available before going down that
>> road.
>>
>> Thanks,
>>
>> Paul Hamilton
>> Hybris Software
>>
>>
> 
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Session Based Windows

2015-11-18 Thread Aljoscha Krettek
Hi Konstantin,
you are right, if the stream is keyed by the session-id then it works.

I was referring to the case where you have, for example, some interactions with 
timestamps and you want to derive the sessions from this. In that case, it can 
happen that events that should belong to one session (depending on their 
timestamp) arrive intermixed with elements that should belong to another 
session because of delays (and because elements never really arrive in the 
order of their timestamps). Does this make clear what I meant? It’s a bit 
tricky, so I can maybe draw a picture if it helps.

Cheers,
Aljoscha
> On 18 Nov 2015, at 09:09, Vladimir Stoyak  wrote:
> 
> We were also trying to address session windowing but took a slightly different 
> approach as to which window we place the event into. 
> 
> We did not want the "triggering event" to be purged as part of the window it 
> triggered, but instead to create a new window for it and have the old window 
> fire and purge on event-time timeout.
> 
> Take a look and see if it will be useful - 
> https://bitbucket.org/snippets/vstoyak/o9Rqp
> 
> Vladimir
> 
> 
> 
> On Tuesday, November 17, 2015 11:25 PM, Konstantin Knauf 
>  wrote:
> Hi Aljoscha,
> 
> sorry to bother you again (this time with this old thread), just a short
> question about the caveat you mention in your answer. You wrote that
> events of different sessions cannot arrive intermingled. Isn't the idea of the
> keyBy expression below exactly not to have intermingled sessions by
> first grouping by session-ids?
> 
> Cheers and thank you,
> 
> Konstantin
> 
> On 17.10.2015 14:39, Aljoscha Krettek wrote:
>> Hi Paul,
>> it’s good to see people interested in this. I sketched a Trigger that should 
>> fit your requirements: https://gist.github.com/aljoscha/a7c6f22548e7d24bc4ac
>> 
>> You can use it like this:
>> 
>> DataStream<> input = …
>> DataStream<> result = input
>>  .keyBy(“session-id”)
>>  .window(GlobalWindows.create())
>>  .trigger(new SessionTrigger(timeout, maxElements))
>>  .apply(new MyWindowFunction())
>> 
>> The Trigger uses the new state API that I’m currently introducing in a new 
>> Pull Request. It should be merged very soon, before the 0.10 release.
>> 
>> This implementation has one caveat, though. It cannot deal with elements 
>> that belong to different sessions that arrive intermingled with other 
>> sessions. The reason is that Flink does not yet support merging the windows 
>> that the WindowAssigner assigns as, for example, the Cloud Dataflow API 
>> supports. This means that elements cannot be assigned to session windows, 
>> instead the workaround with the GlobalWindow has to be used. I want to 
>> tackle this for the release after 0.10, however.
>> 
>> Please let us know if you need more information. I’m always happy to help in 
>> these interesting cases at the bleeding edge of what is possible. :-)
>> 
>> Cheers,
>> Aljoscha
>> 
>>> On 16 Oct 2015, at 19:36, Hamilton, Paul  wrote:
>>> 
>>> Hi,
>>> 
>>> I am attempting to make use of the new window APIs in streaming to
>>> implement a session based window and am not sure if the currently provided
>>> functionality handles my use case.  Specifically what I want to do is
>>> something conceptually similar to a "Sessions.withGapDuration(…)" window
>>> in Google DataFlow.
>>> 
>>> Assuming the events are keyed by session id.  I would like to use the
>>> event time and the watermarking functionality to trigger a window after
>>> the "end of a session" (no events for a given session received within x
>>> amount of time).  With watermarking this would mean trigger when a
>>> watermark is seen that is > (the time of the last event + session
>>> timeout). Also I want to perform an early triggering of the window after a
>>> given number of events have been received.
>>> 
>>> Is it currently possible to do this with the current combination of window
>>> assigners and triggers?  I am happy to write custom triggers etc, but
>>> wanted to make sure it wasn't already available before going down that
>>> road.
>>> 
>>> Thanks,
>>> 
>>> Paul Hamilton
>>> Hybris Software
>>> 
>>> 
>> 
>> 
> 
> -- 
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082



Re: Creating a representative streaming workload

2015-11-18 Thread Robert Metzger
Hey Vasia,

I think a very common workload would be an event stream from web servers of
an online shop. Usually, these shops have multiple servers, so events
arrive out of order.
I think there are plenty of different use cases that you can build around
that data:
- Users perform different actions that a streaming system could track
(analysis of click-paths).
- Some simple statistics using windows (items sold in the last 10 minutes,
etc.).
- Maybe fraud detection would be another use case.
- Often, there also needs to be a sink to HDFS or another file system for a
long-term archive.

I would love to see such an event generator in flink's contrib module. I
think that's something the entire streaming space could use.
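
A rough sketch of what such a generator could look like (the event schema, rates
and class names here are made up for illustration):

import java.util.Random;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Emits (eventTime, userId, action) records with slightly out-of-order timestamps. */
public class ShopEventSource implements SourceFunction<Tuple3<Long, String, String>> {

    private volatile boolean running = true;
    private final Random rnd = new Random();

    @Override
    public void run(SourceContext<Tuple3<Long, String, String>> ctx) throws Exception {
        String[] actions = {"view", "add-to-cart", "checkout"};
        while (running) {
            // subtract a random delay to simulate events arriving out of order
            long eventTime = System.currentTimeMillis() - rnd.nextInt(5000);
            String userId = "user-" + rnd.nextInt(1000);
            String action = actions[rnd.nextInt(actions.length)];
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(Tuple3.of(eventTime, userId, action));
            }
            Thread.sleep(1);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}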




On Mon, Nov 16, 2015 at 8:22 PM, Nick Dimiduk  wrote:

> All those should apply for streaming too...
>
> On Mon, Nov 16, 2015 at 11:06 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi,
>>
>> thanks Nick and Ovidiu for the links!
>>
>> Just to clarify, we're not looking into creating a generic streaming
>> benchmark. We have quite limited time and resources for this project. What
>> we want is to decide on a set of 3-4 _common_ streaming applications. To
>> give you an idea, for the batch workload, we will pick something like a
>> grep, one relational application, a graph algorithm, and an ML algorithm.
>>
>> Cheers,
>> -Vasia.
>>
>> On 16 November 2015 at 19:25, Ovidiu-Cristian MARCU <
>> ovidiu-cristian.ma...@inria.fr> wrote:
>>
>>> Regarding Flink vs Spark / Storm you can check here:
>>> http://www.sparkbigdata.com/102-spark-blog-slim-baltagi/14-results-of-a-benchmark-between-apache-flink-and-apache-spark
>>>
>>> Best regards,
>>> Ovidiu
>>>
>>> On 16 Nov 2015, at 15:21, Vasiliki Kalavri 
>>> wrote:
>>>
>>> Hello squirrels,
>>>
>>> with some colleagues and students here at KTH, we have started 2
>>> projects to evaluate (1) performance and (2) behavior in the presence of
>>> memory interference in cloud environments, for Flink and other systems. We
>>> want to provide our students with a workload of representative applications
>>> for testing.
>>>
>>> While for batch applications, it is quite clear to us what classes of
>>> applications are widely used and how to create a workload of different
>>> types of applications, we are not quite sure about the streaming workload.
>>>
>>> That's why, we'd like your opinions! If you're using Flink streaming in
>>> your company or your project, we'd love your input even more :-)
>>>
>>> What kind of applications would you consider as "representative" of a
>>> streaming workload? Have you run any experiments to evaluate Flink versus
>>> Spark, Storm etc.? If yes, would you mind sharing your code with us?
>>>
>>> We will of course be happy to share our results with everyone after we
>>> have completed our study.
>>>
>>> Thanks a lot!
>>> -Vasia.
>>>
>>>
>>>
>>
>


Re: Session Based Windows

2015-11-18 Thread Konstantin Knauf
Hi Aljoscha,

thanks, that's what I thought. Just wanted to verify, that keyBy +
SessionWindow() works with intermingled events.

Cheers,

Konstantin

On 18.11.2015 11:14, Aljoscha Krettek wrote:
> Hi Konstantin,
> you are right, if the stream is keyed by the session-id then it works.
> 
> I was referring to the case where you have, for example, some interactions 
> with timestamps and you want to derive the sessions from this. In that case, 
> it can happen that events that should belong to one session (depending on 
> their timestamp) arrive intermixed with elements that should belong to 
> another session because of delays (and because elements never really arrive 
> in the order of their timestamps). Does this make clear what I meant? It’s a 
> bit tricky, so I can maybe draw a picture if it helps.
> 
> Cheers,
> Aljoscha
>> On 18 Nov 2015, at 09:09, Vladimir Stoyak  wrote:
>>
>> We were also trying to address session windowing but took a slightly 
>> different approach as to which window we place the event into. 
>>
>> We did not want the "triggering event" to be purged as part of the window it 
>> triggered, but instead to create a new window for it and have the old window 
>> fire and purge on event-time timeout.
>>
>> Take a look and see if it will be useful - 
>> https://bitbucket.org/snippets/vstoyak/o9Rqp
>>
>> Vladimir
>>
>>
>>
>> On Tuesday, November 17, 2015 11:25 PM, Konstantin Knauf 
>>  wrote:
>> Hi Aljoscha,
>>
>> sorry to bother you again (this time with this old thread), just a short
>> question about the caveat you mention in your answer. You wrote that
>> events of different sessions cannot arrive intermingled. Isn't the idea of the
>> keyBy expression below exactly not to have intermingled sessions by
>> first grouping by session-ids?
>>
>> Cheers and thank you,
>>
>> Konstantin
>>
>> On 17.10.2015 14:39, Aljoscha Krettek wrote:
>>> Hi Paul,
>>> it’s good to see people interested in this. I sketched a Trigger that 
>>> should fit your requirements: 
>>> https://gist.github.com/aljoscha/a7c6f22548e7d24bc4ac
>>>
>>> You can use it like this:
>>>
>>> DataStream<> input = …
>>> DataStream<> result = input
>>>  .keyBy(“session-id”)
>>>  .window(GlobalWindows.create())
>>>  .trigger(new SessionTrigger(timeout, maxElements))
>>>  .apply(new MyWindowFunction())
>>>
>>> The Trigger uses the new state API that I’m currently introducing in a new 
>>> Pull Request. It should be merged very soon, before the 0.10 release.
>>>
>>> This implementation has one caveat, though. It cannot deal with elements 
>>> that belong to different sessions that arrive intermingled with other 
>>> sessions. The reason is that Flink does not yet support merging the windows 
>>> that the WindowAssigner assigns as, for example, the Cloud Dataflow API 
>>> supports. This means that elements cannot be assigned to session windows, 
>>> instead the workaround with the GlobalWindow has to be used. I want to 
>>> tackle this for the release after 0.10, however.
>>>
>>> Please let us know if you need more information. I’m always happy to help 
>>> in these interesting cases at the bleeding edge of what is possible. :-)
>>>
>>> Cheers,
>>> Aljoscha
>>>
 On 16 Oct 2015, at 19:36, Hamilton, Paul  wrote:

 Hi,

 I am attempting to make use of the new window APIs in streaming to
 implement a session based window and am not sure if the currently provided
 functionality handles my use case.  Specifically what I want to do is
 something conceptually similar to a "Sessions.withGapDuration(…)" window
 in Google DataFlow.

 Assuming the events are keyed by session id.  I would like to use the
 event time and the watermarking functionality to trigger a window after
 the "end of a session" (no events for a given session received within x
 amount of time).  With watermarking this would mean trigger when a
 watermark is seen that is > (the time of the last event + session
 timeout). Also I want to perform an early triggering of the window after a
 given number of events have been received.

 Is it currently possible to do this with the current combination of window
 assigners and triggers?  I am happy to write custom triggers etc, but
 wanted to make sure it wasn't already available before going down that
 road.

 Thanks,

 Paul Hamilton
 Hybris Software


>>>
>>>
>>
>> -- 
>> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: finite subset of an infinite data stream

2015-11-18 Thread Aljoscha Krettek
Hi,
I wrote a little example that could be what you are looking for: 
https://github.com/dataArtisans/query-window-example

It basically implements a window operator with a modifiable window size that 
also allows querying the current accumulated window contents using a second 
input stream.
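
The underlying pattern is connecting the data stream with a second, "query" stream
keyed the same way; very roughly (this is just a sketch of the general pattern, not
the code from the repository):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public static DataStream<String> attachQueries(
        DataStream<Tuple2<String, Long>> events,   // (key, value) data records
        DataStream<String> queries) {              // a query is just the key to inspect

    return events
        .keyBy(t -> t.f0)
        .connect(queries.keyBy(q -> q))
        .flatMap(new CoFlatMapFunction<Tuple2<String, Long>, String, String>() {
            // kept per parallel instance; a real implementation would use Flink state
            private final List<Tuple2<String, Long>> buffer = new ArrayList<>();

            @Override
            public void flatMap1(Tuple2<String, Long> event, Collector<String> out) {
                buffer.add(event); // data side: just accumulate
            }

            @Override
            public void flatMap2(String key, Collector<String> out) {
                // query side: answer with the current buffer contents for that key
                out.collect("buffered elements for " + key + ": " + buffer.size());
            }
        });
}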

There is a README file in the github repository, but please let me know if you 
need further explanations.

Cheers,
Aljoscha

> On 18 Nov 2015, at 12:02, Robert Metzger  wrote:
> 
> Hi Roman,
> 
> I've updated the documentation. It seems that it got out of sync. Thank you 
> for notifying us about this.
> 
> My colleague Aljoscha has some experimental code that is probably doing what 
> you are looking for: A standing window (your RT-buffer) that you can query 
> using a secondary stream (your user's queries).
> He'll post the code soon to this email thread.
> 
> Regards,
> Robert
> 
> 
> On Wed, Nov 11, 2015 at 2:51 PM, rss rss  wrote:
> Hello,
> 
>   thanks, Stephan, but triggers are not what I was searching for. And BTW, the 
> documentation is obsolete. There is no Count class now. I found CountTrigger 
> only.
> 
>   Thanks Robert, your example may be useful for me, but at some other point. I 
> meant "union" as an ordinary union of similar data. It is the same as 
> "union" in the DataStream documentation.
>   
>   The task is very simple. We have an infinite stream of data from sensors, a 
> billing system etc. It does not matter what it is, but it is infinite. We have 
> to store the data in some persistent storage to be able to run analytical 
> queries later. And there is a stream of users' analytical queries. But the 
> stream of input data is big, and the time needed to save it to the persistent 
> storage is big too. And we do not have a very fast big-data OLTP storage. That 
> is, the data extracted from the persistent storage for a user's request will 
> probably not contain the most recent data. We have to keep a real-time buffer 
> (RT-Buffer in the schema) with the most recent data and union it with the 
> results of the query against the persistent storage (I am not speaking about 
> data deduplication and ordering now). And of course the users' queries are 
> unpredictable regarding data filtering conditions.
> 
>   The attached schema is an attempt to understand how it may be implemented 
> with Flink. I tried to imagine how to implement it with Flink's streaming API 
> but found obstacles. This schema is not the first variant. It contains a 
> separate driver program to configure new jobs from the users' queries. The 
> reason is that I have not found a way to link the stream of user queries with 
> the further data processing. But it is somewhat close to 
> https://gist.github.com/fhueske/4ea5422edb5820915fa4
> 
> 
> 
> 
>   The main question is how to process each user's query by combining the 
> current data from the real-time buffer with a batch request to the persistent 
> storage. Unfortunately I have not found a solution using the Streaming API only.
> 
> Regards, 
> Roman
> 
> 2015-11-11 15:45 GMT+04:00 Robert Metzger :
> I think what you call "union" is a "connected stream" in Flink. Have a look 
> at this example: https://gist.github.com/fhueske/4ea5422edb5820915fa4
> It shows how to dynamically update a list of filters by external requests.
> Maybe that's what you are looking for?
> 
> 
> 
> On Wed, Nov 11, 2015 at 12:15 PM, Stephan Ewen  wrote:
> Hi!
> 
> I do not really understand what exactly you want to do, especially the 
> "union an infinite real time data stream with filtered persistent data where 
> the condition of filtering is provided by external requests".
> 
> If you want to work on substreams in general, there are two options:
> 
> 1) Create the substream in a streaming window. You can "cut" the stream based 
> on special records/events that signal that the subsequence is done. Have a 
> look at the "Trigger" class for windows, it can react to elements and their 
> contents:
> 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#windows-on-keyed-data-streams
>  (section on Advanced Windowing).
> 
> 
> 2) You can trigger sequences of batch jobs. The batch job data source (input 
> format) can decide when to stop consuming the stream, at which point the 
> remainder of the transformations run, and the batch job finishes. 
> You can already run new transformation chains after each call to 
> "env.execute()", once the execution finished, to implement the sequence of 
> batch jobs.
> 
> 
> I would try and go for the windowing solution if that works, because that 
> will give you better fault tolerance / high availability. In the repeated 
> batch jobs case, you need to worry yourself about what happens when the 
> driver program (that calls env.execute()) fails.
> 
> 
> Hope that helps...
> 
> Greetings,
> Stephan
> 
> 
> 
> On Mon, Nov 9, 2015 at 1:24 PM, rss rss  wrote:
> Hello, 
> 
>   thanks for the answer but windows produce periodical results. I used your 
> example but the data source is changed to TCP stream:
> 
> Da

Re: Published test artifacts for flink streaming

2015-11-18 Thread Stephan Ewen
There is no global order in parallel streams, it is something that
applications need to work with. We are thinking about adding operations to
introduce event-time order (at the cost of some delay), but that is only
plans at this point.


What I do in my tests is run the test streams in parallel, but the Sink in
DOP 1. The sink gathers the elements in a list, and the close() function
validates the result.

Validating the results may involve sorting the list where the elements were
gathered (to make the order deterministic) or using a hash set if it is only
about distinct counts.
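
For instance, a sketch of such a sink (the expected list and the element type are
placeholders):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.junit.Assert;

public class ValidatingSink extends RichSinkFunction<Tuple2<String, Integer>> {

    private final List<Tuple2<String, Integer>> expected;  // what the test expects to see
    private final List<Tuple2<String, Integer>> received = new ArrayList<>();

    public ValidatingSink(List<Tuple2<String, Integer>> expected) {
        this.expected = expected;
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) {
        received.add(value);
    }

    @Override
    public void close() {
        // sort to make the order deterministic, then compare against the expectation
        received.sort(Comparator.comparing((Tuple2<String, Integer> t) -> t.f0));
        Assert.assertEquals(expected, received);
    }
}

// attached with parallelism 1 so that a single instance sees all elements:
//   result.addSink(new ValidatingSink(expected)).setParallelism(1);
//   env.execute();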

Hope that helps.

On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk  wrote:

> On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk  wrote:
>
>> Thanks Stephan, I'll check that out in the morning. Generally speaking,
>> it would be great to have some single-jvm example tests for those of us
>> getting started. Following the example of WindowingIntegrationTest is
>> mostly working, though reusing my single sink instance with its static
>> collection results in non-deterministic results; there appears to be a race
>> between instances clearing the collection in their open method and the
>> runtime returning the collection to my test harness.
>
>
> This inconsistent test result is pretty frustrating. I've created a sample
> project with an IT that demonstrates the issue. Run `mvn test` multiple
> times and see that sometimes it passes and sometimes it fails. Maybe
> someone has some thoughts?
>
> https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6
>
> Thanks,
> Nick
>
> I'd also appreciate some guidance on stream composition. It's nice to use
>> the fluent API when exploring data in a shell, but it seems to me like that
>> API is cumbersome when composing data pipelines of reusable partials. Or
>> maybe I'm doing it all wrong... Hence the request for more examples :)
>>
>> While I'm asking, how might you model this: I have a set of predicates
>> I'd like to flatMap over a stream. An input item should be compared vs
>> every predicate (basically, I want a Clojure juxt of predicates over each
>> stream element). Imagine those predicates expressed as where clauses via
>> the Table API. Say I have hundreds of thousands of these predicates to run
>> over every stream event. Is the java client API rich enough to express such
>> a flow, or should I examine something lower than DataStream?
>>
>> Thanks a lot, and sorry for all the newb questions.
>> -n
>>
>>
>> On Thursday, November 5, 2015, Stephan Ewen  wrote:
>>
>>> Hey!
>>>
>>> There is also a collect() sink in the "flink-streaming-contrib" project,
>>> see here:
>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>>
>>> It should work well locally for testing. In that case you can write a
>>> program as usual and use "DataStreamUtils.collect(stream)", so you need to
>>> stop reading it once you know the stream is exhausted...
>>>
>>> Stephan
>>>
>>>
>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk 
>>> wrote:
>>>
 Hi Robert,

 It seems "type" was what I needed. This it also looks like the test
 jar has an undeclared dependency. In the end, the following allowed me
 to use TestStreamEnvironment for my integration test. Thanks a lot!

 -n

 
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-core</artifactId>
   <version>${flink.version}</version>
   <type>test-jar</type>
   <scope>test</scope>
 </dependency>
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-test-utils</artifactId>
   <version>${flink.version}</version>
   <scope>test</scope>
 </dependency>

 On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger 
 wrote:
 > Hi Nick,
 >
 > we are usually publishing the test artifacts. Can you try and
 > replace the <type> tag by test-jar:
 >
 > <dependency>
 >    <groupId>org.apache.flink</groupId>
 >    <artifactId>flink-streaming-core</artifactId>
 >    <version>${flink.version}</version>
 >    <type>test-jar</type>
 >    <scope>test</scope>
 > </dependency>
 >
 >
 > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk 
 wrote:
 >>
 >> Hello,
 >>
 >> I'm attempting integration tests for my streaming flows. I'd like to
 >> produce an input stream of java objects and sink the results into a
 >> collection for verification via JUnit asserts.
 >> StreamExecutionEnvironment provides methods for the former, however,
 >> how to achieve the latter is not evident based on my internet
 >> searching. I think I've found a solution in the TestStreamEnvironment
 >> class, ie, as used by WindowingIntegrationTest. However, this class
 >> appears to be packaged in the flink-streaming-core test artifact,
 >> which is not published to maven.
 >>
 >> For reference, this is the maven dependency stanza I'm using. Please
 >> let me know if I've got it wrong.
 >>
 >> Thanks,
 >> Nick
 >>
 >> <dependency>
 >>   <groupId>org.apache.flink</groupId>
 >>   <artifactId>flink-streaming-core</artifactId>
 >>   <version>${flink.version}</version>
 >>   <type>test</type>
 >>   <scope>test</scope>

Re: Checkpoints in batch processing & JDBC Output Format

2015-11-18 Thread Stephan Ewen
Hi!

If you go with the Batch API, then any failed task (like a sink trying to
insert into the database) will be completely re-executed. That makes sure
no data is lost in any way, no extra effort needed.

It may insert a lot of duplicates, though, if the task is re-started after
half the data was inserted. That is where streaming does a better job (more
fine grained checkpoints / commits). Not sure if you worry about this, or
have a deterministic primary key anyways where the database insertion
discards duplicate records automatically.
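
For the deterministic-key case, the database-side deduplication can be as simple as
an idempotent upsert statement; a sketch with plain JDBC (Oracle-style MERGE; the
table, columns, Record type and connection settings are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

String upsert =
    "MERGE INTO events t " +
    "USING (SELECT ? AS id, ? AS payload FROM dual) s " +
    "ON (t.id = s.id) " +
    "WHEN MATCHED THEN UPDATE SET t.payload = s.payload " +
    "WHEN NOT MATCHED THEN INSERT (id, payload) VALUES (s.id, s.payload)";

// re-running the same records after a restart does not create duplicates
try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
     PreparedStatement stmt = conn.prepareStatement(upsert)) {
    for (Record r : records) {
        stmt.setLong(1, r.getId());
        stmt.setString(2, r.getPayload());
        stmt.addBatch();
    }
    stmt.executeBatch();
}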

Stephan




On Mon, Nov 16, 2015 at 10:07 AM, Maximilian Bode <
maximilian.b...@tngtech.com> wrote:

> Hi Stephan,
>
> thank you very much for your answer. I was happy to meet Robert in Munich
> last week and he proposed that for our problem, batch processing is the way
> to go.
>
> We also talked about how exactly to guarantee in this context that no data
> is lost even in the case the job dies while writing to the database. His
> idea was based on inserting a 'batch id' field into the database and
> therefore being able to check whether something has already been committed
> or not. Do you happen to have further input on how this or a similar
> approach (e.g. using a timestamp) could be automated, perhaps by
> customizing the output format as well?
>
> Cheers,
> Max
>
> Am 11.11.2015 um 11:35 schrieb Stephan Ewen :
>
> Hi!
>
> You can use both the DataSet API or the DataStream API for that. In case
> of failures, they would behave slightly differently.
>
> DataSet:
>
> Fault tolerance for the DataSet API works by restarting the job and
> redoing all of the work. In some sense, that is similar to what happens in
> MapReduce, only that Flink currently restarts more tasks than strictly
> necessary (work in progress to reduce that). The periodic in-flight
> checkpoints are not used here.
>
> DataStream:
>
> This one would start immediately inserting data (as it is a streaming
> job), and draw periodic checkpoints that make sure replay-on-failure
> has to redo only a bit, not everything. Whether this fits your use case
> depends on the type of processing you want to do.
> You could even use this job in a way that it monitors the directory for
> new files, picks them up, and starts immediate insertion into the database
> when they appear.
>
>
> Considering the last question (JDBC output format): Using UPSERT needs a
> few modifications (an issue that another user had); you would probably have to
> write a custom output format based on the JDBC output format.
>
> If you go with the streaming API, it should be possible to change the
> database writing output format to give you exactly-once semantics. The way
> to do that would be to commit the upserts only on completed checkpoints
> (and buffer them in the sink between checkpoints). This may be interesting
> if your database cannot deduplicate insertions (no deterministic primary
> key).
>
> Greetings,
> Stephan
>
>
> On Mon, Nov 9, 2015 at 5:25 PM, Maximilian Bode <
> maximilian.b...@tngtech.com> wrote:
>
>> Hi everyone,
>>
>> I am considering using Flink in a project. The setting would be a YARN
>> cluster where data is first read in from HDFS, then processed and finally
>> written into an Oracle database using an upsert command. If I understand
>> the documentation correctly, the DataSet API would be the natural candidate
>> for this problem.
>>
>> My first question is about the checkpointing system. Apparently (e.g. [1]
>> and [2]) it does not apply to batch processing. So how does Flink handle
>> failures during batch processing? For the use case described above, 'at
>> least once' semantics would suffice – still, are 'exactly once' guarantees
>> possible?
>> For example, how does Flink handle a failure of one taskmanager during a
>> batch process? What happens in this case, if the data has already partly
>> been written to the database?
>>
>> Secondly, the most obvious, straight-forward approach of connecting to
>> the Oracle DB would be the JDBC Output Format. In [3], it was mentioned
>> that it does not have many users and might not be trusted. What is the
>> status on this?
>>
>> Best regards,
>> Max
>>
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-and-Spark-tp583p587.html
>> [2]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Batch-Processing-as-Streaming-td1909.html
>> [3]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PotsgreSQL-JDBC-Sink-quot-writeRecord-failed-quot-and-quot-Batch-element-cancelled-quot-on-upsert-td623.html
>>
>
>
>


RE: MaxPermSize on yarn

2015-11-18 Thread Gwenhael Pasquiers
The option was accepted using the yaml file and it looks like it solved our 
issue.
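
For reference, the flink-conf.yaml entry was presumably something like:

env.java.opts: -XX:MaxPermSize=256m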

Thanks again.

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: mardi 17 novembre 2015 12:04
To: user@flink.apache.org
Subject: Re: MaxPermSize on yarn

You can also put the configuration option into the flink-conf.yaml file.

On Tue, Nov 17, 2015 at 12:01 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com> wrote:
We tried to add : -yD env.java.opts="-XX:MaxPermSize=256m"

But we’ve got to investigate since we have the following error:

Improperly specified VM option 'MaxPermSize'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

Either the option is badly supported, or it is incorrectly passed to the VM 
(maybe the “-XX:” causes some issues in the scripts).

We’ll investigate.

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: lundi 16 novembre 2015 19:18
To: user@flink.apache.org
Subject: Re: MaxPermSize on yarn

Hi Gwen,

there is a configuration value called "env.java.opts", that allows you to pass 
custom JVM args to JM and TM JVMs.

I hope that helps.



On Mon, Nov 16, 2015 at 5:30 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com> wrote:
Hi,

We’re having some OOM permgen exceptions when running on yarn.

We’re not yet sure if it is either a consequence or a cause of our crashes, but 
we’ve been trying to increase that value… And we did not find how to do it.

I’ve seen that the yarn-daemon.sh sets a 256m value.
It looks to me that it’s also possible to customize the YarnClient JVM args, 
but it will only be for the client, not for the TaskManagers.

Do you know of a way to do it ?

B.R.

Gwenhaël PASQUIERS




Re: Published test artifacts for flink streaming

2015-11-18 Thread Nick Dimiduk
Sorry Stephan, but I don't follow how global order applies in my case. I'm
merely checking the size of the sink results. I assume all tuples from a
given test invocation have sunk before the next test begins, which is
clearly not the case. Is there a way I can place a barrier in my tests to
ensure one streaming DAG runs at a time, and that all buffers have been
flushed to the sink before the next test begins?

What is "sink in DOP 1"?

Thanks,
Nick

On Wednesday, November 18, 2015, Stephan Ewen  wrote:

> There is no global order in parallel streams, it is something that
> applications need to work with. We are thinking about adding operations to
> introduce event-time order (at the cost of some delay), but that is only
> plans at this point.
>
>
> What I do in my tests is run the test streams in parallel, but the Sink in
> DOP 1. The sink gathers the elements in a list, and the close() function
> validates the result.
>
> Validating the results may involve sorting the list where the elements were
> gathered (to make the order deterministic) or using a hash set if it is only
> about distinct counts.
>
> Hope that helps.
>
> On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk  > wrote:
>
>> On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk > > wrote:
>>
>>> Thanks Stephan, I'll check that out in the morning. Generally speaking,
>>> it would be great to have some single-jvm example tests for those of us
>>> getting started. Following the example of WindowingIntegrationTest is
>>> mostly working, though reusing my single sink instance with its static
>>> collection results in non-deterministic results; there appears to be a race
>>> between instances clearing the collection in their open method and the
>>> runtime returning the collection to my test harness.
>>
>>
>> This inconsistent test result is pretty frustrating. I've created a
>> sample project with an IT that demonstrates the issue. Run `mvn test`
>> multiple times and see that sometimes it passes and sometimes it fails.
>> Maybe someone has some thoughts?
>>
>> https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6
>>
>> Thanks,
>> Nick
>>
>> I'd also appreciate some guidance on stream composition. It's nice to use
>>> the fluent API when exploring data in a shell, but it seems to me like that
>>> API is cumbersome when composing data pipelines of reusable partials. Or
>>> maybe I'm doing it all wrong... Hence the request for more examples :)
>>>
>>> While I'm asking, how might you model this: I have a set of predicates
>>> I'd like to flatMap over a stream. An input item should be compared vs
>>> every predicate (basically, I want a Clojure juxt of predicates over each
>>> stream element). Imagine those predicates expressed as where clauses via
>>> the Table API. Say I have hundreds of thousands of these predicates to run
>>> over every stream event. Is the java client API rich enough to express such
>>> a flow, or should I examine something lower than DataStream?
>>>
>>> Thanks a lot, and sorry for all the newb questions.
>>> -n
>>>
>>>
>>> On Thursday, November 5, 2015, Stephan Ewen >> > wrote:
>>>
 Hey!

 There is also a collect() sink in the "flink-streaming-contrib"
 project, see here:
 https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java

 It should work well locally for testing. In that case you can write a
 program as usual and use "DataStreamUtils.collect(stream)", so you need to
 stop reading it once you know the stream is exhausted...

 Stephan


 On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk 
 wrote:

> Hi Robert,
>
> It seems "type" was what I needed. This it also looks like the test
> jar has an undeclared dependency. In the end, the following allowed me
> to use TestStreamEnvironment for my integration test. Thanks a lot!
>
> -n
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-core</artifactId>
>   <version>${flink.version}</version>
>   <type>test-jar</type>
>   <scope>test</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-test-utils</artifactId>
>   <version>${flink.version}</version>
>   <scope>test</scope>
> </dependency>
>
> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger 
> wrote:
> > Hi Nick,
> >
> > we are usually publishing the test artifacts. Can you try and
> > replace the <type> tag by test-jar:
> >
> > <dependency>
> >    <groupId>org.apache.flink</groupId>
> >    <artifactId>flink-streaming-core</artifactId>
> >    <version>${flink.version}</version>
> >    <type>test-jar</type>
> >    <scope>test</scope>
> > </dependency>
> >
> >
> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk 
> wrote:
> >>
> >> Hello,
> >>
> >> I'm attempting integration tests for my streaming flows. I'd like to
> >> produce an input stream of java objects and sink the results into a
> >> collection for verification via JUnit asserts.
> >> StreamExecutionEnvironment provides methods for the former, however,
> >>

Re: Does 'DataStream.writeAsCsv' suppose to work like this?

2015-11-18 Thread Stephan Ewen
That makes sense...

On Mon, Oct 26, 2015 at 12:31 PM, Márton Balassi 
wrote:

> Hey Max,
>
> The solution I am proposing is not flushing on every record, but it makes
> sure to forward the flushing from the sinkfunction to the outputformat
> whenever it is triggered. Practically this means that the buffering is done
> (almost) solely in the sink and not in the outputformat any more.
>
> On Mon, Oct 26, 2015 at 10:11 AM, Maximilian Michels 
> wrote:
>
>> Not sure whether we really want to flush at every invoke call. If you
>> want to flush every time, you may want to set the update condition to 0
>> milliseconds. That way, flush will be called every time. In the API this is
>> exposed by using the FileSinkFunctionByMillis. If you flush every time,
>> performance might degrade.
>>
>> By the way, you may also use the RollingFileSink which splits the output
>> into several files for each hour/week/day. You can then be sure those files
>> are already completely written to HDFS.
>>
>> Best regards,
>> Max
>>
>> On Mon, Oct 26, 2015 at 8:36 AM, Márton Balassi > > wrote:
>>
>>> The problem persists in the current master, simply a format.flush() is
>>> needed here [1]. I'll do a quick hotfix, thanks for the report again!
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99
>>>
>>> On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi <
>>> balassi.mar...@gmail.com> wrote:
>>>
 Hey Rex,

 Writing half-baked records is definitely unwanted, thanks for spotting
 this. Most likely it can be solved by adding a flush at the end of every
 invoke call, let me check.

 Best,

 Marton

 On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge  wrote:

> Hi, flinkers!
>
> I'm new to this whole thing,
> and it seems to me that
> ' org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String,
> WriteMode, long)' does not work properly.
> To be specific, data were not flushed by update frequency when write
> to HDFS.
>
> what makes it more disturbing is that, if I check the content with
> 'hdfs dfs -cat xxx', sometimes I got partial records.
>
>
> I did a little digging in flink-0.9.1.
> And it turns out all
> 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)'
> does
> is pushing data to
> 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream'
> which is a delegate of  'org.apache.hadoop.fs.FSDataOutputStream'.
>
> In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never
> flushed.
> This results in data being held in a local buffer, and 'hdfs dfs -cat
> xxx' might return partial records.
>
>
> Does 'DataStream.writeAsCsv' suppose to work like this? Or I messed up
> somewhere?
>
>
> Best regards and thanks for your time!
>
> Rex
>


>>>
>>
>


Re: Does 'DataStream.writeAsCsv' suppose to work like this?

2015-11-18 Thread Maximilian Michels
Yes, that does make sense! Thank you for explaining. Have you made the
change yet? I couldn't find it on the master.

On Wed, Nov 18, 2015 at 5:16 PM, Stephan Ewen  wrote:
> That makes sense...
>
> On Mon, Oct 26, 2015 at 12:31 PM, Márton Balassi 
> wrote:
>>
>> Hey Max,
>>
>> The solution I am proposing is not flushing on every record, but it makes
>> sure to forward the flushing from the sinkfunction to the outputformat
>> whenever it is triggered. Practically this means that the buffering is done
>> (almost) solely in the sink and not in the outputformat any more.
>>
>> On Mon, Oct 26, 2015 at 10:11 AM, Maximilian Michels 
>> wrote:
>>>
>>> Not sure whether we really want to flush at every invoke call. If you
>>> want to flush every time, you may want to set the update condition to 0
>>> milliseconds. That way, flush will be called every time. In the API this is
>>> exposed by using the FileSinkFunctionByMillis. If you flush every time,
>>> performance might degrade.
>>>
>>> By the way, you may also use the RollingFileSink which splits the output
>>> into several files for each hour/week/day. You can then be sure those files
>>> are already completely written to HDFS.
>>>
>>> Best regards,
>>> Max
>>>
>>> On Mon, Oct 26, 2015 at 8:36 AM, Márton Balassi
>>>  wrote:

 The problem persists in the current master, simply a format.flush() is
 needed here [1]. I'll do a quick hotfix, thanks for the report again!

 [1]
 https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99

 On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi
  wrote:
>
> Hey Rex,
>
> Writing half-baked records is definitely unwanted, thanks for spotting
> this. Most likely it can be solved by adding a flush at the end of every
> invoke call, let me check.
>
> Best,
>
> Marton
>
> On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge  wrote:
>>
>> Hi, flinkers!
>>
>> I'm new to this whole thing,
>> and it seems to me that '
>> org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String,
>> WriteMode, long)' does not work properly.
>> To be specific, data were not flushed by update frequency when write
>> to HDFS.
>>
>> what makes it more disturbing is that, if I check the content with
>> 'hdfs dfs -cat xxx', sometimes I got partial records.
>>
>>
>> I did a little digging in flink-0.9.1.
>> And it turns out all
>> 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)'
>> does
>> is pushing data to
>> 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream'
>> which is a delegate of  'org.apache.hadoop.fs.FSDataOutputStream'.
>>
>> In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never
>> flushed.
>> This results in data being held in a local buffer, and 'hdfs dfs -cat
>> xxx' might return partial records.
>>
>>
>> Does 'DataStream.writeAsCsv' suppose to work like this? Or I messed up
>> somewhere?
>>
>>
>> Best regards and thanks for your time!
>>
>> Rex
>
>

>>>
>>
>


Re: Specially introduced Flink to chinese users in CNCC(China National Computer Congress)

2015-11-18 Thread Stephan Ewen
Thank you indeed for presenting there.

It looks like a very large audience!

Greetings,
Stephan


On Mon, Oct 26, 2015 at 11:24 AM, Maximilian Michels  wrote:

> Hi Liang,
>
> We greatly appreciate you introduced Flink to the Chinese users at CNCC!
> We would love to hear how people like Flink.
>
> Please keep us up to date and point the users to the mailing list or
> Stackoverflow if they have any difficulties.
>
> Best regards,
> Max
>
> On Sat, Oct 24, 2015 at 5:48 PM, Liang Chen 
> wrote:
>
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n3255/IMG_20151023_104030.jpg
>> >
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Specially-introduced-Flink-to-chinese-users-in-CNCC-China-National-Computer-Congress-tp3254p3255.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: Reading null value from datasets

2015-11-18 Thread Stephan Ewen
Hi Guido!

If you use Scala, I would use an Option to represent nullable fields. That
is a very explicit solution that marks which fields can be null, and also
forces the program to handle this carefully.

We are looking to add support for Java 8's Optional type as well for
exactly that purpose.

Greetings,
Stephan


On Mon, Oct 26, 2015 at 10:27 AM, Maximilian Michels  wrote:

> As far as I know the null support was removed from the Table API because
> it was not consistently supported by all operations. See
> https://issues.apache.org/jira/browse/FLINK-2236
>
>
> On Fri, Oct 23, 2015 at 7:20 PM, Shiti Saxena 
> wrote:
>
>> For a similar problem where we wanted to preserve and track null entries,
>> we load the CSV as a DataSet[Array[Object]] and then transform it into
>> DataSet[Row] using a custom RowSerializer(
>> https://gist.github.com/Shiti/d0572c089cc08654019c) which handles null.
>>
>> The Table API(which supports null) can then be used on the resulting
>> DataSet[Row].
>>
>> On Fri, Oct 23, 2015 at 7:40 PM, Maximilian Michels 
>> wrote:
>>
>>> Hi Guido,
>>>
>>> This depends on your use case but you may read those values as type
>>> String and treat them accordingly.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Fri, Oct 23, 2015 at 1:59 PM, Guido  wrote:
>>>
 Hello,
 I would like to ask if there were any particular ways to read or treat
 null (e.g. Name, Lastname,, Age..) value in a dataset using readCsvFile,
 without being forced to ignore them.

 Thanks for your time.
 Guido


>>>
>>
>


Re: Parallel file read in LocalEnvironment

2015-11-18 Thread Stephan Ewen
Late answer, sorry:

The splits are created in the JobManager, so the job submission should not
be affected by that.

The assignment of splits to workers is very fast, so many splits with small
data is not very different from few splits with large data.

Lines are never materialized and the operators do not work differently
based on different numbers of splits.
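
If you want to see where the startup time goes, you can create the splits locally
and time it; this mirrors what the JobManager does (the path and the minimum-splits
hint below are placeholders):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

TextInputFormat format = new TextInputFormat(new Path("file:///path/to/input"));
format.configure(new Configuration());

long start = System.currentTimeMillis();
FileInputSplit[] splits = format.createInputSplits(8);  // 8 is just a minimum-splits hint
System.out.println(splits.length + " splits created in "
        + (System.currentTimeMillis() - start) + " ms");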

On Wed, Oct 7, 2015 at 4:26 PM, Flavio Pompermaier 
wrote:

> I've tried to split my huge file by lines count (using the bash command
> split -l) in 2 different ways:
>
>1. small lines count (huge number of small files)
>2. big lines count (small number of big files)
>
> I can't understand why the time required to effectively start the job is
> more or less the same
>
>- in 1. it takes a lot to fetch the file list (~50.000) and the split
>assigner is fast to assign the splits (but also being fast they are a lot)
>- in 2. Flink is fast in fetch the file list but it's extremely slow
>to generate the splits to assign
>
> Initially I was thinking that Flink was eagerly materializing the lines
>> somewhere but neither the memory nor the disk usage increases.
> What is going on underneath? Is it normal?
>
> Thanks in advance,
> Flavio
>
>
>
> On Wed, Oct 7, 2015 at 3:27 PM, Stephan Ewen  wrote:
>
>> The split functionality is in the FileInputFormat and the functionality
>> that takes care of lines across splits is in the DelimitedIntputFormat.
>>
>> On Wed, Oct 7, 2015 at 3:24 PM, Fabian Hueske  wrote:
>>
>>> I'm sorry there is no such documentation.
>>> You need to look at the code :-(
>>>
>>> 2015-10-07 15:19 GMT+02:00 Flavio Pompermaier :
>>>
 And what is the split policy for the FileInputFormat? Does it depend on the
 fs block size?
 Is there a pointer to the several flink input formats and a description
 of their internals?

 On Wed, Oct 7, 2015 at 3:09 PM, Fabian Hueske 
 wrote:

> Hi Flavio,
>
> it is not possible to split by line count because that would mean to
> read and parse the file just for splitting.
>
> Parallel processing of data sources depends on the input splits
> created by the InputFormat. Local files can be split just like files in
> HDFS. Usually, each file corresponds to at least one split but multiple
> files could also be put into a single split if necessary. The logic for 
> that
> would go into the InputFormat.createInputSplits() method.
>
> Cheers, Fabian
>
> 2015-10-07 14:47 GMT+02:00 Flavio Pompermaier :
>
>> Hi to all,
>>
>> is there a way to split a single local file by line count (e.g. a
>> split every 100 lines) in a LocalEnvironment to speed up a simple map
>> function? For me it is not very clear how the local files (files into
>> a directory if recursive=true) are managed by Flink... Is there any reference to
>> these internals?
>>
>> Best,
>> Flavio
>>
>
>


>>>
>>
>


YARN High Availability

2015-11-18 Thread Gwenhael Pasquiers
Hello,

We're trying to set up high availability using an existing zookeeper quorum 
already running in our Cloudera cluster.

So, as per the doc we've changed the max attempt in yarn's config as well as 
the flink.yaml.

recovery.mode: zookeeper
recovery.zookeeper.quorum: host1:3181,host2:3181,host3:3181
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
recovery.zookeeper.storageDir: hdfs:///flink/recovery/
yarn.application-attempts: 1000

Everything is ok as long as recovery.mode is commented.
As soon as I uncomment recovery.mode the deployment on yarn is stuck on :

"Deploying cluster, current state ACCEPTED".
"Deployment took more than 60 seconds"
Every second.

And I have more than enough resources available on my yarn cluster.

Do you have any idea of what could cause this, and/or what logs I should look 
for in order to understand ?

B.R.

Gwenhaël PASQUIERS


Re: YARN High Availability

2015-11-18 Thread Till Rohrmann
Hi Gwenhaël,

do you have access to the yarn logs?

Cheers,
Till

On Wed, Nov 18, 2015 at 5:55 PM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> Hello,
>
>
>
> We’re trying to set up high availability using an existing zookeeper
> quorum already running in our Cloudera cluster.
>
>
>
> So, as per the doc we’ve changed the max attempt in yarn’s config as well
> as the flink.yaml.
>
>
>
> recovery.mode: zookeeper
>
> recovery.zookeeper.quorum: host1:3181,host2:3181,host3:3181
>
> state.backend: filesystem
>
> state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
>
> recovery.zookeeper.storageDir: hdfs:///flink/recovery/
>
> yarn.application-attempts: 1000
>
>
>
> Everything is ok as long as recovery.mode is commented.
>
> As soon as I uncomment recovery.mode the deployment on yarn is stuck on :
>
>
>
> “Deploying cluster, current state ACCEPTED”.
>
> “Deployment took more than 60 seconds….”
>
> Every second.
>
>
>
> And I have more than enough resources available on my yarn cluster.
>
>
>
> Do you have any idea of what could cause this, and/or what logs I should
> look for in order to understand ?
>
>
>
> B.R.
>
>
>
> Gwenhaël PASQUIERS
>


Re: Published test artifacts for flink streaming

2015-11-18 Thread Stephan Ewen
Okay, I think I misunderstood your problem.

Usually you can simply execute tests one after another by waiting until
"env.execute()" returns. The streaming jobs terminate by themselves once
the sources reach end of stream (finite streams are supported that way) but
make sure all records flow through the entire stream (no barrier needed).
So if you use a source that reaches an end, no extra work is needed.

If you have a proper infinite source (like Kafka), things are a bit more
tricky, since you have a proper infinite streaming program. What we do in
our Kafka Tests is throw a "SuccessException" in the sink once we saw all
data we expected. You can get the cause exceptions in a try/catch around
env.execute() to check if the program "failed" with a SuccessException, or
whether it actually failed.
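
As a sketch of that pattern (the class and field names are illustrative only):

public static class SuccessException extends RuntimeException {}

// in the sink: end the job once everything expected has been seen
@Override
public void invoke(Tuple2<String, Integer> value) {
    seen.add(value);
    if (seen.containsAll(expected)) {
        throw new SuccessException();
    }
}

// in the test: "failing" with a SuccessException means the test passed
try {
    env.execute();
} catch (Exception e) {
    Throwable t = e;
    while (t != null && !(t instanceof SuccessException)) {
        t = t.getCause();
    }
    if (t == null) {
        throw e;  // a real failure
    }
}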

A "sink in DOP 1" (sorry for the unclear terminology) is a sink with
parallelism 1, so all data is collected by the same function instance.

Any of this helpful?

Stephan


On Wed, Nov 18, 2015 at 5:13 PM, Nick Dimiduk  wrote:

> Sorry Stephan but I don't follow how global order applies in my case. I'm
> merely checking the size of the sink results. I assume all tuples from a
> given test invocation have sunk before the next test begins, which is
> clearly not the case. Is there a way I can place a barrier in my tests to
> ensure one streaming DAG runs at a time, and that all buffers have been
> flushed to the sink before the next test begins?
>
> What is "sink in DOP 1"?
>
> Thanks,
> Nick
>
>
> On Wednesday, November 18, 2015, Stephan Ewen  wrote:
>
>> There is no global order in parallel streams, it is something that
>> applications need to work with. We are thinking about adding operations to
>> introduce event-time order (at the cost of some delay), but that is only
>> plans at this point.
>>
>>
>> What I do in my tests is run the test streams in parallel, but the Sink
>> in DOP 1. The sink gathers the elements in a list, and the close() function
>> validates the result.
>>
>> Validating the results may involve sorting the list where the elements were
>> gathered (to make the order deterministic) or using a hash set if it is only
>> about distinct counts.
>>
>> Hope that helps.
>>
>> On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk  wrote:
>>
>>> On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk  wrote:
>>>
 Thanks Stephan, I'll check that out in the morning. Generally speaking,
 it would be great to have some single-jvm example tests for those of us
 getting started. Following the example of WindowingIntegrationTest is
 mostly working, though reusing my single sink instance with its static
 collection results in non-deterministic results; there appears to be a race
 between instances clearing the collection in their open method and the
 runtime returning the collection to my test harness.
>>>
>>>
>>> This inconsistent test result is pretty frustrating. I've created a
>>> sample project with an IT that demonstrates the issue. Run `mvn test`
>>> multiple times and see that sometimes it passes and sometimes it fails.
>>> Maybe someone has some thoughts?
>>>
>>> https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6
>>>
>>> Thanks,
>>> Nick
>>>
>>> I'd also appreciate some guidance on stream composition. It's nice to
 use the fluent API when exploring data in a shell, but it seems to me like
 that API is cumbersome when composing data pipelines of reusable partials.
 Or maybe I'm doing it all wrong... Hence the request for more examples :)

 While I'm asking, how might you model this: I have a set of predicates
 I'd like to flatMap over a stream. An input item should be compared vs
 every predicate (basically, I want a Clojure juxt of predicates over each
 stream element). Imagine those predicates expressed as where clauses via
 the Table API. Say I have hundreds of thousands of these predicates to run
 over every stream event. Is the java client API rich enough to express such
 a flow, or should I examine something lower than DataStream?

 Thanks a lot, and sorry for all the newb questions.
 -n


 On Thursday, November 5, 2015, Stephan Ewen  wrote:

> Hey!
>
> There is also a collect() sink in the "flink-streaming-contrib"
> project, see here:
> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>
> It should work well locally for testing. In that case you can write a
> program as usual and use "DataStreamUtils.collect(stream)", so you need to
> stop reading it once you know the stream is exhausted...
>
> Stephan
>
>
> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk 
> wrote:
>
>> Hi Robert,
>>
>> It seems "type" was what I needed. This it also looks like the test
>> jar has an undeclared dependency. In the end, the following allowed me
>> to use TestStr

Fold vs Reduce in DataStream API

2015-11-18 Thread Ron Crocker
Is there a succinct description of the distinction between these transforms?

Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835



RE: YARN High Availability

2015-11-18 Thread Gwenhael Pasquiers
Nevermind,

Looking at the logs I saw that it was having issues trying to connect to ZK.
To make it short, it had the wrong port.

It is now starting.

Tomorrow I’ll try to kill some JobManagers *evil*.

Another question: if I have multiple HA Flink jobs, are there some points to 
check in order to be sure that they won’t collide on HDFS or ZK?

B.R.

Gwenhaël PASQUIERS

From: Till Rohrmann [mailto:till.rohrm...@gmail.com]
Sent: mercredi 18 novembre 2015 18:01
To: user@flink.apache.org
Subject: Re: YARN High Availability

Hi Gwenhaël,

do you have access to the yarn logs?

Cheers,
Till

On Wed, Nov 18, 2015 at 5:55 PM, Gwenhael Pasquiers 
<gwenhael.pasqui...@ericsson.com> wrote:
Hello,

We’re trying to set up high availability using an existing zookeeper quorum 
already running in our Cloudera cluster.

So, as per the doc we’ve changed the max attempt in yarn’s config as well as 
the flink.yaml.

recovery.mode: zookeeper
recovery.zookeeper.quorum: host1:3181,host2:3181,host3:3181
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
recovery.zookeeper.storageDir: hdfs:///flink/recovery/
yarn.application-attempts: 1000

Everything is ok as long as recovery.mode is commented.
As soon as I uncomment recovery.mode the deployment on yarn is stuck on :

“Deploying cluster, current state ACCEPTED”.
“Deployment took more than 60 seconds….”
Every second.

And I have more than enough resources available on my yarn cluster.

Do you have any idea of what could cause this, and/or what logs I should look 
for in order to understand ?

B.R.

Gwenhaël PASQUIERS



Re: Fold vs Reduce in DataStream API

2015-11-18 Thread Fabian Hueske
Hi Ron,

Have you checked:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#transformations
?

Fold is like reduce, except that you define a start element (of a different
type than the input type) and the result type is the type of the initial
value. In reduce, the result type must be identical to the input type.
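
A quick sketch to illustrate the difference (field names and types are made up;
assume a DataStream<Tuple2<String, Integer>> called input):

// reduce: input and result are both Tuple2<String, Integer>
DataStream<Tuple2<String, Integer>> reduced = input
    .keyBy(0)
    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
            return new Tuple2<String, Integer>(a.f0, a.f1 + b.f1);
        }
    });

// fold: the initial value "" fixes the result type to String, independent of the input type
DataStream<String> folded = input
    .keyBy(0)
    .fold("", new FoldFunction<Tuple2<String, Integer>, String>() {
        @Override
        public String fold(String current, Tuple2<String, Integer> value) {
            return current + value.f1 + ",";
        }
    });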

Best, Fabian

2015-11-18 18:32 GMT+01:00 Ron Crocker :

> Is there a succinct description of the distinction between these
> transforms?
>
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835
>
>


Re: Parallel file read in LocalEnvironment

2015-11-18 Thread Flavio Pompermaier
So why does it take so long to start the job? Is it because in any case the job
manager has to read all the lines of the input files before generating the
splits?
On 18 Nov 2015 17:52, "Stephan Ewen"  wrote:

> Late answer, sorry:
>
> The splits are created in the JobManager, so the sub submission should not
> be affected by that.
>
> The assignment of splits to workers is very fast, so many splits with
> small data is not very different from few splits with large data.
>
> Lines are never materialized and the operators do not work differently
> based on different numbers of splits.
>
> On Wed, Oct 7, 2015 at 4:26 PM, Flavio Pompermaier 
> wrote:
>
>> I've tried to split my huge file by lines count (using the bash command
>> split -l) in 2 different ways:
>>
>>1. small lines count (huge number of small files)
>>2. big lines count (small number of big files)
>>
>> I can't understand why the time required to effectively start the job is
>> more or less the same
>>
>>- in 1. it takes a lot to fetch the file list (~50.000) and the split
>>assigner is fast to assign the splits (but also being fast they are a lot)
>>- in 2. Flink is fast in fetch the file list but it's extremely slow
>>to generate the splits to assign
>>
>> Initially I was thinking that Flink was eagerly materializing the lines
>> somewhere but both the memory and the disks doesn't increase.
>> What is going on underneath? Is it normal?
>>
>> Thanks in advance,
>> Flavio
>>
>>
>>
>> On Wed, Oct 7, 2015 at 3:27 PM, Stephan Ewen  wrote:
>>
>>> The split functionality is in the FileInputFormat and the functionality
>>> that takes care of lines across splits is in the DelimitedIntputFormat.
>>>
>>> On Wed, Oct 7, 2015 at 3:24 PM, Fabian Hueske  wrote:
>>>
 I'm sorry there is no such documentation.
 You need to look at the code :-(

 2015-10-07 15:19 GMT+02:00 Flavio Pompermaier :

> And what is the split policy for the FileInputFormat?it depends on the
> fs block size?
> Is there a pointer to the several flink input formats and a
> description of their internals?
>
> On Wed, Oct 7, 2015 at 3:09 PM, Fabian Hueske 
> wrote:
>
>> Hi Flavio,
>>
>> it is not possible to split by line count because that would mean to
>> read and parse the file just for splitting.
>>
>> Parallel processing of data sources depends on the input splits
>> created by the InputFormat. Local files can be split just like files in
>> HDFS. Usually, each file corresponds to at least one split but multiple
>> files could also be put into a single split if necessary.The logic for 
>> that
>> would go into to the InputFormat.createInputSplits() method.
>>
>> Cheers, Fabian
>>
>> 2015-10-07 14:47 GMT+02:00 Flavio Pompermaier :
>>
>>> Hi to all,
>>>
>>> is there a way to split a single local file by line count (e.g. a
>>> split every 100 lines) in a LocalEnvironment to speed up a simple map
>>> function? For me it is not very clear how the local files (files into
>>> directory if recursive=true) are managed by Flink..is there any ref to 
>>> this
>>> internals?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
>

>>>
>>
>


Re: Parallel file read in LocalEnvironment

2015-11-18 Thread Stephan Ewen
The JobManager does not read all files, but it has to query the HDFS for
all file metadata (size, blocks, block locations), which can take a bit.
There is a separate call to the HDFS Namenode for each file. The more
files, the more metadata has to be collected.


On Wed, Nov 18, 2015 at 7:15 PM, Flavio Pompermaier 
wrote:

> So why it takes so much to start the job?because in any case the job
> manager has to read all the lines of the input files before generating the
> splits?
> On 18 Nov 2015 17:52, "Stephan Ewen"  wrote:
>
>> Late answer, sorry:
>>
>> The splits are created in the JobManager, so the sub submission should
>> not be affected by that.
>>
>> The assignment of splits to workers is very fast, so many splits with
>> small data is not very different from few splits with large data.
>>
>> Lines are never materialized and the operators do not work differently
>> based on different numbers of splits.
>>
>> On Wed, Oct 7, 2015 at 4:26 PM, Flavio Pompermaier 
>> wrote:
>>
>>> I've tried to split my huge file by lines count (using the bash command
>>> split -l) in 2 different ways:
>>>
>>>1. small lines count (huge number of small files)
>>>2. big lines count (small number of big files)
>>>
>>> I can't understand why the time required to effectively start the job is
>>> more or less the same
>>>
>>>- in 1. it takes a lot to fetch the file list (~50.000) and the
>>>split assigner is fast to assign the splits (but also being fast they 
>>> are a
>>>lot)
>>>- in 2. Flink is fast in fetch the file list but it's extremely slow
>>>to generate the splits to assign
>>>
>>> Initially I was thinking that Flink was eagerly materializing the lines
>>> somewhere but both the memory and the disks doesn't increase.
>>> What is going on underneath? Is it normal?
>>>
>>> Thanks in advance,
>>> Flavio
>>>
>>>
>>>
>>> On Wed, Oct 7, 2015 at 3:27 PM, Stephan Ewen  wrote:
>>>
 The split functionality is in the FileInputFormat and the functionality
 that takes care of lines across splits is in the DelimitedIntputFormat.

 On Wed, Oct 7, 2015 at 3:24 PM, Fabian Hueske 
 wrote:

> I'm sorry there is no such documentation.
> You need to look at the code :-(
>
> 2015-10-07 15:19 GMT+02:00 Flavio Pompermaier :
>
>> And what is the split policy for the FileInputFormat?it depends on
>> the fs block size?
>> Is there a pointer to the several flink input formats and a
>> description of their internals?
>>
>> On Wed, Oct 7, 2015 at 3:09 PM, Fabian Hueske 
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> it is not possible to split by line count because that would mean to
>>> read and parse the file just for splitting.
>>>
>>> Parallel processing of data sources depends on the input splits
>>> created by the InputFormat. Local files can be split just like files in
>>> HDFS. Usually, each file corresponds to at least one split but multiple
>>> files could also be put into a single split if necessary.The logic for 
>>> that
>>> would go into to the InputFormat.createInputSplits() method.
>>>
>>> Cheers, Fabian
>>>
>>> 2015-10-07 14:47 GMT+02:00 Flavio Pompermaier 
>>> :
>>>
 Hi to all,

 is there a way to split a single local file by line count (e.g. a
 split every 100 lines) in a LocalEnvironment to speed up a simple map
 function? For me it is not very clear how the local files (files into
 directory if recursive=true) are managed by Flink..is there any ref to 
 this
 internals?

 Best,
 Flavio

>>>
>>>
>>
>>
>

>>>
>>


Re: Parallel file read in LocalEnvironment

2015-11-18 Thread Flavio Pompermaier
in my test I was using the local fs (ext4)
On 18 Nov 2015 19:17, "Stephan Ewen"  wrote:

> The JobManager does not read all files, but is has to query the HDFS for
> all file metadata (size, blocks, block locations), which can take a bit.
> There is a separate call to the HDFS Namenode for each file. The more
> files, the more metadata has to be collected.
>
>
> On Wed, Nov 18, 2015 at 7:15 PM, Flavio Pompermaier 
> wrote:
>
>> So why it takes so much to start the job?because in any case the job
>> manager has to read all the lines of the input files before generating the
>> splits?
>> On 18 Nov 2015 17:52, "Stephan Ewen"  wrote:
>>
>>> Late answer, sorry:
>>>
>>> The splits are created in the JobManager, so the sub submission should
>>> not be affected by that.
>>>
>>> The assignment of splits to workers is very fast, so many splits with
>>> small data is not very different from few splits with large data.
>>>
>>> Lines are never materialized and the operators do not work differently
>>> based on different numbers of splits.
>>>
>>> On Wed, Oct 7, 2015 at 4:26 PM, Flavio Pompermaier >> > wrote:
>>>
 I've tried to split my huge file by lines count (using the bash command
 split -l) in 2 different ways:

1. small lines count (huge number of small files)
2. big lines count (small number of big files)

 I can't understand why the time required to effectively start the job
 is more or less the same

- in 1. it takes a lot to fetch the file list (~50.000) and the
split assigner is fast to assign the splits (but also being fast they 
 are a
lot)
- in 2. Flink is fast in fetch the file list but it's extremely
slow to generate the splits to assign

 Initially I was thinking that Flink was eagerly materializing the lines
 somewhere but both the memory and the disks doesn't increase.
 What is going on underneath? Is it normal?

 Thanks in advance,
 Flavio



 On Wed, Oct 7, 2015 at 3:27 PM, Stephan Ewen  wrote:

> The split functionality is in the FileInputFormat and the
> functionality that takes care of lines across splits is in the
> DelimitedIntputFormat.
>
> On Wed, Oct 7, 2015 at 3:24 PM, Fabian Hueske 
> wrote:
>
>> I'm sorry there is no such documentation.
>> You need to look at the code :-(
>>
>> 2015-10-07 15:19 GMT+02:00 Flavio Pompermaier :
>>
>>> And what is the split policy for the FileInputFormat?it depends on
>>> the fs block size?
>>> Is there a pointer to the several flink input formats and a
>>> description of their internals?
>>>
>>> On Wed, Oct 7, 2015 at 3:09 PM, Fabian Hueske 
>>> wrote:
>>>
 Hi Flavio,

 it is not possible to split by line count because that would mean
 to read and parse the file just for splitting.

 Parallel processing of data sources depends on the input splits
 created by the InputFormat. Local files can be split just like files in
 HDFS. Usually, each file corresponds to at least one split but multiple
 files could also be put into a single split if necessary.The logic for 
 that
 would go into to the InputFormat.createInputSplits() method.

 Cheers, Fabian

 2015-10-07 14:47 GMT+02:00 Flavio Pompermaier >>> >:

> Hi to all,
>
> is there a way to split a single local file by line count (e.g. a
> split every 100 lines) in a LocalEnvironment to speed up a simple map
> function? For me it is not very clear how the local files (files into
> directory if recursive=true) are managed by Flink..is there any ref 
> to this
> internals?
>
> Best,
> Flavio
>


>>>
>>>
>>
>

>>>
>


Re: Published test artifacts for flink streaming

2015-11-18 Thread Nick Dimiduk
Please see the above gist: my test makes no assertions until after the
env.execute() call. Adding setParallelism(1) to my sink appears to
stabilize my test. Indeed, very helpful. Thanks a lot!

-n
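
For reference, a rough sketch of the parallelism-1 collecting sink pattern
discussed in this thread (names and assertions are illustrative, not the code
from the gist):

// Sink backed by a static collection; with setParallelism(1) a single instance gathers all results.
public static class CollectSink implements SinkFunction<String> {
    static final List<String> VALUES = Collections.synchronizedList(new ArrayList<String>());

    @Override
    public void invoke(String value) {
        VALUES.add(value);
    }
}

// In the test:
CollectSink.VALUES.clear();
stream.addSink(new CollectSink()).setParallelism(1);
env.execute();
assertEquals(expectedCount, CollectSink.VALUES.size());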

On Wed, Nov 18, 2015 at 9:15 AM, Stephan Ewen  wrote:

> Okay, I think I misunderstood your problem.
>
> Usually you can simply execute tests one after another by waiting until
> "env.execute()" returns. The streaming jobs terminate by themselves once
> the sources reach end of stream (finite streams are supported that way) but
> make sure all records flow through the entire stream (no barrier needed).
> So if you use a source that reaches an end, no extra work is needed.
>
> If you have a proper infinite source (like Kafka), things are a bit more
> tricky, since you have a proper infinite streaming program. What we do in
> our Kafka Tests is throw a "SuccessException" in the sink once we saw all
> data we expected. You can get the cause exceptions in a try/catch around
> env.execute() to check if the program "failed" with a SuccessException, or
> whether it failed properly.
>
> A "sink in DOP 1" (sorry for the unclear terminology) is a sink with
> parallelism 1, so all data is collected by the same function instance.
>
> Any of this helpful?
>
> Stephan
>
>
> On Wed, Nov 18, 2015 at 5:13 PM, Nick Dimiduk  wrote:
>
>> Sorry Stephan but I don't follow how global order applies in my case. I'm
>> merely checking the size of the sink results. I assume all tuples from a
>> given test invocation have sunk before the next test begins, which is
>> clearly not the case. Is there a way I can place a barrier in my tests to
>> ensure one streaming DAG runs at a time, and that all buffers have been
>> flushed to the sink before the next test begins?
>>
>> What is "sink in DOP 1"?
>>
>> Thanks,
>> Nick
>>
>>
>> On Wednesday, November 18, 2015, Stephan Ewen  wrote:
>>
>>> There is no global order in parallel streams, it is something that
>>> applications need to work with. We are thinking about adding operations to
>>> introduce event-time order (at the cost of some delay), but that is only
>>> plans at this point.
>>>
>>>
>>> What I do in my tests is run the test streams in parallel, but the Sink
>>> in DOP 1. The sink gathers the elements in a list, and the close() function
>>> validates the result.
>>>
>>> Validating the results may involve sorting the list where elements were
>>> gathered (make the order deterministic) or use a hash set if it is only
>>> about distinct count.
>>>
>>> Hope that helps.
>>>
>>> On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk 
>>> wrote:
>>>
 On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk 
 wrote:

> Thanks Stephan, I'll check that out in the morning. Generally
> speaking, it would be great to have some single-jvm example tests for 
> those
> of us getting started. Following the example of WindowingIntegrationTest 
> is
> mostly working, though reusing my single sink instance with its static
> collection results in non-deterministic results; there appears to be a 
> race
> between instances clearing the collection in their open method and the
> runtime returning the collection to my test harness.


 This inconsistent test result is pretty frustrating. I've created a
 sample project with an IT that demonstrates the issue. Run `mvn test`
 multiple times and see that sometimes it passes and sometimes it fails.
 Maybe someone has some thoughts?

 https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6

 Thanks,
 Nick

 I'd also appreciate some guidance on stream composition. It's nice to
> use the fluent API when exploring data in a shell, but it seems to me like
> that API is cumbersome when composing data pipelines of reusable partials.
> Or maybe I'm doing it all wrong... Hence the request for more examples :)
>
> While I'm asking, how might you model this: I have a set of predicates
> I'd like to flatMap over a stream. An input item should be compared vs
> every predicate (basically, I want a Clojure juxt of predicates over each
> stream element). Imagine those predicates expressed as where clauses via
> the Table API. Say I have hundreds of thousands of these predicates to run
> over every stream event. Is the java client API rich enough to express 
> such
> a flow, or should I examine something lower than DataStream?
>
> Thanks a lot, and sorry for all the newb questions.
> -n
>
>
> On Thursday, November 5, 2015, Stephan Ewen  wrote:
>
>> Hey!
>>
>> There is also a collect() sink in the "flink-streaming-contrib"
>> project, see here:
>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>
>> It should work well locally for testing. In that case you can write a
>> program as usual and use "Da

Re: FlinkKafkaConsumer and multiple topics

2015-11-18 Thread Stephan Ewen
The new KafkaConsumer for Kafka 0.9 should be able to handle this, as the
Kafka client code itself will have support for this then.

For 0.8.x, we would need to implement support for recovery inside the
consumer ourselves, which is why we decided to initially let the Job
Recovery take care of that.
If that becomes much of an issue, we can look into this again...
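
Until then, a minimal sketch of leaning on job recovery (the values are
illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// restart the job on failures, so the Kafka source resumes from the last valid offset
env.getConfig().setNumberOfExecutionRetries(3);
// checkpoint regularly so there is a recent valid offset to resume from
env.enableCheckpointing(5000);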

On Thu, Sep 24, 2015 at 10:46 PM, Jakob Ericsson 
wrote:

> What I actually meant was partition reassignment (
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
> ).
> No topics were deleted.
> We added a bunch of new servers and needed to reassign some partitions to
> spread the load.
>
> No, I haven't set the setNumberOfExecutionRetries().
>
> On Thu, Sep 24, 2015 at 10:06 PM, Robert Metzger 
> wrote:
>
>> Hi Jakob,
>>
>> what do you exactly mean by rebalance of topics? Did the leader of the
>> partitions change?
>> Were topics deleted?
>>
>> Flink's KafkaConsumer does not try to recover from these exceptions. We
>> rely on Flink's fault tolerance mechanisms to restart the data consumption
>> (from the last valid offset).
>> Do you have set the setNumberOfExecutionRetries() on the ExecutionConfig?
>>
>>
>> On Thu, Sep 24, 2015 at 9:57 PM, Jakob Ericsson > > wrote:
>>
>>> We did some rebalance of topics in our Kafka cluster today. I had a
>>> flink job running and it crashed when some of the partitions were moved,
>>> other consumers (non flink) continued to work.
>>>
>>> Should I configure it differently or could this be a bug?
>>>
>>> 09/24/2015 15:34:31 Source: Custom Source(3/4) switched to FAILED
>>> java.lang.Exception: Error while fetching from broker:
>>> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>> at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at
>>> java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>> at java.lang.Class.newInstance(Class.java:442)
>>> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>>> at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>>>
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.IOException: Error while fetching from broker:
>>> Exception for partition 6: kafka.common.UnknownTopicOrPartitionException
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>> at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at
>>> java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>> at java.lang.Class.newInstance(Class.java:442)
>>> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
>>> at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:405)
>>>
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:421)
>>>
>>>
>>> On Fri, Sep 18, 2015 at 2:30 PM, Robert Metzger 
>>> wrote:
>>>
 Hi,

 did you manually add a Kafka dependency into your project? Maybe you
 are overwriting the Kafka version to a lower version?

 I'm sorry that our consumer is crashing when it's supposed to read an
 invalid topic... but in general, that's a good behavior ;)

 Maybe you can check whether the topic exists from your user code?
 The getPartitionsForTopic() method is actually a public static method that
 you can call.
 If it's throwing an exception, the topic doesn't exist anymore.


 Robert

 On Fri, Sep 18, 2015 at 2:21 PM, Jakob Ericsson <
 jakob.erics...@gmail.com> wrote:

> Hit another problem

Re: Parallel file read in LocalEnvironment

2015-11-18 Thread Stephan Ewen
Okay, let me take a step back and make sure I understand this right...

With many small files it takes longer to start the job, correct? How much
time did it actually take and how many files did you have?


On Wed, Nov 18, 2015 at 7:18 PM, Flavio Pompermaier 
wrote:

> in my test I was using the local fs (ext4)
> On 18 Nov 2015 19:17, "Stephan Ewen"  wrote:
>
>> The JobManager does not read all files, but is has to query the HDFS for
>> all file metadata (size, blocks, block locations), which can take a bit.
>> There is a separate call to the HDFS Namenode for each file. The more
>> files, the more metadata has to be collected.
>>
>>
>> On Wed, Nov 18, 2015 at 7:15 PM, Flavio Pompermaier > > wrote:
>>
>>> So why it takes so much to start the job?because in any case the job
>>> manager has to read all the lines of the input files before generating the
>>> splits?
>>> On 18 Nov 2015 17:52, "Stephan Ewen"  wrote:
>>>
 Late answer, sorry:

 The splits are created in the JobManager, so the sub submission should
 not be affected by that.

 The assignment of splits to workers is very fast, so many splits with
 small data is not very different from few splits with large data.

 Lines are never materialized and the operators do not work differently
 based on different numbers of splits.

 On Wed, Oct 7, 2015 at 4:26 PM, Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> I've tried to split my huge file by lines count (using the bash
> command split -l) in 2 different ways:
>
>1. small lines count (huge number of small files)
>2. big lines count (small number of big files)
>
> I can't understand why the time required to effectively start the job
> is more or less the same
>
>- in 1. it takes a lot to fetch the file list (~50.000) and the
>split assigner is fast to assign the splits (but also being fast they 
> are a
>lot)
>- in 2. Flink is fast in fetch the file list but it's extremely
>slow to generate the splits to assign
>
> Initially I was thinking that Flink was eagerly materializing the
> lines somewhere but both the memory and the disks doesn't increase.
> What is going on underneath? Is it normal?
>
> Thanks in advance,
> Flavio
>
>
>
> On Wed, Oct 7, 2015 at 3:27 PM, Stephan Ewen  wrote:
>
>> The split functionality is in the FileInputFormat and the
>> functionality that takes care of lines across splits is in the
>> DelimitedIntputFormat.
>>
>> On Wed, Oct 7, 2015 at 3:24 PM, Fabian Hueske 
>> wrote:
>>
>>> I'm sorry there is no such documentation.
>>> You need to look at the code :-(
>>>
>>> 2015-10-07 15:19 GMT+02:00 Flavio Pompermaier 
>>> :
>>>
 And what is the split policy for the FileInputFormat?it depends on
 the fs block size?
 Is there a pointer to the several flink input formats and a
 description of their internals?

 On Wed, Oct 7, 2015 at 3:09 PM, Fabian Hueske 
 wrote:

> Hi Flavio,
>
> it is not possible to split by line count because that would mean
> to read and parse the file just for splitting.
>
> Parallel processing of data sources depends on the input splits
> created by the InputFormat. Local files can be split just like files 
> in
> HDFS. Usually, each file corresponds to at least one split but 
> multiple
> files could also be put into a single split if necessary.The logic 
> for that
> would go into to the InputFormat.createInputSplits() method.
>
> Cheers, Fabian
>
> 2015-10-07 14:47 GMT+02:00 Flavio Pompermaier <
> pomperma...@okkam.it>:
>
>> Hi to all,
>>
>> is there a way to split a single local file by line count (e.g. a
>> split every 100 lines) in a LocalEnvironment to speed up a simple map
>> function? For me it is not very clear how the local files (files into
>> directory if recursive=true) are managed by Flink..is there any ref 
>> to this
>> internals?
>>
>> Best,
>> Flavio
>>
>
>


>>>
>>
>

>>


Re: Parallel file read in LocalEnvironment

2015-11-18 Thread Flavio Pompermaier
It was long ago, but if I remember correctly there were about 50k files.
On 18 Nov 2015 19:22, "Stephan Ewen"  wrote:

> Okay, let me take a step back and make sure I understand this right...
>
> With many small files it takes longer to start the job, correct? How much
> time did it actually take and how many files did you have?
>
>
> On Wed, Nov 18, 2015 at 7:18 PM, Flavio Pompermaier 
> wrote:
>
>> in my test I was using the local fs (ext4)
>> On 18 Nov 2015 19:17, "Stephan Ewen"  wrote:
>>
>>> The JobManager does not read all files, but is has to query the HDFS for
>>> all file metadata (size, blocks, block locations), which can take a bit.
>>> There is a separate call to the HDFS Namenode for each file. The more
>>> files, the more metadata has to be collected.
>>>
>>>
>>> On Wed, Nov 18, 2015 at 7:15 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 So why it takes so much to start the job?because in any case the job
 manager has to read all the lines of the input files before generating the
 splits?
 On 18 Nov 2015 17:52, "Stephan Ewen"  wrote:

> Late answer, sorry:
>
> The splits are created in the JobManager, so the sub submission should
> not be affected by that.
>
> The assignment of splits to workers is very fast, so many splits with
> small data is not very different from few splits with large data.
>
> Lines are never materialized and the operators do not work differently
> based on different numbers of splits.
>
> On Wed, Oct 7, 2015 at 4:26 PM, Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> I've tried to split my huge file by lines count (using the bash
>> command split -l) in 2 different ways:
>>
>>1. small lines count (huge number of small files)
>>2. big lines count (small number of big files)
>>
>> I can't understand why the time required to effectively start the job
>> is more or less the same
>>
>>- in 1. it takes a lot to fetch the file list (~50.000) and the
>>split assigner is fast to assign the splits (but also being fast they 
>> are a
>>lot)
>>- in 2. Flink is fast in fetch the file list but it's extremely
>>slow to generate the splits to assign
>>
>> Initially I was thinking that Flink was eagerly materializing the
>> lines somewhere but both the memory and the disks doesn't increase.
>> What is going on underneath? Is it normal?
>>
>> Thanks in advance,
>> Flavio
>>
>>
>>
>> On Wed, Oct 7, 2015 at 3:27 PM, Stephan Ewen 
>> wrote:
>>
>>> The split functionality is in the FileInputFormat and the
>>> functionality that takes care of lines across splits is in the
>>> DelimitedIntputFormat.
>>>
>>> On Wed, Oct 7, 2015 at 3:24 PM, Fabian Hueske 
>>> wrote:
>>>
 I'm sorry there is no such documentation.
 You need to look at the code :-(

 2015-10-07 15:19 GMT+02:00 Flavio Pompermaier >>> >:

> And what is the split policy for the FileInputFormat?it depends on
> the fs block size?
> Is there a pointer to the several flink input formats and a
> description of their internals?
>
> On Wed, Oct 7, 2015 at 3:09 PM, Fabian Hueske 
> wrote:
>
>> Hi Flavio,
>>
>> it is not possible to split by line count because that would mean
>> to read and parse the file just for splitting.
>>
>> Parallel processing of data sources depends on the input splits
>> created by the InputFormat. Local files can be split just like files 
>> in
>> HDFS. Usually, each file corresponds to at least one split but 
>> multiple
>> files could also be put into a single split if necessary.The logic 
>> for that
>> would go into to the InputFormat.createInputSplits() method.
>>
>> Cheers, Fabian
>>
>> 2015-10-07 14:47 GMT+02:00 Flavio Pompermaier <
>> pomperma...@okkam.it>:
>>
>>> Hi to all,
>>>
>>> is there a way to split a single local file by line count (e.g.
>>> a split every 100 lines) in a LocalEnvironment to speed up a simple 
>>> map
>>> function? For me it is not very clear how the local files (files 
>>> into
>>> directory if recursive=true) are managed by Flink..is there any ref 
>>> to this
>>> internals?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
>

>>>
>>
>
>>>
>


Re: Fold vs Reduce in DataStream API

2015-11-18 Thread Kostas Tzoumas
Granted, both are presented with the same example in the docs. They are
modeled after reduce and fold in functional programming. Perhaps we should
provide some more enlightening examples.

On Wed, Nov 18, 2015 at 6:39 PM, Fabian Hueske  wrote:

> Hi Ron,
>
> Have you checked:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#transformations
> ?
>
> Fold is like reduce, except that you define a start element (of a
> different type than the input type) and the result type is the type of the
> initial value. In reduce, the result type must be identical to the input
> type.
>
> Best, Fabian
>
> 2015-11-18 18:32 GMT+01:00 Ron Crocker :
>
>> Is there a succinct description of the distinction between these
>> transforms?
>>
>> Ron
>> —
>> Ron Crocker
>> Principal Engineer & Architect
>> ( ( •)) New Relic
>> rcroc...@newrelic.com
>> M: +1 630 363 8835
>>
>>
>


Compiler Exception

2015-11-18 Thread Truong Duc Kien
Hi,

I'm hitting a CompilerException with some of my data sets, but not all of
them.

Exception in thread "main" org.apache.flink.optimizer.CompilerException: No
plan meeting the requirements could be created @ Bulk Iteration (Bulk
Iteration) (1:null). Most likely reason: Too restrictive plan hints.

Can I have some hints on how to troubleshoot this?

Thanks,
Kien Truong


Re: Flink execution time benchmark.

2015-11-18 Thread Saleh
Hi rmetzger0,

Thanks for the response. I didn't know that I had to register before I could
receive responses to my posts.
Now I am registered, but the problem is not resolved yet. I know it might
not be intuitive to get the execution time of a long-running streaming job, but
it should be possible to get the total execution time after, say, I stop the
program from running. Can I programmatically compute this information? Or can I
retrieve it from the Flink web UI?

cheers.
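
(One possibility, sketched under the assumption that the job does terminate:
env.execute() returns a JobExecutionResult whose getNetRuntime() reports the
job's runtime. The job name below is made up.)

JobExecutionResult result = env.execute("my benchmark job");
// net runtime of the job, in milliseconds, as reported back to the client
long runtimeMillis = result.getNetRuntime();
System.out.println("Job ran for " + runtimeMillis + " ms");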



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-execution-time-benchmark-tp3258p3573.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Specially introduced Flink to Chinese users in CNCC (China National Computer Congress)

2015-11-18 Thread Liang Chen
Two aspects are attracting them:
1. Flink is using Java, so it is easy for most of them to start with Flink, and it
is easier to maintain in comparison to Storm (as Clojure is difficult to
maintain, and fewer people know it).
2. Users really want a unified system supporting streaming and batch
processing.




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Specially-introduced-Flink-to-chinese-users-in-CNCC-China-National-Computer-Congress-tp3254p3574.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Specially introduced Flink to Chinese users in CNCC (China National Computer Congress)

2015-11-18 Thread Welly Tambunan
Agreed,

and the stateful streaming operator instances in Flink look natural compared
to Apache Spark.

On Thu, Nov 19, 2015 at 10:54 AM, Liang Chen 
wrote:

> Two aspects are attracting them:
> 1.Flink is using java, it is easy for most of them to start Flink, and be
> more easy to maintain in comparison to Storm(as Clojure is difficult to
> maintain, and less people know it.)
> 2.Users really want an unified system supporting streaming and batch
> processing.
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Specially-introduced-Flink-to-chinese-users-in-CNCC-China-National-Computer-Congress-tp3254p3574.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>



-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: YARN High Availability

2015-11-18 Thread Till Rohrmann
Hi Gwenhaël,

good to hear that you could resolve the problem.

When you run multiple HA Flink jobs in the same cluster, you don’t
have to adjust the configuration of Flink. It should work out of the box.

However, if you run multiple HA Flink clusters, then you have to set a
distinct ZooKeeper root path for each cluster via the option
recovery.zookeeper.path.root in the Flink configuration. This is necessary
because otherwise all JobManagers (those of the different clusters) will
compete for a single leadership. Furthermore, all TaskManagers would only
see that one leader and connect to it. The reason is that the TaskManagers
look up their leader at a ZNode below the ZooKeeper root path.
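
For example (the paths are just illustrative), each cluster’s flink-conf.yaml
gets its own root:

# first cluster
recovery.zookeeper.path.root: /flink/cluster-one

# second cluster
recovery.zookeeper.path.root: /flink/cluster-two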

If you have other questions then don’t hesitate to ask me.

Cheers,
Till

On Wed, Nov 18, 2015 at 6:37 PM, Gwenhael Pasquiers <
gwenhael.pasqui...@ericsson.com> wrote:

> Nevermind,
>
>
>
> Looking at the logs I saw that it was having issues trying to connect to
> ZK.
>
> To make I short is had the wrong port.
>
>
>
> It is now starting.
>
>
>
> Tomorrow I’ll try to kill some JobManagers *evil*.
>
>
>
> Another question : if I have multiple HA flink jobs, are there some points
> to check in order to be sure that they won’t collide on hdfs or ZK ?
>
>
>
> B.R.
>
>
>
> Gwenhaël PASQUIERS
>
>
>
> *From:* Till Rohrmann [mailto:till.rohrm...@gmail.com]
> *Sent:* mercredi 18 novembre 2015 18:01
> *To:* user@flink.apache.org
> *Subject:* Re: YARN High Availability
>
>
>
> Hi Gwenhaël,
>
>
>
> do you have access to the yarn logs?
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, Nov 18, 2015 at 5:55 PM, Gwenhael Pasquiers <
> gwenhael.pasqui...@ericsson.com> wrote:
>
> Hello,
>
>
>
> We’re trying to set up high availability using an existing zookeeper
> quorum already running in our Cloudera cluster.
>
>
>
> So, as per the doc we’ve changed the max attempt in yarn’s config as well
> as the flink.yaml.
>
>
>
> recovery.mode: zookeeper
>
> recovery.zookeeper.quorum: host1:3181,host2:3181,host3:3181
>
> state.backend: filesystem
>
> state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
>
> recovery.zookeeper.storageDir: hdfs:///flink/recovery/
>
> yarn.application-attempts: 1000
>
>
>
> Everything is ok as long as recovery.mode is commented.
>
> As soon as I uncomment recovery.mode the deployment on yarn is stuck on :
>
>
>
> “Deploying cluster, current state ACCEPTED”.
>
> “Deployment took more than 60 seconds….”
>
> Every second.
>
>
>
> And I have more than enough resources available on my yarn cluster.
>
>
>
> Do you have any idea of what could cause this, and/or what logs I should
> look for in order to understand ?
>
>
>
> B.R.
>
>
>
> Gwenhaël PASQUIERS
>
>
>