Re: Working with State example /flink streaming

2015-11-30 Thread Anton Polyakov
Javier,

sorry for jumping in, but I think your case is very similar to what I am
trying to achieve in the thread right next to yours (called "Watermarks as
"process completion" flags"). I also need to process a stream that is
produced over some time, and then take an action after a certain event.
Windows don't work for me either, because in my case the stream produces
data for 4-5 hours, and I need to evaluate it continuously but then
finalize it upon receiving a certain "last event".

I think the existing checkpointing mechanism would be very helpful here, as
it solves exactly this problem, but only internally. If you were able to
emit a "special" checkpoint from the source and then react to it at the end
of the processing chain, do you think you could solve your task?
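The idea of finalizing on a completion signal can be illustrated with a simpler, in-band variant. The sketch below does not use Flink's checkpoint mechanism at all: it swaps in an ordinary marker event (the `isEndMarker` flag is a hypothetical field the producer would have to set) and simulates per-key state with a plain `HashMap`. It only shows the control flow: keep updating the sum, emit nothing, and release a single result when the marker for that key arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink): accumulate a sum per key and emit it only
// when a hypothetical end-of-data marker event for that key arrives.
public class MarkerFinalizeSketch {

    // One input event; isEndMarker is a hypothetical flag set by the producer.
    static final class Event {
        final String id;
        final double value;
        final boolean isEndMarker;

        Event(String id, double value, boolean isEndMarker) {
            this.id = id;
            this.value = value;
            this.isEndMarker = isEndMarker;
        }
    }

    // Stand-in for Flink's per-key state: one running sum per id.
    private final Map<String, Double> sums = new HashMap<>();

    // Returns the final sum when the marker arrives, otherwise null (nothing emitted).
    Double process(Event e) {
        if (e.isEndMarker) {
            Double total = sums.remove(e.id); // finalize and clear this key's state
            return total == null ? 0.0 : total;
        }
        sums.merge(e.id, e.value, Double::sum); // keep updating, emit nothing yet
        return null;
    }

    static List<String> run(List<Event> events) {
        MarkerFinalizeSketch op = new MarkerFinalizeSketch();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            Double result = op.process(e);
            if (result != null) {
                out.add("(" + e.id + "," + result + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("11", 1, false),
            new Event("11", 2, false),
            new Event("11", 3, false),
            new Event("11", 0, true));
        System.out.println(run(events)); // prints [(11,6.0)]
    }
}
```

Clearing the key's entry on the marker keeps state from growing for keys that are finished; whether that is acceptable depends on whether more data can arrive for the same key later.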

On Fri, Nov 27, 2015 at 4:29 PM, Lopez, Javier wrote:

> Hi,
>
> Thanks for the example. We have done it with windows before and it works.
> We are using state because the data comes with a gap of several days and we
> can't handle a window size of several days. That's why we decided to use
> the state.

Re: Working with State example /flink streaming

2015-11-27 Thread Aljoscha Krettek
Hi,
I’ll try to go into a bit more detail about the windows here. What you can do 
is this:

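// fields are (id, sum, count), where count is initialized to 1,
// similar to word count
DataStream<Tuple3<String, Double, Long>> input = …;

DataStream<Tuple3<String, Double, Long>> counts = input
  .keyBy(0)
  .timeWindow(Time.minutes(10))
  .reduce(new MyCountingReducer());

DataStream<Tuple2<String, Double>> result = counts.map(/* mapper that divides sum by count */);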

Does this help? Here, you don't even have to deal with state yourself: the
windowing system keeps the state (i.e., the reduced value) internally in a
fault-tolerant fashion.
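The reduce-then-map pipeline above can be sketched without Flink to show what happens to one key's elements inside a single window. `MyCountingReducer` is not shown in the thread, so the reduce logic here (add sums, add counts) is an assumption based on the description "count is initialized to 1, similar to word count"; `Partial` is a hypothetical stand-in for a `Tuple3` of (id, sum, count).

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Flink) of reducing (id, sum, count) elements within
// one window and then dividing sum by count in a final map step.
public class CountingReduceSketch {

    // Hypothetical stand-in for Tuple3<String, Double, Long> = (id, sum, count).
    static final class Partial {
        final String id;
        final double sum;
        final long count;

        Partial(String id, double sum, long count) {
            this.id = id;
            this.sum = sum;
            this.count = count;
        }
    }

    // What a MyCountingReducer might do (assumption): add sums and counts of the same id.
    static Partial reduce(Partial a, Partial b) {
        return new Partial(a.id, a.sum + b.sum, a.count + b.count);
    }

    // The final map step: divide sum by count.
    static double average(Partial p) {
        return p.sum / p.count;
    }

    // Reduces all elements of one key in one window, then applies the map step.
    static double averageOfWindow(List<Partial> window) {
        Partial acc = window.get(0);
        for (int i = 1; i < window.size(); i++) {
            acc = reduce(acc, window.get(i));
        }
        return average(acc);
    }

    public static void main(String[] args) {
        List<Partial> window = Arrays.asList(
            new Partial("11", 1.0, 1),
            new Partial("11", 2.0, 1),
            new Partial("11", 3.0, 1));
        System.out.println(averageOfWindow(window)); // prints 2.0
    }
}
```

Carrying the count through the reduce is what makes the average recoverable at the end: averaging partial averages directly would give wrong results when windows hold different numbers of elements.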

Cheers,
Aljoscha

Re: Working with State example /flink streaming

2015-11-26 Thread Lopez, Javier
Hi, thanks for the answer. It worked, but not in the way we expected. We
expected to get only one sum per ID, but we are getting all the consecutive
sums. For example:

We expect this: (11,6), but we get this: (11,1) (11,3) (11,6) (the input
values are ID -> 11, values -> 1,2,3). Here is the code we are using for
our test:

DataStream<Tuple2<String, Double>> stream = ...;
DataStream<Tuple4<String, Double, Long, Double>> result =
    stream.keyBy(0).map(new RollingSum());

public static class RollingSum extends RichMapFunction<Tuple2<String, Double>, Tuple4<String, Double, Long, Double>> {

    // persistent per-key sum and count
    private OperatorState<Double> sum;
    private OperatorState<Long> count;

    @Override
    public Tuple4<String, Double, Long, Double> map(Tuple2<String, Double> value1) throws Exception {
        // value()/update() can throw IOException; propagate it to Flink
        // instead of swallowing it and returning null
        Double newSum = sum.value() + value1.f1;

        sum.update(newSum);
        count.update(count.value() + 1);
        return new Tuple4<>(value1.f0, sum.value(), count.value(), sum.value() / count.value());
    }

    @Override
    public void open(Configuration config) {
        sum = getRuntimeContext().getKeyValueState("mySum", Double.class, 0D);
        count = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
    }
}


We are using a Tuple4 because we want to calculate the sum and the average
(so our tuple is ID, SUM, COUNT, AVG). Do we need to add another step to
get a single value out of it, or is this the expected behavior?

Thanks again for your help.



Re: Working with State example /flink streaming

2015-11-26 Thread Stephan Ewen
Hi!

In streaming, there is no "end" of the stream at which you would emit the
final sum. That's why there are windows.

If you do not want the partial sums, but only the final sum, you need to
define the window over which the sum is computed. At the end of that
window, the value is emitted. The window can be based on time, counts, or
other measures.
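To make the "emit only at the end of the window" behavior concrete, here is a plain-Java simulation (no Flink) of a per-key tumbling count window. The window size of 3 is an arbitrary choice for the example, and the class is hypothetical; in Flink itself you would use a time or count window on the keyed stream rather than code like this. With the input from this thread (ID 11, values 1, 2, 3) it emits a single (11,6) instead of the three partial sums.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink) of a per-key tumbling count window:
// values are summed per key, and one sum is emitted only when the window is full.
public class CountWindowSketch {

    private final int windowSize;
    private final Map<String, Double> sums = new HashMap<>();
    private final Map<String, Integer> counts = new HashMap<>();

    CountWindowSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    // Returns the window's sum when the window fires, otherwise null (nothing emitted).
    Double add(String id, double value) {
        double sum = sums.getOrDefault(id, 0.0) + value;
        int count = counts.getOrDefault(id, 0) + 1;
        if (count == windowSize) {
            sums.remove(id);   // window fired: clear this key's window state
            counts.remove(id);
            return sum;
        }
        sums.put(id, sum);
        counts.put(id, count);
        return null;
    }

    static List<String> run(int windowSize, String id, double... values) {
        CountWindowSketch window = new CountWindowSketch(windowSize);
        List<String> out = new ArrayList<>();
        for (double v : values) {
            Double fired = window.add(id, v);
            if (fired != null) {
                out.add("(" + id + "," + fired + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(3, "11", 1, 2, 3)); // prints [(11,6.0)]
    }
}
```

Elements that arrive after the last complete window stay buffered until enough data arrives, which is exactly the "when is the result final?" question a window definition has to answer.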

Greetings,
Stephan



Re: Working with State example /flink streaming

2015-11-25 Thread Stephan Ewen
Hi Javier!

You can solve this either with windows or with manual state.

What is better depends a bit on when you want to have the result (the sum).
Do you want a result emitted after each update (or do some other operation
with that value) or do you want only the final sum after a certain time?

For the second variant, I would use a window. For the first variant, you
could use custom state as follows:

For each element, you take the current state for the key and add the value
to get the new sum. Then you update the state with the new sum and emit the
value as well.

Java:

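DataStream<Tuple2<String, Long>> stream = ...;
DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new RollingSum());


public class RollingSum extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    // running sum, scoped to the current key by keyBy(0)
    private OperatorState<Long> sum;

    @Override
    public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
        long newSum = sum.value() + value.f1;
        sum.update(newSum);
        return new Tuple2<>(value.f0, newSum);
    }

    @Override
    public void open(Configuration config) {
        // initialize the state handle (default 0) that map() reads and updates
        sum = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
    }
}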



In Scala, you can write this briefly as:

val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), sum: Option[Int]) => {
    val newSum = in._2 + sum.getOrElse(0)
    ( (in._1, newSum), Some(newSum) )
  })
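The rolling-sum variant can be checked without a cluster. The following plain-Java sketch (no Flink; a `HashMap` stands in for the per-key state, and the class name is hypothetical) performs the same read, add, update, emit steps as `RollingSum` for every element. Fed ID 11 with values 1, 2, 3, it emits one result per input element, which is the (11,1) (11,3) (11,6) output reported in this thread: a map-with-state emits on every element by design.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink) of the rolling sum: for each element, read
// the key's current sum (default 0), add the value, store it, and emit it.
public class RollingSumSketch {

    // Stand-in for Flink's per-key state: one running sum per key.
    private final Map<String, Long> sumByKey = new HashMap<>();

    // Processes one (key, value) element and returns the emitted (key, newSum).
    String map(String key, long value) {
        long newSum = sumByKey.getOrDefault(key, 0L) + value;
        sumByKey.put(key, newSum);
        return "(" + key + "," + newSum + ")";
    }

    static List<String> run(String key, long... values) {
        RollingSumSketch op = new RollingSumSketch();
        List<String> out = new ArrayList<>();
        for (long v : values) {
            out.add(op.map(key, v)); // one output per input element
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("11", 1, 2, 3)); // prints [(11,1), (11,3), (11,6)]
    }
}
```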


Does that help?

Thanks also for pointing out the error in the sample code...

Greetings,
Stephan


On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier wrote:

> Hi,
>
> We are trying to do a test using state, but we have not been able to
> achieve our desired result. Basically, we have a data stream with data such
> as [{"id":"11","value":123}], and we want to calculate the sum of all values
> grouped by ID. We were able to achieve this using windows, but not with
> state. The example in the documentation (
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state)
> is not very clear and even has some errors, for example:
>
> public class CounterSum implements RichReduceFunction
>
> should be
>
> public class CounterSum extends RichReduceFunction
>
> as RichReduceFunction is a class, not an interface.
>
> We wanted to ask whether you have an example of how to use state with
> Flink.
>
> Thanks in advance for your help.