Thanks for your help!! That is exactly what we need :-) 





















______________________________________ 

Dr. Rubén Casado 
Head of Big Data 
Treelogic 
ruben.casado.treelogic 

+34 902 286 386 - +34 607 18 28 06 
Parque Tecnológico de Asturias · Parcela 30 
E33428 Llanera · Asturias [Spain] 
www.treelogic.com 
______________________________________ 

----- Mensaje original ----- 
De: "Aljoscha Krettek" <aljos...@apache.org> 
Para: user@flink.apache.org 
Enviados: Jueves, 21 de Abril 2016 11:21:00 GMT +01:00 Amsterdam / Berlín / 
Berna / Roma / Estocolmo / Viena 
Asunto: Re: save state in windows operation 


Hi, 
you should be able to do this using Flink's state abstraction in a 
RichWindowFunction like this: 



public static class MyApplyFunction extends RichWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, GlobalWindow> { 


ValueStateDescriptor<Tuple2<String, Integer>> stateDescriptor = 
new ValueStateDescriptor<>("last-result", 
new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo(), 
null); 


@Override 
public void apply(Tuple key, 
GlobalWindow window, 
Iterable<Tuple2<String, Integer>> input, 
Collector<Tuple2<String, Integer>> out) throws Exception { 
ValueState<Tuple2<String, Integer>> state = 
getRuntimeContext().getState(stateDescriptor); 


Tuple2<String, Integer> lastResult = state.value(); 
if (lastResult != null) { 
// do something with it 
} else { 

} 

// do our computation 

// store for future use 
state.update(new Tuple2<>("hey there", 42)); 
} 
} 


The arguments of ValueStateDescriptor are: state name, TypeInformation for the 
values in the state, default value of the state that you get if nothing is set. 


Also, keep in mind that the state is local to each key, just as the window is 
local to each key. 


Cheers, 
Aljoscha 


On Thu, 21 Apr 2016 at 11:10 Rubén Casado < ruben.cas...@treelogic.com > wrote: 




Hello, 


We have problems working with states in Flink and I am sure you can help us :-) 


Let's say we have a workflow something like: 


DataStream<K> myData = env.from... 


myData.map(new MyMap (..)) 
.keyBy(0) 
.countWindow(n) 
.apply(new MyApplyFunction()) 
.writeAsCSV(...) 


To implement the logic of our MyApplyFunction, in the apply() method we would 
need to have access to the result of the last window computation. Before 
emiting the resulst in the apply () using collector.collect(..), we could save 
that result in an external storage systems (e.g Redis /Hazelcast) and then, in 
the begininig of the next window computation read such value, but we woud like 
to use some internal mechanism of Flink to do that. 


Could some provide help about it? Thanks in advance!!! :-) 


Best 






















______________________________________ 

Dr. Rubén Casado 
Head of Big Data 
Treelogic 
ruben.casado.treelogic 

+34 902 286 386 - +34 607 18 28 06 
Parque Tecnológico de Asturias · Parcela 30 
E33428 Llanera · Asturias [Spain] 
www.treelogic.com 
______________________________________ 

Reply via email to