Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

2016-09-02 Thread Stefan Richter
Hi,

unfortunately, the log does not contain the information needed for this case. 
It seems that a sender to the SortMerger failed. The best way to track down 
this problem is to look at the exceptions that are reported in the web 
front-end for the failing job. Could you check whether you find any reported 
exceptions there and provide them to us?

Best,
Stefan

> On 01.09.2016 at 11:56, ANDREA SPINA <74...@studenti.unimore.it> wrote:
> 
> Sure. Here you can find the complete log file.
> I still cannot get past the issue. Thank you for your help.
> 
> 2016-08-31 18:15 GMT+02:00 Flavio Pompermaier:
> I don't know whether my usual error is related to this one, but it is very 
> similar and it happens randomly... I still have to figure out the root cause 
> of the error:
> 
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>   at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>   ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: -2
>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>   at java.util.ArrayList.elementData(ArrayList.java:418)
>   at java.util.ArrayList.get(ArrayList.java:431)
>   at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>   at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:219)
>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:245)
>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:255)
>   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>   at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> 
> 
> On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter wrote:
> Hi,
> 
> could you provide the log outputs for your job (ideally with debug logging 
> enabled)?
> 
> Best,
> Stefan
> 
>> On 31.08.2016 at 14:40, ANDREA SPINA <74...@studenti.unimore.it> wrote:
>> 
>> Hi everyone.
>> I'm running the FlinkML ALS matrix factorization and I bumped into the 
>> following exception:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>  at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>  at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>  at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>  at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>  at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
>>  at org.apache.flink.ml.common.FlinkMLTools$.persist(FlinkMLTools.scala:94)
>>  at

Re: Firing windows multiple times

2016-09-02 Thread Aljoscha Krettek
I see. I didn't forget about this, it's just that I'm thinking hard.

I think in your case (which I imagine some other people also have) we
would need an addition to the windowing system that the original Google
Dataflow paper called retractions. The problem is best explained with an
example. Say you have this program:

DataStream input = ...

DataStream firstAggregate = input
  .keyBy(...)
  .window(TumblingTimeWindow(1 Day))
  .trigger(EventTime.afterEndOfWindow()
    .withEarlyTrigger(Repeatedly.forever(ProcessingTime.afterFirstElement(Time.seconds(30)))))
  .reduce(new SomeAggregate())

DataStream secondAggregate = firstAggregate
  .keyBy(...)
  .window(TumblingTimeWindow(5 Days))
  .trigger(EventTime.afterEndOfWindow()
    .withEarlyTrigger(Repeatedly.forever(ProcessingTime.afterFirstElement(Time.seconds(30)))))
  .reduce(new SomeAggregate())

The problem here is that the second windowing operation sees all the
incremental early-firing updates from the first window operation, so it
would over-count. This problem could be overcome by introducing metadata in
the windowing system and filtering out those results that indicate that
they come from an early (speculative) firing. A second problem is that of
late firings, i.e. if you have a window specification like this:

DataStream firstAggregate = input
  .keyBy(...)
  .window(TumblingTimeWindow(1 Day))
  .allowedLateness(1 Hour)
  .trigger(
    EventTime.afterEndOfWindow()
      .withEarlyTrigger(Repeatedly.forever(ProcessingTime.afterFirstElement(Time.seconds(30))))
      .withLateTrigger(Repeatedly.forever(ProcessingTime.afterFirstElement(Time.seconds(30)))))
  .reduce(new SomeAggregate())

where you also have late firings for data that arrives after the primary
firing, which happens when the watermark passes the end of the window. That's
where retractions come into play: before sending data downstream from a late
firing, the window operator has to send the inverse of the previous firing so
that the downstream operation can "subtract" that from the current aggregate
and replace it with the newly updated aggregate. This is a somewhat thorny
problem, though, and to the best of my knowledge Google never implemented
this in the publicly available Dataflow SDK or what is now Beam.
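
The retraction idea can be sketched in a few lines of plain Java. This is only an illustration under simplifying assumptions, not a Flink or Beam API: `RetractingSum`, `onFiring` and the string window ids are made-up names, and here the downstream side remembers the previous firing itself, whereas in the scheme described above the window operator would send the inverse explicitly.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical downstream aggregate that tolerates repeated firings of the same upstream window. */
final class RetractingSum {
    // Last value each upstream window emitted, keyed by an illustrative window id.
    private final Map<String, Long> lastFiring = new HashMap<>();
    private long total = 0;

    /** Naive handling: every firing is simply added, so repeated firings are over-counted. */
    static long naiveTotal(long[] firings) {
        long sum = 0;
        for (long f : firings) {
            sum += f;
        }
        return sum;
    }

    /** Retraction handling: subtract the previous firing of this window, then add the new one. */
    long onFiring(String windowId, long newValue) {
        Long previous = lastFiring.put(windowId, newValue);
        if (previous != null) {
            total -= previous; // the "inverse" of the earlier firing
        }
        total += newValue;
        return total;
    }

    long total() {
        return total;
    }

    public static void main(String[] args) {
        RetractingSum sum = new RetractingSum();
        sum.onFiring("day-1", 10); // early (speculative) firing
        sum.onFiring("day-1", 25); // on-time firing replaces the early one
        sum.onFiring("day-2", 5);
        sum.onFiring("day-1", 30); // late firing replaces it again
        System.out.println("with retractions: " + sum.total());
        System.out.println("naive: " + naiveTotal(new long[] {10, 25, 5, 30}));
    }
}
```

With retractions the downstream total only ever reflects the latest firing of each upstream window, while the naive sum counts every firing.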

The reason why I'm thinking in this direction and not in the direction of
keeping track of the watermark and manually evicting elements as you go is
that I think that this approach would be more memory efficient and easier
to understand. I don't understand yet how a single window computation could
keep track of aggregates for differently sized time windows and evict the
correct elements without keeping all the elements in some store. Maybe you
could shed some light on this? I'd be happy if there was a simple solution
for this. :-)

Cheers,
Aljoscha



On Tue, 30 Aug 2016 at 23:49 Shannon Carey  wrote:

> I appreciate your suggestion!
>
> However, the main problem with your approach is the amount of time that
> goes by without an updated value from minuteAggregate and hourlyAggregate
> (lack of a continuously updated aggregate).
>
> For example, if we use a tumbling window of 1 month duration, then we only
> get an update for that value once a month! The values from that stream will
> be on average 0.5 months stale. A year-long window is even worse.
>
> -Shannon
>
> From: Aljoscha Krettek 
> Date: Tuesday, August 30, 2016 at 9:08 AM
> To: Shannon Carey , "user@flink.apache.org" <
> user@flink.apache.org>
>
> Subject: Re: Firing windows multiple times
>
> Hi,
> I think this can be neatly expressed by using something like a tree of
> windowed aggregations, i.e. you specify your smallest window computation
> first and then specify larger window computations based on the smaller
> windows. I've written an example that showcases this approach:
> https://gist.github.com/aljoscha/728ac69361f75c3ca87053b1a6f91fcd
>
> The basic idea in pseudo code is this:
>
> DataStream input = ...
> dailyAggregate = input.keyBy(...).window(Time.days(1)).reduce(new Sum())
> weeklyAggregate =
> dailyAggregate.keyBy(...).window(Time.days(7)).reduce(new Sum())
> monthlyAggregate =
> weeklyAggregate.keyBy(...).window(Time.days(30)).reduce(new Sum())
>
> The benefit of this approach is that you don't duplicate computation and
> that you can have incremental aggregation using a reduce function. When
> manually keeping elements and evicting them based on time, the amount of
> state that would have to be kept would be much larger.
>
> Does that make sense and would it help your use case?
>
> Cheers,
> Aljoscha
>
> On Mon, 29 Aug 2016 at 23:18 Shannon Carey  wrote:
>
>> Yes, let me describe an example use-case that I'm trying to implement
>> efficiently within Flink.
>>
>> We've been asked to aggregate per-user data on a daily level, and from
>> there produce aggregates on a variety of time frames. For example, 7 days,
>> 30 days, 180 days, and 365 days.
>>
>> We can talk about the hardest one, the 365 day window, with the knowledge
>> that adding the other time window

Storing JPMML-Model Object as a Variable Closure?

2016-09-02 Thread Bauss, Julian
Hello Everybody,

I’m currently refactoring some code and am looking for a better way to handle
JPMML models in data streams. At the moment the Flink job I’m working on
references a model object as a singleton, which I want to change because
static references tend to cause problems in distributed systems.

I thought about handing the model object to the function that uses it as a
variable closure. The object can be between 16 MB and 250 MB in size
(depending on the depth of the decision tree).

According to
https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
that’s way too large, though.
Are there any viable alternatives, or would this be the "right way" to handle
this situation?

Best Regards,

Julian


**

bonprix Handelsgesellschaft mbH
Registered office: Hamburg

Management:
Dr. Marcus Ackermann (Chairman)
Dr. Kai Heck
Rien Jansen
Markus Fuchshofen
Chairman of the Advisory Board: Alexander Birken

Commercial register: AG Hamburg HR B 36 455

Address:
bonprix Handelsgesellschaft mbH
Haldesdorfer Str. 61
22179 Hamburg

This e-mail may contain confidential and/or privileged information.
If you are not the intended recipient (or have received the e-mail in error)
please notify the sender immediately and delete this e-mail. Any unauthorized 
copying, disclosure or distribution of the material in this e-mail is strictly
forbidden.

**



Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

2016-09-02 Thread ANDREA SPINA
Hi Stefan,
Thank you so much for the answer. OK, I'll do it as soon as possible.
For the sake of argument, could the issue be related to the low number of
blocks? I noticed that the Flink implementation, by default, sets the number
of blocks to the input count (which is actually a lot). So with a low
cardinality and big blocks, maybe they don't fit somewhere...
Thank you again.

Andrea


checkpoints not removed on hdfs.

2016-09-02 Thread Dong-iL, Kim
Hi,

I’m using HDFS as the state backend.
The checkpoints folder keeps growing all the time.
What should I do?

Regards.

Apache Flink: How does it handle the backpressure?

2016-09-02 Thread jiecxy
If an operator's input stream is faster than its output stream, its input
buffer will block the output thread of the previous operator that transfers
the data to this operator. Is that right?

Do Flink and Spark both handle backpressure by blocking the thread? If so,
what is the difference between them?

A data source produces data continuously. What happens if its output thread
is blocked? Would the buffer overflow?
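
The blocking behaviour described in the first question can be pictured with a plain bounded queue. The sketch below is only an illustration in standard Java; `BoundedBufferSketch` and `produce` are made-up names, and this is not Flink's (or Spark's) actual network code. A timed `offer` stands in for a blocking `put` so that the example terminates.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative producer writing into a fixed-size buffer that nobody drains fast enough. */
final class BoundedBufferSketch {

    /**
     * Tries to hand `count` records to the buffer and returns how many were accepted.
     * A full buffer makes the producer wait; it never grows beyond its capacity.
     */
    static int produce(BlockingQueue<Integer> buffer, int count, long timeoutMillis) {
        int accepted = 0;
        try {
            for (int i = 0; i < count; i++) {
                // Waits up to the timeout for free space instead of overflowing.
                if (!buffer.offer(i, timeoutMillis, TimeUnit.MILLISECONDS)) {
                    break;
                }
                accepted++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(4);
        System.out.println("accepted: " + produce(buffer, 10, 50));
        buffer.poll(); // the slow consumer takes two records
        buffer.poll();
        System.out.println("accepted after draining two: " + produce(buffer, 10, 50));
    }
}
```

In this toy setting the buffer cannot overflow: once it is full, the producer is held back until the consumer frees a slot.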





Remote upload and execute

2016-09-02 Thread Paul Wilson
Hi,

I'd like to write a client that can execute an already 'uploaded' JAR (i.e.
the JAR is deployed and made available by some other external process). This
is similar to what the web console allows, which consists of 2 steps: upload
the JAR, followed by a submit with parameters.

I'm looking at the Flink client; however, ClusterClient appears to require
a PackagedProgram or local access to the required JAR. I do not
want to have to re-upload the JAR each time (I don't even want the client
to have access to the JAR).

Is there some way to specify that the JAR is available at some filesystem
(s3) location, have it cached more locally in Flink, and then trigger a
parameterised execution of it from a client?

Regards,
Paul


[DISCUSS] Storm 1.x.x support in the compatibility layer

2016-09-02 Thread Maximilian Michels
This should be of concern mostly to the users of the Storm compatibility layer:

We just received a pull request [1] for updating the Storm
compatibility layer to support Storm versions >= 1.0.0. This is a
major change because all Storm imports have changed their namespace
due to package renaming.

If we merged this pull request we would have two modules, one for
Storm < 1.0.0 and one for Storm >= 1.0.0. This would require a lot of
duplicate work for the Flink community.

Thus, I would suggest removing the Storm compatibility for < 1.0.0
and focusing on supporting 1.x.x versions of Storm.

What do you think? In particular, what do Storm users think about this?

Cheers,
Max

[1] https://github.com/apache/flink/pull/2452


Re: Storing JPMML-Model Object as a Variable Closure?

2016-09-02 Thread Stephan Ewen
How about using a source and a broadcast variable?

You could write the model to storage (DFS), then read it with a source
and use a broadcast variable to send it to all tasks.
A single record can be very large, so it should work even if your model is
quite big.

Does that sound feasible?

In future versions of Flink, you may be able to skip the "write to DFS"
step and simply have the model in a collection source (when large RPC
messages are supported).

Best,
Stephan





Re: Flink Iterations vs. While loop

2016-09-02 Thread Greg Hogan
Hi Dan,

Where are you reading the 200 GB "data" from? How much memory per node? If
the DataSet is read from a distributed filesystem and if with iterations
Flink must spill to disk then I wouldn't expect much difference. About how
many iterations are run in the 30 minutes? I don't know that this is
reported explicitly, but if your convergence function only has one input
record per iteration then the reported total is the iteration count.

One other thought, we should soon have support for object reuse with arrays
(FLINK-3695). This would be implemented as DoubleValueArray or
ValueArray rather than double[] but it would be interesting to
test for a change in performance.

Greg

On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes 
wrote:

> Hi,
>
> for my bachelor thesis I'm testing an implementation of the L-BFGS algorithm
> with Flink Iterations against a version without Flink Iterations that uses a
> plain while loop instead. Both programs use the same Map and Reduce
> transformations in each iteration. It was expected that the performance of
> the Flink Iterations version would scale better with increasing size of the
> input data set. However, the measured results on an ibm-power-cluster are
> very similar for both versions, e.g. around 30 minutes for 200 GB of data.
> The cluster has 8 nodes, was configured with 4 slots per node, and I used a
> total parallelism of 32.
> In every iteration of the while loop a new Flink job is started, and I
> thought that the data would also be distributed over the network again in
> each iteration, which should consume a significant and measurable amount of
> time. Is that thought wrong, or what is the computational overhead of the
> Flink iterations that cancels out this disadvantage?
> I include the relevant part of both programs and also attach the generated
> execution plans.
> Thank you for any ideas, as I could not find much about this issue in the
> Flink docs.
>
> Best, Dan
>
> *Flink Iterations:*
>
> DataSet data = ...
>
> State state = initialState(m, initweights, 0, new double[initweights.length]);
> DataSet<State> statedataset = env.fromElements(state);
>
> // start of iteration section
> IterativeDataSet<State> loop = statedataset.iterate(niter);
>
> DataSet<State> statewithnewlossgradient = data
>   .map(difffunction).withBroadcastSet(loop, "state")
>   .reduce(accumulate)
>   .map(new NormLossGradient(datasize))
>   .map(new SetLossGradient()).withBroadcastSet(loop, "state")
>   .map(new LBFGS());
>
> // termination criterion: the iteration stops once this data set is empty
> DataSet<State> converged = statewithnewlossgradient.filter(
>   new FilterFunction<State>() {
>     @Override
>     public boolean filter(State value) throws Exception {
>       // keep the state (and keep iterating) while iflag is non-zero
>       return value.getIflag()[0] != 0;
>     }
>   }
> );
>
> DataSet<State> finalstate =
>   loop.closeWith(statewithnewlossgradient, converged);
>
>
> *While loop:*
>
> DataSet data = ...
> State state = initialState(m, initweights, 0, new double[initweights.length]);
> int cnt = 0;
> do {
>   LBFGS lbfgs = new LBFGS();
>   statedataset = data
>     .map(difffunction).withBroadcastSet(statedataset, "state")
>     .reduce(accumulate)
>     .map(new NormLossGradient(datasize))
>     .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
>     .map(lbfgs);
>   cnt++;
> } while (cnt
>
>


Re: How to get latency info from benchmark

2016-09-02 Thread Eric Fukuda
Hi Robert,

I've been trying to build the "performance" project using various versions
of Flink, without success. It seems that I need both the
KafkaZKStringSerializer class and the FlinkKafkaConsumer082 class to build
the project, but none of the branches has both of them.
KafkaZKStringSerializer existed in the 0.9.0-x branches but was deleted in the
0.9.1-x branches, and FlinkKafkaConsumer082 goes the other way, so they never
exist in the same branch. I'm guessing you were using a snapshot somewhere
between 0.9.0 and 0.9.1. Could you tell me the SHA you were using?

Regards,
Eric


On Wed, Aug 24, 2016 at 4:57 PM, Robert Metzger  wrote:

> Hi,
>
> Version 0.10-SNAPSHOT is pretty old. The snapshot repository of Apache
> probably doesn't keep old artifacts around forever.
> Maybe you can migrate the tests to Flink 0.10.0, or maybe even to a higher
> version.
>
> Regards,
> Robert
>
> On Wed, Aug 24, 2016 at 10:32 PM, Eric Fukuda 
> wrote:
>
>> Hi Max, Robert,
>>
>> Thanks for the advice. I'm trying to build the "performance" project, but
>> failing with the following error. Is there a solution for this?
>>
>> [ERROR] Failed to execute goal on project streaming-state-demo: Could not
>> resolve dependencies for project
>> com.dataartisans.flink:streaming-state-demo:jar:1.0-SNAPSHOT: Failure to find
>> org.apache.flink:flink-connector-kafka-083:jar:0.10-SNAPSHOT in
>> https://repository.apache.org/content/repositories/snapshots/ was cached
>> in the local repository, resolution will not be reattempted until the
>> update interval of apache.snapshots has elapsed or updates are forced ->
>> [Help 1]
>>
>>
>>
>>
>> On Wed, Aug 24, 2016 at 8:12 AM, Robert Metzger 
>> wrote:
>>
>>> Hi Eric,
>>>
>>> Max is right, the tool has been used for a different benchmark [1]. The
>>> throughput logger that should produce the right output is this one [2].
>>> Very recently, I've opened a pull request for adding metric-measuring
>>> support into the engine [3]. Maybe that's helpful for your experiments.
>>>
>>>
>>> [1] http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>> [2] https://github.com/dataArtisans/performance/blob/master/flink-jobs/src/main/java/com/github/projectflink/streaming/Throughput.java#L203
>>> [3] https://github.com/apache/flink/pull/2386
>>>
>>>
>>>
>>> On Wed, Aug 24, 2016 at 2:04 PM, Maximilian Michels 
>>> wrote:
>>>
>>>> I believe the AnalyzeTool is for processing logs of a different
>>>> benchmark.
>>>>
>>>> CC Jamie and Robert who worked on the benchmark.
>>>>
>>>> On Wed, Aug 24, 2016 at 3:25 AM, Eric Fukuda
>>>> wrote:
>>>> > Hi,
>>>> >
>>>> > I'm trying to benchmark Flink without Kafka as mentioned in this post
>>>> > (http://data-artisans.com/extending-the-yahoo-streaming-benchmark/).
>>>> > After running flink.benchmark.state.AdvertisingTopologyFlinkState with
>>>> > user.local.event.generator in localConf.yaml set to 1, I ran
>>>> > flink.benchmark.utils.AnalyzeTool giving
>>>> > flink-1.0.1/log/flink-[username]-jobmanager-0-[servername].log as a
>>>> > command-line argument. I got the following output and it does not have
>>>> > the information about the latency.
>>>> >
>>>> >
>>>> > = Latency (0 reports ) =
>>>> > = Throughput (1 reports ) =
>>>> > == null (entries: 10150)===
>>>> > Mean throughput 639078.5018497099
>>>> > Exception in thread "main" java.lang.IndexOutOfBoundsException: toIndex = 2
>>>> > at java.util.ArrayList.subListRangeCheck(ArrayList.java:962)
>>>> > at java.util.ArrayList.subList(ArrayList.java:954)
>>>> > at flink.benchmark.utils.AnalyzeTool.main(AnalyzeTool.java:133)
>>>> >
>>>> >
>>>> > Reading the code in AnalyzeTool.java, I found that it's looking for
>>>> > lines that include "Latency" in the log file, but apparently it's not
>>>> > finding any. I tried grepping the log file, and couldn't find any
>>>> > either. I have one server that runs both JobManager and Task Manager
>>>> > and another server that runs Redis, and they are connected through a
>>>> > network with each other.
>>>> >
>>>> > I think I have to do something to read the data stored in Redis before
>>>> > running AnalyzeTool, but can't figure out what. Does anyone know how
>>>> > to get the latency information?
>>>> >
>>>> > Thanks,
>>>> > Eric

>>>
>>>
>>
>


Re: Flink Iterations vs. While loop

2016-09-02 Thread Dan Drewes

Hi Greg,

thanks for your response!

I just had a look and realized that it's only about 85 GB of data. Sorry 
about the wrong information.


It's read from a CSV file on the master node's local file system. The 8 
nodes have more than 40 GB of available memory each, and since the data is 
equally distributed I assume there should be no need to spill anything 
to disk.


There are 9 iterations.

Is it possible that the data is repeatedly distributed with Flink Iterations 
as well? Or the other way around: might it be that Flink somehow "remembers" 
that the data is already distributed, even for the while loop?


-Dan



Am 02.09.2016 um 16:39 schrieb Greg Hogan:

Hi Dan,

Where are you reading the 200 GB "data" from? How much memory per 
node? If the DataSet is read from a distributed filesystem and if with 
iterations Flink must spill to disk then I wouldn't expect much 
difference. About how many iterations are run in the 30 minutes? I 
don't know that this is reported explicitly, but if your convergence 
function only has one input record per iteration then the reported 
total is the iteration count.


One other thought, we should soon have support for object reuse with 
arrays (FLINK-3695). This would be implemented as DoubleValueArray or 
ValueArray rather than double[] but it would be 
interesting to test for a change in performance.


Greg

On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes > wrote:


Hi,

for my bachelor thesis I'm testing an implementation of L-BFGS
algorithm with Flink Iterations against a version without Flink
Iterations but a casual while loop instead. Both programs use the
same Map and Reduce transformations in each iteration. It was
expected, that the performance of the Flink Iterations would scale
better with increasing size of the input data set. However, the
measured results on an ibm-power-cluster are very similar for both
versions, e.g. around 30 minutes for 200 GB data. The cluster has
8 nodes, was configured with 4 slots per node and I used a total
parallelism of 32.
In every Iteration of the while loop a new flink job is started
and I thought, that also the data would be distributed over the
network again in each iteration which should consume a significant
and measurable amount of time. Is that thought wrong or what is
the computional overhead of the flink iterations that is
equalizing this disadvantage?
I include the relevant part of both programs and also attach the
generated execution plans.
Thank you for any ideas as I could not find much about this issue
in the flink docs.

Best, Dan

*Flink Iterations:*

DataSet data = ...

State state = initialState(m, initweights, 0, new double[initweights.length]);
DataSet<State> statedataset = env.fromElements(state);

// start of iteration section
IterativeDataSet<State> loop = statedataset.iterate(niter);

DataSet<State> statewithnewlossgradient =
    data.map(difffunction).withBroadcastSet(loop, "state")
        .reduce(accumulate)
        .map(new NormLossGradient(datasize))
        .map(new SetLossGradient()).withBroadcastSet(loop, "state")
        .map(new LBFGS());

// termination criterion: the iteration stops once this DataSet is empty
DataSet<State> converged = statewithnewlossgradient.filter(
    new FilterFunction<State>() {
        @Override
        public boolean filter(State value) throws Exception {
            // keep the state (and keep iterating) unless iflag[0] == 0
            return value.getIflag()[0] != 0;
        }
    }
);

DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);
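
For reference, closeWith(result, terminationCriterion) ends a bulk iteration either after niter supersteps or as soon as the criterion DataSet is empty. Below is a minimal plain-Java sketch of that control flow only, with no Flink dependency. The State class, its iflag field and the toy step function are hypothetical stand-ins for the code above, not the real implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class BulkIterationSketch {
    /** Hypothetical stand-in for the State class used in the message above. */
    public static final class State {
        public final int iteration;
        public final int iflag; // 0 means "converged", mirroring the filter above

        public State(int iteration, int iflag) {
            this.iteration = iteration;
            this.iflag = iflag;
        }
    }

    /** Applies step up to maxIterations times; stops early once the criterion set is empty. */
    public static State iterate(State initial, int maxIterations, UnaryOperator<State> step) {
        State current = initial;
        for (int i = 0; i < maxIterations; i++) {
            current = step.apply(current);
            // Same role as the filter: the record survives only while not converged.
            List<State> criterion = new ArrayList<>();
            if (current.iflag != 0) {
                criterion.add(current);
            }
            if (criterion.isEmpty()) {
                break; // an empty termination criterion ends the iteration
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // Toy step function (assumption): reports convergence after the third superstep.
        UnaryOperator<State> step =
            s -> new State(s.iteration + 1, s.iteration + 1 >= 3 ? 0 : 1);
        State result = iterate(new State(0, 1), 10, step);
        System.out.println("supersteps run: " + result.iteration);
    }
}
```

The sketch says nothing about where the input data lives between supersteps; it only shows when the loop stops.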

*While loop:*

DataSet data = ...
State state = initialState(m, initweights, 0, new double[initweights.length]);

int cnt = 0;
do {
    LBFGS lbfgs = new LBFGS();

    statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
        .reduce(accumulate)
        .map(new NormLossGradient(datasize))
        .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
        .map(lbfgs);
    cnt++;
}while (cnt

emit a single Map per window

2016-09-02 Thread Luis Mariano Guerra
hi!

I'm trying to collect some metrics by key per window and emit the full
result at the end of the window to kafka. I started with a simple count by
key to test it, but my requirements are a little more complex than that.

what I want to do is to fold the stream events as they come and then at the
end of the window merge them together and emit a single result. I don't want
to accumulate all the events and calculate at the end of the window. from
my understanding of fold in other languages/libraries, this would be what I
need, using it via apply(stateIn, foldFun, windowFun), but it's not working:

the basic is:

input
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.of(5, TimeUnit.SECONDS))
.apply(new HashMap(), foldFunction,
winFunction);

where foldFunction accumulates by key and winFunction iterates over the
hash maps, merges them into a single result hash map and emits that one at
the end.

this emits many one-key hash maps instead of only one with all the keys. I
tried setting setParallelism(1) in multiple places but it still doesn't work.
More confusingly, in one run it emitted a single map, but after I ran it
again it went back to the previous behavior.

what am I doing wrong? is there any other approach?

I can provide the implementation of foldFunction and winFunction if
required but I think it doesn't change much.

Reading the source code I see:

Applies the given window function to each window. The window function
is called for each evaluation of the window for each key individually. The
output of the window function is interpreted as a regular non-windowed
stream.

emphasis on "for each key individually". the return type of apply is
SingleOutputStreamOperator, which doesn't provide many operations to group
the emitted values.

thanks in advance.


Re: How to get latency info from benchmark

2016-09-02 Thread Robert Metzger
Hi Eric,

I'm sorry that you are running into these issues. I think the version is
0.10-SNAPSHOT, and I think I've used this commit:
https://github.com/rmetzger/flink/commit/547e749 for some of the runs (of
the throughput / latency tests, not for the yahoo benchmark). The commit
should at least point to the right point in time.
Note that these benchmarks are pretty old by now, and the performance
characteristics have probably changed in Flink 1.1 because we've put a lot
of effort into optimizing Flink for common streaming use cases.

Regards,
Robert


On Fri, Sep 2, 2016 at 5:09 PM, Eric Fukuda  wrote:

> Hi Robert,
>
> I've been trying to build the "performance" project using various versions
> of Flink, but failing. It seems that I need both KafkaZKStringSerializer
> class and FlinkKafkaConsumer082 class to build the project, but none of the
> branches has both of them. KafkaZKStringSerializer existed in 0.9.0-x
> branches but deleted in 0.9.1-x branches, and FlinkKafkaConsumer082 goes
> the other way, therefore they don't exist in a same branch. I'm guessing
> you were using a snapshot somewhere between 0.9.0 and 0.9.1. Could you tell
> me the SHA you were using?
>
> Regards,
> Eric
>
>
> On Wed, Aug 24, 2016 at 4:57 PM, Robert Metzger 
> wrote:
>
>> Hi,
>>
>> Version 0.10-SNAPSHOT is pretty old. The snapshot repository of Apache
>> probably doesn't keep old artifacts around forever.
>> Maybe you can migrate the tests to Flink 0.10.0, or maybe even to a
>> higher version.
>>
>> Regards,
>> Robert
>>
>> On Wed, Aug 24, 2016 at 10:32 PM, Eric Fukuda 
>> wrote:
>>
>>> Hi Max, Robert,
>>>
>>> Thanks for the advice. I'm trying to build the "performance" project,
>>> but failing with the following error. Is there a solution for this?
>>>
>>> [ERROR] Failed to execute goal on project streaming-state-demo: Could
>>> not resolve dependencies for project com.dataartisans.flink:streami
>>> ng-state-demo:jar:1.0-SNAPSHOT: Failure to find
>>> org.apache.flink:flink-connector-kafka-083:jar:0.10-SNAPSHOT in
>>> https://repository.apache.org/content/repositories/snapshots/ was
>>> cached in the local repository, resolution will not be reattempted until
>>> the update interval of apache.snapshots has elapsed or updates are forced
>>> -> [Help 1]
>>>
>>>
>>>
>>>
>>> On Wed, Aug 24, 2016 at 8:12 AM, Robert Metzger 
>>> wrote:
>>>
>>>> Hi Eric,
>>>>
>>>> Max is right, the tool has been used for a different benchmark [1]. The
>>>> throughput logger that should produce the right output is this one [2].
>>>> Very recently, I've opened a pull request for adding metric-measuring
>>>> support into the engine [3]. Maybe that's helpful for your experiments.
>>>>
>>>>
>>>> [1] http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>>> [2] https://github.com/dataArtisans/performance/blob/master/flink-jobs/src/main/java/com/github/projectflink/streaming/Throughput.java#L203
>>>> [3] https://github.com/apache/flink/pull/2386
>>>>
>>>>
>>>>
>>>> On Wed, Aug 24, 2016 at 2:04 PM, Maximilian Michels
>>>> wrote:
>>>>
>>>>> I believe the AnalyzeTool is for processing logs of a different
>>>>> benchmark.
>>>>>
>>>>> CC Jamie and Robert who worked on the benchmark.
>>>>>
>>>>> On Wed, Aug 24, 2016 at 3:25 AM, Eric Fukuda
>>>>> wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I'm trying to benchmark Flink without Kafka as mentioned in this post
>>>>> > (http://data-artisans.com/extending-the-yahoo-streaming-benchmark/). After
>>>>> > running flink.benchmark.state.AdvertisingTopologyFlinkState with
>>>>> > user.local.event.generator in localConf.yaml set to 1, I ran
>>>>> > flink.benchmark.utils.AnalyzeTool giving
>>>>> > flink-1.0.1/log/flink-[username]-jobmanager-0-[servername].log as a
>>>>> > command-line argument. I got the following output and it does not have the
>>>>> > information about the latency.
>>>>> >
>>>>> >
>>>>> > = Latency (0 reports ) =
>>>>> > = Throughput (1 reports ) =
>>>>> > == null (entries: 10150)===
>>>>> > Mean throughput 639078.5018497099
>>>>> > Exception in thread "main" java.lang.IndexOutOfBoundsException: toIndex = 2
>>>>> > at java.util.ArrayList.subListRangeCheck(ArrayList.java:962)
>>>>> > at java.util.ArrayList.subList(ArrayList.java:954)
>>>>> > at flink.benchmark.utils.AnalyzeTool.main(AnalyzeTool.java:133)
>>>>> >
>>>>> >
>>>>> > Reading the code in AnalyzeTool.java, I found that it's looking for lines
>>>>> > that include "Latency" in the log file, but apparently it's not finding any.
>>>>> > I tried grepping the log file, and couldn't find any either. I have one
>>>>> > server that runs both JobManager and Task Manager and another server that
>>>>> > runs Redis, and they are connected through a network with each other.
>>>>> >
>>>>> > I think I have to do something to read the data stored in Re

Re: emit a single Map per window

2016-09-02 Thread Aljoscha Krettek
Hi,
from this I would expect to get as many HashMaps as you have keys. The
winFunction is also executed per-key so it cannot combine the HashMaps of
all keys.

Does this describe the behavior that you're seeing?

Cheers,
Aljoscha



Re: emit a single Map per window

2016-09-02 Thread Luis Mariano Guerra
On Fri, Sep 2, 2016 at 5:24 PM, Aljoscha Krettek 
wrote:

> Hi,
> from this I would expect to get as many HashMaps as you have keys. The
> winFunction is also executed per-key so it cannot combine the HashMaps of
> all keys.
>
> Does this describe the behavior that you're seeing?
>

yes, it's the behaviour I'm seeing, I'm looking for a way to merge those
HashMaps from the same window into a single one, I can't find how.
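
One possible direction, sketched here in plain Java without any Flink dependency: since the keyed window yields one partial map per key, a second, non-keyed step over the same window could merge those partials into one map. In Flink that second step would presumably be a non-keyed window such as timeWindowAll after the keyed one, but that is an assumption and untested here. The class and method names below are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowMergeSketch {
    /** Step 1: what the keyed fold produces, one single-key map per key (a simple count). */
    public static Map<String, Long> foldOneKey(String key, List<String> events) {
        Map<String, Long> partial = new HashMap<>();
        for (String ignored : events) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    /** Step 2: non-keyed merge of all partial maps that belong to the same window. */
    public static Map<String, Long> mergeWindow(List<Map<String, Long>> partials) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> merged.merge(k, v, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> a = foldOneKey("a", Arrays.asList("a", "a"));
        Map<String, Long> b = foldOneKey("b", Arrays.asList("b"));
        // One map containing both keys, instead of two one-key maps.
        System.out.println(mergeWindow(Arrays.asList(a, b)));
    }
}
```

The merge step sees every key, so it cannot run per key; in a distributed setting it would be a single, non-parallel operator.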




Re: Firing windows multiple times

2016-09-02 Thread Shannon Carey
Of course! I really appreciate your interest & attention. I hope we will figure 
out solutions that other people can use.

I agree with your analysis. Your triggering syntax is particularly nice. I 
wrote a custom trigger which does exactly that but without the nice fluent API. 
As I considered the approach you mentioned, it was clear that I would not be 
able to easily solve the problem of multiple windows with early-firing events 
causing over-counting. Modifying the windowing system as you describe would be 
helpful. Events could either be filtered out, as you describe, or perhaps the 
windows themselves could be muted/un-muted depending on whether they are the 
closest window (by end time) to the current watermark.

I'm not clear on the purpose of the late firing you describe. I believe that 
was added in Flink 1.1 and it's a new concept to me. I thought late events were 
completely handled by decisions made in the watermark & timestamp assigner. 
Does this feature allow events after the watermark to still be incorporated 
into windows that have already been closed by a watermark? Perhaps it's 
intended to allow window-specific lateness allowance, rather than the 
stream-global watermarker? That does sound problematic. I assume there's a 
reason for closing the window before the allowed lateness has elapsed? 
Otherwise, the window (trigger, really) could just add the lateness to the 
watermark and pretend that the watermark hadn't been reached until the lateness 
had already passed.

I agree that your idea is potentially a lot better than the approach I 
described, if it can be implemented! You are right that the approach I 
described requires that all the events be retained in the window state so that 
aggregation can be done repeatedly from the raw events as new events come in 
and old events are evicted. In practice, we are currently writing the first 
aggregations (day-level) to an external database and then querying that 
time-series from the second-level (year) aggregation so that we don't actually 
need to keep all that data around in Flink state. Obviously, that approach can 
have an impact on the processing guarantees when a failure/recovery occurs if 
we don't do it carefully. Also, we're not particularly sophisticated yet with 
regard to avoiding unnecessary queries to the time series data.

-Shannon


From: Aljoscha Krettek <aljos...@apache.org>
Date: Friday, September 2, 2016 at 4:02 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

I see, I didn't forget about this, it's just that I'm thinking hard.

I think in your case (which I imagine some other people to also have) we would 
need an addition to the windowing system that the original Google Dataflow 
paper called retractions. The problem is best explained with an example. Say 
you have this program:

DataStream input = ...

DataStream firstAggregate = input
  .keyBy(...)
  .window(TumblingTimeWindow(1 Day))
  .trigger(EventTime.afterEndOfWindow().withEarlyTrigger(Repeatedly.forever(ProcessingTime.afterFirstElement(Time.seconds(30)))))
  .reduce(new SomeAggregate())

DataStream secondAggregate = firstAggregate
  .keyBy(...)
  .window(TumblingTimeWindow(5 Days))
  .trigger(EventTime.afterEndOfWindow().withEarlyTrigger(Repeatedly.forever(ProcessingTime.afterFirstElement(Time.seconds(30)))))
  .reduce(new SomeAggregate())

The problem here is that the second windowing operation sees all the 
incremental early-firing updates from the first window operation, so it would 
over-count. This problem could be overcome by introducing metadata in the 
windowing system and filtering out those results that indicate that they come 
from an early (speculative) firing. A second problem is that of late firings, 
i.e. if you have a window specification like this:

DataStream firstAggregate = input
  .keyBy(...)
  .window(TumblingTimeWindow(1 Day))
  .allowedLateness(1 Hour)
  .trigger(
    EventTime.afterEndOfWindow()
      .withEarlyTrigger(Repeatedly.forever(ProcessingTime.afterFirstElement(Time.seconds(30))))
      .withLateTrigger(Repeatedly.forever(ProcessingTime.afterFirstElement(Time.seconds(30)))))
  .reduce(new SomeAggregate())

where you also have late-firing data after you got the primary firing when the 
watermark passed the end of the window. That's where retractions come into play: 
before sending data downstream from a late firing, the window operator has to 
send the inverse of the previous firing so that the downstream operation can 
"subtract" that from the current aggregate and replace it with the newly 
updated aggregate. This is a somewhat thorny problem, though, and to the best 
of my knowledge Google never implemented this in the publicly available 
Dataflow SDK or what is now Beam.
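
To make the retraction idea concrete, here is a small plain-Java sketch. It is not Flink or Beam API, and all names in it are hypothetical. The downstream sum first subtracts the value previously emitted for an upstream window and then adds the updated one, so repeated firings of the same window are not counted twice.

```java
import java.util.HashMap;
import java.util.Map;

public class RetractionSketch {
    /** Downstream aggregate that consumes (possibly repeated) firings of upstream windows. */
    public static final class DownstreamSum {
        private final Map<String, Long> lastFiring = new HashMap<>();
        private long total = 0;

        /** Retracting update: undo the previous firing of this window, then apply the new one. */
        public void onFiring(String windowId, long value) {
            Long previous = lastFiring.put(windowId, value);
            if (previous != null) {
                total -= previous; // the "inverse" of the earlier firing
            }
            total += value;
        }

        public long total() {
            return total;
        }
    }

    /** Naive update: every firing is simply added, so refinements are counted again. */
    public static long naiveSum(long[] firings) {
        long sum = 0;
        for (long f : firings) {
            sum += f;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Day window "d1" fires early with 5, on time with 8 and late with 9; "d2" fires once.
        DownstreamSum sum = new DownstreamSum();
        sum.onFiring("d1", 5);
        sum.onFiring("d1", 8);
        sum.onFiring("d1", 9);
        sum.onFiring("d2", 4);
        System.out.println("with retractions: " + sum.total());
        System.out.println("naive: " + naiveSum(new long[] {5, 8, 9, 4}));
    }
}
```

Remembering the last firing per window is what makes the subtraction possible; for aggregates without an inverse (for example max) this simple scheme would not be enough.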

The reason why I'm thinking in this direction and not in the direction of 
keeping track of the watermark and manually evicting elements as you go is that 
I

Re: How to get latency info from benchmark

2016-09-02 Thread Eric Fukuda
Thanks Robert,

I tried to check out the commit you mentioned, but git returns an error
"fatal: reference is not a tree: 547e7490fb99562ca15a2127f0ce1e784db97f3e".
I've searched for a solution but could not find any. Am I doing something
wrong?

-
$ git clone https://github.com/rmetzger/flink.git
Cloning into 'flink'...
remote: Counting objects: 321185, done.
remote: Compressing objects: 100% (3/3), done.
remote: Total 321185 (delta 1), reused 0 (delta 0), pack-reused 321182
Receiving objects: 100% (321185/321185), 93.60 MiB | 10.63 MiB/s, done.
Resolving deltas: 100% (141424/141424), done.
Checking connectivity... done.
$ cd flink/
$ git checkout 547e7490fb99562ca15a2127f0ce1e784db97f3e
fatal: reference is not a tree: 547e7490fb99562ca15a2127f0ce1e784db97f3e
--

Regards,
Eric

Re: Apache Flink: How does it handle the backpressure?

2016-09-02 Thread rss rss
Hi,

  some time ago I found a problem with backpressure in Spark and prepared a
simple test to check it and compare it with Flink:
https://github.com/rssdev10/spark-kafka-streaming

See also:
https://mail-archives.apache.org/mod_mbox/spark-user/201607.mbox/%3CCA+AWphp=2VsLrgSTWFFknw_KMbq88fZhKfvugoe4YYByEt7a=w...@mail.gmail.com%3E

With Flink it works. With Spark it works only if you set up input rate
limits per data source; see the source code for an example. Also, the
backpressure detector in Spark actually works very badly.
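
As a rough illustration of the blocking behaviour asked about in the quoted question below, here is a plain-Java sketch with a bounded queue. It is not Flink's actual network stack, and the names are made up. When the consumer is slower, the producer's put blocks once the buffer is full, so the buffer never grows past its capacity.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureSketch {
    /** Returns {records consumed, largest buffer size seen by the consumer}. */
    public static int[] run(int capacity, int records) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);

        // Fast producer: tries to emit all records as quickly as possible.
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    buffer.put(i); // blocks while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int consumed = 0;
        int maxSeen = 0;
        try {
            // Slow consumer: roughly one record per millisecond.
            while (consumed < records) {
                maxSeen = Math.max(maxSeen, buffer.size());
                buffer.take();
                consumed++;
                Thread.sleep(1);
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {consumed, maxSeen};
    }

    public static void main(String[] args) {
        int[] result = run(4, 50);
        System.out.println("consumed=" + result[0] + ", max buffered=" + result[1]);
    }
}
```

Nothing is dropped and nothing overflows in this sketch; the producer is simply slowed down to the consumer's pace.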

Best regards

2016-09-02 15:07 GMT+03:00 jiecxy <253441...@qq.com>:

> For an operator, the input stream is faster than its output stream, so its
> input buffer will block the previous operator's output thread that
> transfers
> the data to this operator. Right?
>
> Do the Flink and the Spark both handle the backpressure by blocking the
> thread? So what's the difference between them?
>
> For the data source, it is continuously producing the data, what if its
> output thread is blocked? Would the buffer overflow?


Memory Management in Streaming?

2016-09-02 Thread Shaosu Liu
Hi,

I have had issues when processing large amounts of data (large windows
where I could not do incremental updates): Flink slowed down significantly.
It helped when I increased the amount of memory and used off-heap
allocation, but that only delayed the onset of the problem without solving
it.

Could someone give me some hints on how Flink manages window buffers and how
streaming manages its memory? I see this page on batch API memory
management and wonder what the equivalent for streaming is:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525

-- 
Cheers,
Shaosu


fromParallelCollection

2016-09-02 Thread rimin515
Hi,

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tr = env.fromParallelCollection(data)

I do not know how to initialize data. Can someone tell me?