Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

2018-06-12 Thread Piotr Nowojski
Hi,

> Can you elaborate that a little bit? are you referring to
> "Output<StreamRecord<OUT>> output" in AbstractStreamOperator class?

Yes. However, I have never tried it, so I’m not 100% sure there are no pitfalls
with that.

Regarding processing time timers: you should be able to register the timer once
and then re-register it in the `onTimer(…)` callback using `ctx.timerService()`.
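
To make the register-once-then-re-register idea concrete, here is a minimal plain-Java sketch. It deliberately does not use Flink: `ManualTimerService`, `MetadataEmitter` and `TimerSketchDemo` are hypothetical names invented for illustration, and the manual clock only stands in for a real processing-time service. It assumes the custom-operator variant discussed in this thread, where the operator holds its output for its whole lifetime and can register the first timer at startup, so no input record is needed to get the timer going.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for a processing-time service; not a Flink class. */
final class ManualTimerService {
    private static final class Timer {
        final long time;
        final LongConsumer callback;

        Timer(long time, LongConsumer callback) {
            this.time = time;
            this.callback = callback;
        }
    }

    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
    private long now;

    long currentTime() {
        return now;
    }

    void registerTimer(long time, LongConsumer callback) {
        timers.add(new Timer(time, callback));
    }

    /** Moves the clock forward and fires every due timer in time order. */
    void advanceTo(long newTime) {
        while (!timers.isEmpty() && timers.peek().time <= newTime) {
            Timer t = timers.poll();
            now = t.time;
            t.callback.accept(t.time);
        }
        now = newTime;
    }
}

/** Hypothetical operator model that keeps its output available at all times. */
final class MetadataEmitter {
    private final ManualTimerService timerService;
    private final List<String> output; // stands in for the downstream collector
    private final List<String> pending = new ArrayList<>();
    private final long intervalMs;

    MetadataEmitter(ManualTimerService timerService, List<String> output, long intervalMs) {
        this.timerService = timerService;
        this.output = output;
        this.intervalMs = intervalMs;
    }

    /** Called once at startup, so the first timer exists before any record arrives. */
    void open() {
        timerService.registerTimer(
                timerService.currentTime() + intervalMs, this::onProcessingTime);
    }

    /** Buffers metadata, for example at snapshot time or after a restore. */
    void addPending(String fileMetadata) {
        pending.add(fileMetadata);
    }

    /** Timer callback: flush whatever is pending, then re-register the timer. */
    void onProcessingTime(long timestamp) {
        output.addAll(pending);
        pending.clear();
        timerService.registerTimer(timestamp + intervalMs, this::onProcessingTime);
    }
}

final class TimerSketchDemo {
    public static void main(String[] args) {
        ManualTimerService timers = new ManualTimerService();
        List<String> downstream = new ArrayList<>();
        MetadataEmitter op = new MetadataEmitter(timers, downstream, 1000L);

        op.open();                        // no input record has been seen yet
        op.addPending("file-1 closed");
        timers.advanceTo(1000L);          // first timer fires and re-registers
        op.addPending("file-2 closed");
        timers.advanceTo(3500L);          // timers at 2000 and 3000 fire
        System.out.println(downstream);
    }
}
```

Running `TimerSketchDemo` prints `[file-1 closed, file-2 closed]`. Both entries reach the output purely from timer callbacks, which is the property needed to bound latency during an idle period.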

Piotrek

> On 11 Jun 2018, at 18:59, Steven Wu  wrote:
> 
> 
> @Override
> public void processElement(Integer value, Context ctx, Collector<Integer> out)
> throws Exception {
>    ctx.timerService().registerProcessingTimeTimer(...);
> }
> 
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
> throws Exception {
>    // …
> }
> 
> Correcting myself regarding the above timer proposal: it still requires a
> message/record to come in. I am trying to guard against a long idle gap,
> during which I won't be able to register a timer.
> 
> 
> On Mon, Jun 11, 2018 at 9:22 AM, Steven Wu wrote:
> Piotr,
> 
> > However you could do it via a custom Operator (there you have a constant 
> > access to output collector). 
> 
> Can you elaborate that a little bit? are you referring to
> "Output<StreamRecord<OUT>> output" in AbstractStreamOperator class?
> 
> > register processing time service in your ProcessFunction.
> 
> I think your timer proposal can work. 
> 
> I was originally registering the timer like this. The ProcessingTimeCallback
> interface doesn't supply the Collector parameter:
> 
> ((StreamingRuntimeContext) getRuntimeContext())
> .getProcessingTimeService()
> .registerTimer(..., this);
> 
> Thanks, 
> Steven
> 
> 
> 
> On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski wrote:
> Hi,
> 
> Indeed it seems like this is not possible to emit records on 
> checkpoint/snapshot through ProcessFunction. However you could do it via a 
> custom Operator (there you have a constant access to output collector). 
> Another workaround might be to register processing time service in your 
> ProcessFunction.
> 
> @Override
> public void processElement(Integer value, Context ctx, Collector<Integer> out)
> throws Exception {
>    ctx.timerService().registerProcessingTimeTimer(...);
> }
> 
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
> throws Exception {
>    // …
> }
> 
> Piotrek
> 
>> On 11 Jun 2018, at 01:07, Steven Wu wrote:
>> 
>> I have a process function defined with these interfaces
>> 
>> public class MyProcessFunction extends ProcessFunction 
>> implements CheckpointedFunction, ProcessingTimeCallback {...}
>> 
>> In snapshotState() method, I want to close files and emit the metadata about 
>> the closed files to downstream operator. it doesn't seem possible with 
>> snapshotState(FunctionSnapshotContext context) interface.
>> 
>> I can keep metadata in snapshot and restore them during recovery. but if 
>> there is no input record coming for a long time,  processElement(T value, 
>> Context ctx, Collector out) won't be called. Then I can't forward 
>> the restored data to downstream operator with guaranteed latency.
>> 
>> I can add a timer. but it doesn't seem that onProcessingTime(long timestamp) 
>> allows me to forward output to downstream operator either.
>> 
>> Thanks,
>> Steven
> 
> 
> 



Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

2018-06-11 Thread Steven Wu
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out)
throws Exception {
   ctx.timerService().registerProcessingTimeTimer(...);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
throws Exception {
   // …
}

Correcting myself regarding the above timer proposal: it still requires a
message/record to come in. I am trying to guard against a long idle gap,
during which I won't be able to register a timer.


On Mon, Jun 11, 2018 at 9:22 AM, Steven Wu  wrote:

> Piotr,
>
> > However you could do it via a custom Operator (there you have a
> constant access to output collector).
>
> Can you elaborate that a little bit? are you referring to
> "Output<StreamRecord<OUT>> output" in AbstractStreamOperator class?
>
> > register processing time service in your ProcessFunction.
>
> I think your timer proposal can work.
>
> I was originally registering the timer like this. The ProcessingTimeCallback
> interface doesn't supply the Collector parameter:
>
> ((StreamingRuntimeContext) getRuntimeContext())
> .getProcessingTimeService()
> .registerTimer(..., this);
>
> Thanks,
> Steven
>
>
>
> On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Indeed it seems like this is not possible to emit records on
>> checkpoint/snapshot through ProcessFunction. However you could do it via a
>> custom Operator (there you have a constant access to output collector).
>> Another workaround might be to register processing time service in your
>> ProcessFunction.
>>
>> @Override
>> public void processElement(Integer value, Context ctx, Collector<Integer> out)
>> throws Exception {
>>    ctx.timerService().registerProcessingTimeTimer(...);
>> }
>>
>> @Override
>> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
>> throws Exception {
>>    // …
>> }
>>
>> Piotrek
>>
>> On 11 Jun 2018, at 01:07, Steven Wu  wrote:
>>
>> I have a process function defined with these interfaces
>>
>> public class MyProcessFunction extends ProcessFunction
>> implements CheckpointedFunction, ProcessingTimeCallback {...}
>>
>> In snapshotState() method, I want to close files and emit the metadata
>> about the closed files to downstream operator. it doesn't seem possible
>> with *snapshotState(FunctionSnapshotContext context)* interface.
>>
>> I can keep metadata in snapshot and restore them during recovery. but if
>> there is no input record coming for a long time, * processElement(T
>> value, Context ctx, Collector out)* won't be called. Then I
>> can't forward the restored data to downstream operator with guaranteed
>> latency.
>>
>> I can add a timer. but it doesn't seem that *onProcessingTime(long
>> timestamp)* allows me to forward output to downstream operator either.
>>
>> Thanks,
>> Steven
>>
>>
>>
>


Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

2018-06-11 Thread Steven Wu
Piotr,

> However you could do it via a custom Operator (there you have a constant
> access to output collector).

Can you elaborate that a little bit? are you referring to
"Output<StreamRecord<OUT>> output" in AbstractStreamOperator class?

> register processing time service in your ProcessFunction.

I think your timer proposal can work.

I was originally registering the timer like this. The ProcessingTimeCallback
interface doesn't supply the Collector parameter:

((StreamingRuntimeContext) getRuntimeContext())
.getProcessingTimeService()
.registerTimer(..., this);

Thanks,
Steven



On Mon, Jun 11, 2018 at 2:52 AM, Piotr Nowojski 
wrote:

> Hi,
>
> Indeed it seems like this is not possible to emit records on
> checkpoint/snapshot through ProcessFunction. However you could do it via a
> custom Operator (there you have a constant access to output collector).
> Another workaround might be to register processing time service in your
> ProcessFunction.
>
> @Override
> public void processElement(Integer value, Context ctx, Collector<Integer> out)
> throws Exception {
>    ctx.timerService().registerProcessingTimeTimer(...);
> }
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
> throws Exception {
>    // …
> }
>
> Piotrek
>
> On 11 Jun 2018, at 01:07, Steven Wu  wrote:
>
> I have a process function defined with these interfaces
>
> public class MyProcessFunction extends ProcessFunction
> implements CheckpointedFunction, ProcessingTimeCallback {...}
>
> In snapshotState() method, I want to close files and emit the metadata
> about the closed files to downstream operator. it doesn't seem possible
> with *snapshotState(FunctionSnapshotContext context)* interface.
>
> I can keep metadata in snapshot and restore them during recovery. but if
> there is no input record coming for a long time, * processElement(T
> value, Context ctx, Collector out)* won't be called. Then I
> can't forward the restored data to downstream operator with guaranteed
> latency.
>
> I can add a timer. but it doesn't seem that *onProcessingTime(long
> timestamp)* allows me to forward output to downstream operator either.
>
> Thanks,
> Steven
>
>
>


Re: how to emit record to downstream operator in snapshotState and/or onProcessingTime

2018-06-11 Thread Piotr Nowojski
Hi,

Indeed it seems like this is not possible to emit records on 
checkpoint/snapshot through ProcessFunction. However you could do it via a 
custom Operator (there you have a constant access to output collector). Another 
workaround might be to register processing time service in your ProcessFunction.

@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out)
throws Exception {
   ctx.timerService().registerProcessingTimeTimer(...);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out)
throws Exception {
   // …
}

Piotrek

> On 11 Jun 2018, at 01:07, Steven Wu  wrote:
> 
> I have a process function defined with these interfaces
> 
> public class MyProcessFunction extends ProcessFunction 
> implements CheckpointedFunction, ProcessingTimeCallback {...}
> 
> In snapshotState() method, I want to close files and emit the metadata about 
> the closed files to downstream operator. it doesn't seem possible with 
> snapshotState(FunctionSnapshotContext context) interface.
> 
> I can keep metadata in snapshot and restore them during recovery. but if 
> there is no input record coming for a long time,  processElement(T value, 
> Context ctx, Collector out) won't be called. Then I can't forward 
> the restored data to downstream operator with guaranteed latency.
> 
> I can add a timer. but it doesn't seem that onProcessingTime(long timestamp) 
> allows me to forward output to downstream operator either.
> 
> Thanks,
> Steven



how to emit record to downstream operator in snapshotState and/or onProcessingTime

2018-06-10 Thread Steven Wu
I have a process function defined with these interfaces

public class MyProcessFunction extends ProcessFunction
implements CheckpointedFunction, ProcessingTimeCallback {...}

In snapshotState() method, I want to close files and emit the metadata
about the closed files to downstream operator. it doesn't seem possible
with *snapshotState(FunctionSnapshotContext context)* interface.

I can keep metadata in snapshot and restore them during recovery. but if
there is no input record coming for a long time, * processElement(T value,
Context ctx, Collector out)* won't be called. Then I can't
forward the restored data to downstream operator with guaranteed latency.

I can add a timer. but it doesn't seem that *onProcessingTime(long
timestamp)* allows me to forward output to downstream operator either.

Thanks,
Steven