How to perform efficient DataSet reuse between iterations

2017-11-26 Thread Miguel Coimbra
Hello,

I'm facing a problem in an algorithm where I would like to repeatedly
update a DataSet representing a graph, perform some computation, write one
or more DataSinks (such as a file on the local file system) and then reuse
the DataSet in the next iteration.
I want to avoid spilling the results to disk at the end of an iteration and
reading them back in the next one - the graph is very big and I do not wish
to incur that time overhead.
I want to reuse the full result DataSet of each iteration in the next one,
while saving only a small percentage of the produced DataSet to disk in
each iteration.
The space complexity is roughly constant - the number of edges in the graph
grows by only 100 per iteration (an extremely small percentage of the
original graph's edges), and the new edges are obtained using
env.fromCollection(edgesToAdd).
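
To make that concrete, the edge delta for an iteration is built roughly like
this (the IDs and the Double edge value below are just an illustration, not my
real data):

final List<Edge<Long, Double>> edgesToAdd = new ArrayList<>();
edgesToAdd.add(new Edge<>(42L, 84L, 1.0d));
// ... roughly 100 such edges per iteration ...
final DataSet<Edge<Long, Double>> newEdges = env.fromCollection(edgesToAdd);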

Although I am using Flink's Gelly API for graphs, I have no problem working
directly with the underlying vertex and edge DataSet elements.

Two ways to do this occur to me, but it seems neither is currently
supported in Flink, as per Vasia's answer to this Stack Overflow question
[1]:

«Unfortunately, it is not currently possible to output intermediate results
from a bulk iteration. You can only output the final result at the end of
the iteration. Also, as you correctly noticed, Flink cannot efficiently
unroll a while-loop or for-loop, so that won't work either.»

*1.* I thought I could create a bulk iteration, perform the computation and,
between iterations, output the result to the file system.
However, as per Vasia's answer this is not possible, and trying it (for
example, calculating a centrality metric for every vertex and dumping the
results to disk) produces the following exception on execution, as expected
given that information:

org.apache.flink.api.common.InvalidProgramException: A data set that is
part of an iteration was used as a sink or action. Did you forget to close
the iteration?
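
For reference, a minimal self-contained sketch (with made-up names and a dummy
update step, not my actual algorithm) that reproduces this exception:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class IntermediateSinkSketch {
    public static void main(final String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final DataSet<Tuple2<Long, Double>> initialRanks =
                env.fromElements(Tuple2.of(1L, 1.0d), Tuple2.of(2L, 1.0d));

        final IterativeDataSet<Tuple2<Long, Double>> iteration = initialRanks.iterate(10);
        final DataSet<Tuple2<Long, Double>> updatedRanks = iteration
                .map(t -> Tuple2.of(t.f0, t.f1 * 0.85d))
                .returns(initialRanks.getType());

        // Writing the per-iteration result is exactly what triggers
        // "A data set that is part of an iteration was used as a sink or action."
        updatedRanks.writeAsCsv("file:///tmp/intermediate-ranks");

        iteration.closeWith(updatedRanks).writeAsCsv("file:///tmp/final-ranks");
        env.execute("bulk iteration with an intermediate sink");
    }
}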

*2.* Using a for loop in my own program and triggering sequential Flink job
executions.
Problem: in this scenario, while I am able to use a DataSet produced in one
iteration's Flink job (and dump the desired output information to disk) and
pass it to the next Flink job, the computation time increases steadily from
one iteration to the next
(I also tried manually starting a session that is kept open with
env.startNewSession() before the loop - no impact):

Initial graph has 33511 vertices and 411578 edges.
Added 113 vertices and 100 edges.
1-th graph now has 33524 vertices and 411678 edges (2.543 s).
1-th graph algorithm produced 33524 elements. *(20.96 s)*.
Added 222 vertices and 200 edges.
2-th graph now has 33536 vertices and 411778 edges (1.919 s).
2-th graph algorithm produced 33536 elements. *(35.913 s)*.
Added 326 vertices and 300 edges.
3-th graph now has 33543 vertices and 411878 edges (1.825 s).
3-th graph algorithm produced 33543 elements. *(49.624 s)*.
Added 436 vertices and 400 edges.
4-th graph now has 33557 vertices and 411978 edges (1.482 s).
4-th graph algorithm produced 33557 elements. *(66.209 s)*.

Note that the number of elements in the output DataSet is equal to the
number of vertices in the graph.
On iteration i of my program, the executed graph algorithm incorporates the
result DataSet of iteration i - 1 by means of
g.joinWithVertices(previousResultDataSet, new RanksJoinFunction()).

The VertexJoinFunction is a simple forwarding mechanism that sets the previous
values:

@FunctionAnnotation.ForwardedFieldsFirst("*->*")
private static class RanksJoinFunction implements VertexJoinFunction<Double, Double> {
    @Override
    public Double vertexJoin(final Double vertexValue, final Double inputValue) throws Exception {
        // Forward the value from the previous iteration's result DataSet,
        // ignoring the vertex's current value.
        return inputValue;
    }
}
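
Putting the pieces together, the driver loop for the sequential-job approach
looks roughly like the following sketch (the algorithm class, the edge helper
and the output paths are placeholders, not my actual code):

private static void runSequentialJobs(
        final ExecutionEnvironment env,
        Graph<Long, Double, Double> g,
        final int numIterations) throws Exception {

    DataSet<Tuple2<Long, Double>> previousResultDataSet = null;

    for (int i = 1; i <= numIterations; i++) {
        if (previousResultDataSet != null) {
            // Seed this iteration's vertex values with the previous result.
            g = g.joinWithVertices(previousResultDataSet, new RanksJoinFunction());
        }

        // Roughly 100 new edges per iteration (placeholder helper).
        g = g.addEdges(newEdgesForIteration(i));

        // Placeholder for the centrality algorithm; yields one value per vertex.
        final DataSet<Tuple2<Long, Double>> result = g.run(new MyCentralityAlgorithm());

        // Persist only the small slice that must go to disk, then run the job.
        result.first(100).writeAsCsv("file:///tmp/result-" + i, FileSystem.WriteMode.OVERWRITE);
        env.execute("iteration " + i);

        // The full result feeds the next job; since nothing is cached across
        // jobs, each job recomputes the whole lineage from the sources, which
        // matches the steadily growing runtimes shown above.
        previousResultDataSet = result;
    }
}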

I have also used Flink's plan visualizer to check for discrepancies
between the first iteration and the tenth (for example), but the layout of
the plan remains exactly the same while the execution time continually
increases for what should be the same amount of computation.

*Bottom-line:* I was hoping someone could tell me how to overcome the
performance bottleneck of the sequential-job approach, or how to enable the
output of intermediate results from Flink's bulk iterations.
I believe others have stumbled upon this limitation before [2, 3].
I have tested this on a dual-core i7 with 8 GB RAM on 64-bit Java 8, using
a local environment:

final Configuration conf = new Configuration();
final LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
final ExecutionEnvironment env = lenv;
env.getConfig().disableSysoutLogging().setParallelism(1);

I wish to execute this on a cluster later on with a bigger dataset, so it
is essential to maximize the ability to reuse the DataSets that are
distributed by the Flink runtime.
This would allow me to avoid the performance bottleneck I described.
Hopefully someone can shed some light on this.

Thanks for your attention.


References:

Best way to wait for different events

2017-11-26 Thread Lothium
Hi,
I have a question about a specific use case and hope that you can help me
with it. I have a streaming pipeline and receive events of different types.
I parse the events to their internal representation and do some
transformations on them. Some of these events I want to collect internally
(grouped by a transaction id), and as soon as a specific event arrives I
want to emit, for example, an aggregated event to the downstream operators
(so this is not bound to time or to a count).
I thought about keying the stream by some characteristic (so that all the
needed events are in the same logical partition), collecting them in a
stateful process function and emitting the aggregate after the specific
event has arrived.

Besides the event types, I also have to key the stream by a transaction
id, which all of these events belong to (the transaction id is present in
all of the events). Each transaction id is unique and will only occur once,
so I will have a lot of unique, short-lived keys.

I would clear the state of the process function after I have emitted the
aggregated event downstream, so this should hopefully release the state and
clean it up. Is that correct? Would the many keys I would use (each of
which occurs only once) be a problem, or would Flink release the resources
(regarding memory usage etc.)? Is this the right way to handle this use
case?
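
Roughly, what I have in mind looks like the following sketch (the event
classes, field names and the AggregatedEvent.from helper are placeholders; I'm
assuming a keyed ProcessFunction with ListState):

// Key by the transaction id so that all events of one transaction end up in
// the same keyed state; TxEvent / AggregatedEvent are placeholder types.
private static DataStream<AggregatedEvent> aggregatePerTransaction(final DataStream<TxEvent> events) {
    return events
            .keyBy("transactionId") // TxEvent assumed to be a POJO with that field
            .process(new TxAggregator());
}

public static class TxAggregator extends ProcessFunction<TxEvent, AggregatedEvent> {

    private transient ListState<TxEvent> buffered;

    @Override
    public void open(final Configuration parameters) {
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", TxEvent.class));
    }

    @Override
    public void processElement(final TxEvent event, final Context ctx,
            final Collector<AggregatedEvent> out) throws Exception {
        if (event.isClosingEvent()) {
            // The specific "closing" event arrived: emit the aggregate and
            // clear the state, so nothing stays around for this transaction id.
            out.collect(AggregatedEvent.from(buffered.get(), event));
            buffered.clear();
        } else {
            buffered.add(event);
        }
    }
}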

Thanks!





Taskmanagers are quarantined

2017-11-26 Thread T Obi
Hello all,

We run jobs on a standalone cluster with Flink 1.3.2 and we're facing
a problem. Suddenly the connection between a taskmanager and the
jobmanager times out and the taskmanager is "quarantined" by the
jobmanager.
Once a taskmanager is quarantined the jobs are of course restarted, but
the timeout and quarantine then happen to other taskmanagers in succession.

When a taskmanager's connection to the jobmanager timed out, its
connections to ZooKeeper and to the snapshot HDFS also timed out, so the
problem doesn't seem to be in Flink itself.
However, even when a taskmanager running on the same machine as the
jobmanager times out, the jobmanager is fine at that time, so I don't
think it is an OS problem either.

Could you give us some advice on how to investigate? Thank you.



Taskmanager command line:

java -XX:+UseG1GC -Xms219136M -Xmx219136M
-XX:MaxDirectMemorySize=8388607T
-Dlog.file=/var/log/flink/flink-log-manager-taskmanager-0-flink-jp-2.log
-Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
-Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
-classpath 
/opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
org.apache.flink.runtime.taskmanager.TaskManager --configDir
/opt/flink/flink-1.3.2/conf


Taskmanager (on flink-jp-2) log:

2017-11-22 14:09:31,595 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap
backend snapshot (File Stream Factory @
hdfs://nameservice1/user/log-manager/flink/checkpoints-data/9469db324b834e9dcf5b46428b3ae011,
synchronous part) in thread
Thread[TriggerWindow(TumblingProcessingTimeWindows(6),
ReducingStateDescriptor{serializer=jp.geniee.reporter.executable.BuyerReporterV2Auction$$anon$12$$anon$7@d2619591,
reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@72bca894},
ProcessingTimeTrigger(),
WindowedStream.reduce(WindowedStream.java:300)) -> Map -> Map
(9/30),5,Flink Task Threads] took 142 ms.
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
 - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999
java.io.EOFException: Premature EOF: no length prefix available
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
 - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744
java.io.EOFException: Premature EOF: no length prefix available
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
 - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092
java.io.EOFException: Premature EOF: no length prefix available
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
 - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393
java.io.EOFException: Premature EOF: no length prefix available
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,041 WARN  org.apache.hadoop.hdfs.DFSClient
 - Error Recovery for block
BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393 in
pipeline 10.5.0.61:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad
datanode 10.5.0.61:50010
2017-11-22 14:12:10,039 WARN  org.apache.hadoop.hdfs.DFSClient
 - Error Recovery for block
BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092 in
pipeline 10.5.0.59:50010, 10.5.0.52:50010, 10.5.0.63:50010: bad
datanode 10.5.0.59:50010
2017-11-22 14:12:10,038 WARN  org.apache.ha

Re: Apache Flink - Question about TriggerResult.FIRE

2017-11-26 Thread Aljoscha Krettek
Hi,

Yes, after the watermark (or processing time) passes "end-of-window +
allowed lateness", everything that is stored for a window is deleted.

Best,
Aljoscha

> On 25. Nov 2017, at 18:07, M Singh  wrote:
> 
> Hi:
> 
> Another question - what happens if the trigger never calls PURGE or 
> FIRE_AND_PURGE and only calls FIRE? Are the window and its contents 
> removed after the end time + lateness are exceeded?
> 
> Thanks
> 
> 
> On Monday, November 20, 2017 2:18 AM, Stefan Richter 
>  wrote:
> 
> 
> Hi,
> 
>> 
>> "In the first case, it is a new window without the previous elements, in the 
>> second case the window reflects the old contents plus all changes since the 
>> last trigger."
>> 
>> I am assuming the first case is FIRE and second case is FIRE_AND_PURGE - I 
>> was thinking that in the first case (FIRE), there would be elements from 
>> previous window since we did not purge and the second case it would have 
>> only new elements since it was purged.  Am I missing something ?  
>>> 
> 
> No, with first case, I meant the „truly new window case“ and the second case 
> is „another triggering of the previous (non-purged) window“. So the second 
> case is a simple FIRE without PURGE.
> 
> Best,
> Stefan
> 
>