Impact of BoundedOutOfOrderness on KeyedProcessFunction

2023-10-05 Thread Hou, Lijuan via user
Hi team,

I have one question, hoping to get some help.

Will BoundedOutOfOrderness have any impact on the KeyedProcessFunction? If so, 
in what way it can impact KeyedProcessFunction?

Thanks!

Best,
Lijuan


Re: KeyedProcessFunction within an iteration

2023-02-18 Thread Lorenzo Nicora
Hi Zhipeng

IterativeStreams does have keyBy() methods, but they all throw
UnsupportedOperationException [1]

For some context: the whole thing is to do message enrichment with asyncIO,
caching the enrichment info in state (with TTL).
I am using an iteration as RichAsyncFunction does not support state.
I didn't find a simpler way to do both async IO and caching in state.

Here is a shortened version of the flow

-8<-8<-8<-8<-8<-8<
// Measure, SensorData and EnrichedMeasure are POJOs. The key (sensorId) is
an Integer.

/// flow
DataStream measures = env.addSource(new MeasuresSource());
IterativeStream.ConnectedIterativeStreams iteration =
measures

.iterate().withFeedbackType(TypeInformation.of(SensorData.class));
ConnectedStreams measureAndSensorDataBySensorId =
iteration
// THIS THROWS UnsupportedOperationException
// "Cannot change the input partitioning of an iteration hea
directly. Apply the partitioning on the input and feedback streams instead."
.keyBy(Measure::getSensorId, SensorData::getSensorId);

//CachedEnrichment extends KeyedCoProcessFunction
//It emits cache hit on main output and cache miss on a side-output
CachedEnrichment cachedEnrichment = new CachedEnrichment();
// Try enrichment from cache
SingleOutputStreamOperator cacheHitEnrichedMeasures =
measureAndSensorDataBySensorId
.process(cachedEnrichment);
DataStream cacheMissMeasures = cacheHitEnrichedMeasures
.getSideOutput(cachedEnrichment.cacheMissOutputTag);

// On cache miss fetch SensorData with async IO
SingleOutputStreamOperator>
enrichedMeasuresAndFetchedSensorData =
AsyncDataStream.unorderedWait(
cacheMissMeasures,
//AsyncEnrich extends RichAsyncFunction>
new AsyncEnrich(),
ASYNC_CALL_TIMEOUT, TimeUnit.MILLISECONDS,
ASYNC_OPERATOR_CAPACITY);

// Close the loop with newly fetched SensorData
iteration.closeWith(enrichedMeasuresAndFetchedSensorData.map(t -> t.f1));

// Merge outputs
DataStream allEnrichedMeasures = cacheHitEnrichedMeasures
.union(enrichedMeasuresAndFetchedSensorData.map(t -> t.f0));
-8<-8<-8<-8<-8<-8<

Also, the message of the UnsupportedOperationException thrown by
IterativeStreams.keyBy()
("...Apply the partitioning on the input and feedback streams instead")
does not look right.

I tried that (I made a loop with a single stream of
Either) but it seems there is no way
of processing an IterativeStream with a KeyedProcessFunction, nor to feed
back a KeyedStream into the loop.

-8<-8<-8<-8<-8<-8<

KeyedStream, Integer>
measuresOrSensorDataBySensorId = measures
.map(m -> Either.Left(m)).returns(new
TypeHint<>() {})
.keyBy(msd -> msd.isLeft() ? msd.left().getSensorId() :
msd.right().getSensorId());
IterativeStream> iteration =
measuresOrSensorDataBySensorId.iterate();

CachedEnrichment cachedEnrichment = new CachedEnrichment();
// The following line DOES NOT COMPILE: IterativeStream.process() expects
ProcessFunction, not KeyedProcessFunction
SingleOutputStreamOperator cacheHitEnrichedMeasures =
 iteration.process(cachedEnrichment,
TypeInformation.of(EnrichedMeasure.class));

KeyedStream fetchedSensorDataBySensorId =
enrichedMeasureAndFetchedSensorData
.map(t -> t.f1).keyBy(SensorData::getSensorId);
// The following line DOES NOT COMPILE: closeWith() does not expect
KeyedStream
iteration.closeWith(fetchedSensorDataBySensorId);
-8<-8<-8<-8<-8<-8<

I will have a look at the iteration module you mentioned.
I wasn't aware.

Thanks
Lorenzo

[1]
https://github.com/apache/flink/blob/release-1.15.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java#L196


On Sat, 18 Feb 2023 at 12:38, Zhipeng Zhang  wrote:

> Hi Lorenzo,
>
> Could you provide some code example to reproduce your question? As I
> understand, IterativeStream#keyBy is supported since it is a subclass
> of DataStream.
>
> Moreover, we have implemented an unified iteration module for Flink
> [1] in Flink ML [2], which relies on Flink 1.15.2. Probably you can
> also have a try.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300
> [2]
> https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
>
> Lorenzo Nicora  于2023年2月18日周六 17:00写道:
> >
> > Hi all,
> >
> > I am trying to implement an iterative streaming job that processes the
> loop with a KeyedProcessFunction.
> >
> > I need a KeyedProcessFunction to use keyed state and to emit a
> side-output (that aft

Re: KeyedProcessFunction within an iteration

2023-02-18 Thread Zhipeng Zhang
Hi Lorenzo,

Could you provide some code example to reproduce your question? As I
understand, IterativeStream#keyBy is supported since it is a subclass
of DataStream.

Moreover, we have implemented an unified iteration module for Flink
[1] in Flink ML [2], which relies on Flink 1.15.2. Probably you can
also have a try.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300
[2] 
https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java

Lorenzo Nicora  于2023年2月18日周六 17:00写道:
>
> Hi all,
>
> I am trying to implement an iterative streaming job that processes the loop 
> with a KeyedProcessFunction.
>
> I need a KeyedProcessFunction to use keyed state and to emit a side-output 
> (that after further transformations becomes the feedback)
>
> Problem is IterativeStream.process() only accepts ProcessFunction, no 
> KeyedProcessFunction.
>
> The main and feedback streams have the same payload type, and I am keying 
> both before starting and closing the iteration.
> I understand I cannot re-key after starting the iteration, as IterativeStream 
> does not support keyBy() and throws an UnsupportedOperationException "Cannot 
> change the input partitioning of an iteration head directly. Apply the 
> partitioning on the input and feedback streams instead."
>
> Is there any way of using keyed state within an iteration?
> BTW,
> I am using Flink 1.15.2 and I am bound to that version
>
> Regards
> Lorenzo



-- 
best,
Zhipeng


KeyedProcessFunction within an iteration

2023-02-18 Thread Lorenzo Nicora
Hi all,

I am trying to implement an iterative streaming job that processes the loop
with a KeyedProcessFunction.

I need a KeyedProcessFunction to use keyed state and to emit a side-output
(that after further transformations becomes the feedback)

Problem is IterativeStream.process() only accepts ProcessFunction, no
KeyedProcessFunction.

The main and feedback streams have the same payload type, and I am keying
both before starting and closing the iteration.
I understand I cannot re-key after starting the iteration, as
IterativeStream does not support keyBy() and throws an
UnsupportedOperationException "Cannot change the input partitioning of an
iteration head directly. Apply the partitioning on the input and feedback
streams instead."

Is there any way of using keyed state within an iteration?
BTW,
I am using Flink 1.15.2 and I am bound to that version

Regards
Lorenzo


Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-23 Thread Binil Benjamin
Yes, restarting the app with a clean state does seem to fix the issue, but
I think I may have found a bug in Flink.

Here's how we can replicate it:
- Create a simple application with KeyedProcessFunction (with onTimer())
- Send a few records with the same key. In processElement(), register a
timer for each of these records:

 
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
+ 1000)
- The onTimer() gets triggered as expected for the above timer registrations
- Now for the same key, register a timer with a negative value (say, when
value = some_special_value)
   ctx.timerService().registerProcessingTimeTimer(Long.MIN_VALUE)
- Now send more records with same key and register regular timers:

 
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
+ 1000)
- As originally reported in this mail, the newly registered timers don't
get triggered
- Now delete the only timer with negative timestamp
   ctx.timerService().deleteProcessingTimeTimer(Long.MIN_VALUE);
- Now send more records with same key and register regular timers:

 
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()
+ 1000)
- The new timers still don't get triggered.

I expected the new timers to go off after deleting the bad timer, but it
looks like there is no way to recover once a bad timer is registered. Could
this be a Flink bug?

Thanks!

On Tue, Mar 22, 2022 at 11:38 PM yu'an huang  wrote:

> [ External sender. Exercise caution. ]
>
> After fixing your negative timestamp bug, can the timer be triggered?
>
>
>
>
> On 23 Mar 2022, at 2:39 AM, Binil Benjamin  wrote:
>
> Here are some more findings as I was debugging this. I peeked into the
> snapshot to see the current values in
> "_timer_state/processing_user-timers" and here is how they look:
>
> Timer{timestamp=-9223372036854715808, key=(FFX22...),
> namespace=VoidNamespace}
> Timer{timestamp=-9223372036854715808, key=(FFX22...),
> namespace=VoidNamespace}
> Timer{timestamp=-9223372036854715808, key=(FFX22...),
> namespace=VoidNamespace}
> Timer{timestamp=-9223372036854715808, key=(FFX22), namespace=VoidNamespace}
> Timer{timestamp=1644897305245, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=1644998232084, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=1645730447266, key=(FFX1...), namespace=VoidNamespace}
> Timer{timestamp=1645742358651, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=1645743288774, key=(FFX22...), namespace=VoidNamespace}
> ...
>
> As you can see, the priorityQueue has some negative values (there was a
> bug in our code at some point that added these negative values). Could this
> be the root cause of why the timer is not getting triggered?
>
> Thanks!
>
> On Fri, Mar 18, 2022 at 6:50 PM Binil Benjamin 
> wrote:
>
>> Hi,
>>
>> Parallelism is currently set to 9 and it appears to be occurring for all
>> subtasks.
>>
>> We did put logs to see the various timestamps. The following logs are
>> from the last 5 days.
>>
>> - logs from processElement() - logged immediately after timer
>> registration:
>> "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647229483281, Current
>> step duration=6, Current processing time=1647229483281, Next trigger
>> time=1647229543281,
>> CurrentKey=(FFX22OJAEAA,d7d337b6-9dbc-4898-9001-4d10f2cd9796e1b9ba96-b4b6-3472-94ae-8a1d3e3d9ce1:::d7d337b6-9dbc-4898-9001-4d10f2cd9796:::service:none:)",
>> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>> "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647229512107, Current
>> step duration=6, Current processing time=1647229512107, Next trigger
>> time=1647229572107,
>> CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a44bbe719-2ed4-33c9-aeb7-e7cb5f61dd91:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::service:none:)",
>> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>> "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647229543475, Current
>> step duration=6, Current processing time=1647229543475, Next trigger
>> time=1647229603475,
>> CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a0bea4f14-9f23-3368-b74f-531a67fbe9f0:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::d11f4623-dab0-3232-92aa-341628e96330)",
>> "threadName

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-23 Thread yu'an huang
After fixing your negative timestamp bug, can the timer be triggered?




> On 23 Mar 2022, at 2:39 AM, Binil Benjamin  wrote:
> 
> Here are some more findings as I was debugging this. I peeked into the 
> snapshot to see the current values in "_timer_state/processing_user-timers" 
> and here is how they look:
> 
> Timer{timestamp=-9223372036854715808, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=-9223372036854715808, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=-9223372036854715808, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=-9223372036854715808, key=(FFX22), namespace=VoidNamespace}
> Timer{timestamp=1644897305245, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=1644998232084, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=1645730447266, key=(FFX1...), namespace=VoidNamespace}
> Timer{timestamp=1645742358651, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=1645743288774, key=(FFX22...), namespace=VoidNamespace}
> ...
> 
> As you can see, the priorityQueue has some negative values (there was a bug 
> in our code at some point that added these negative values). Could this be 
> the root cause of why the timer is not getting triggered?
> 
> Thanks!
> 
> On Fri, Mar 18, 2022 at 6:50 PM Binil Benjamin  > wrote:
> Hi, 
> 
> Parallelism is currently set to 9 and it appears to be occurring for all 
> subtasks.
> 
> We did put logs to see the various timestamps. The following logs are from 
> the last 5 days.
> 
> - logs from processElement() - logged immediately after timer registration:
> "message": "FunctionName=WfProcessFunction::processElement, 
> FunctionMessage=\"Time values\", Current system time=1647229483281, Current 
> step duration=6, Current processing time=1647229483281, Next trigger 
> time=1647229543281, 
> CurrentKey=(FFX22OJAEAA,d7d337b6-9dbc-4898-9001-4d10f2cd9796e1b9ba96-b4b6-3472-94ae-8a1d3e3d9ce1:::d7d337b6-9dbc-4898-9001-4d10f2cd9796:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink) 
> (7/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement, 
> FunctionMessage=\"Time values\", Current system time=1647229512107, Current 
> step duration=6, Current processing time=1647229512107, Next trigger 
> time=1647229572107, 
> CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a44bbe719-2ed4-33c9-aeb7-e7cb5f61dd91:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink) 
> (2/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement, 
> FunctionMessage=\"Time values\", Current system time=1647229543475, Current 
> step duration=6, Current processing time=1647229543475, Next trigger 
> time=1647229603475, 
> CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a0bea4f14-9f23-3368-b74f-531a67fbe9f0:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::d11f4623-dab0-3232-92aa-341628e96330)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink) 
> (8/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement, 
> FunctionMessage=\"Time values\", Current system time=1647229633747, Current 
> step duration=6, Current processing time=1647229633747, Next trigger 
> time=1647229693746, 
> CurrentKey=(FFX22OJAEAA,0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd50fe795a7-64db-3350-b56e-37400b19ae07:::0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd5:::743f32f2-4a6c-315b-9850-9992b88f2b67)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink) 
> (9/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement, 
> FunctionMessage=\"Time values\", Current system time=1647236501795, Current 
> step duration=6, Current processing time=1647236501795, Next trigger 
> time=1647236561795, 
> CurrentKey=(FFX22OJAEAA,4b6fbc31-5f41-45c3-aa08-4f865062e2a2dae46709-ff86-35d1-a830-1c01888a4cde:::4b6fbc31-5f41-45c3-aa08-4f865062e2a2:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink) 
> (3/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement, 
> FunctionMessage=\"Time values\", Current system time=1647236513004, Current 
> step duration=6, Current processing time=1647236513004, Next trigger 
> time=1647236573004, 
> CurrentKey=(FFX22OJAEAA,90ba0c88-0a1e-43b5-8e3a-65613ccd7943e4c9234f-5f83-3ef6-8b22-28224d070404:::90ba0c88-0a1e-43b5-8e3a-65613ccd7943:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink) 
> (2/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement, 
> FunctionMessage=\"Time values\", Current system time=1647236561848, Current 
> step duration=6, Current processing time=1647236561848, Next trigger 
> time=1647236621848, 
> CurrentKey=(FFX22OJAEAA,b3b6bb3f-3c72-4378-bd94-a28aee3da1f9d0a3e195-56b2-3242-9d03-326ccbfbc040:::b3b6bb3f-3c72-4378-bd94-a28aee3da1f9:::service:none:)",
> 

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-22 Thread Binil Benjamin
Here are some more findings as I was debugging this. I peeked into the
snapshot to see the current values in
"_timer_state/processing_user-timers" and here is how they look:

Timer{timestamp=-9223372036854715808, key=(FFX22...),
namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FFX22...),
namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FFX22...),
namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FFX22), namespace=VoidNamespace}
Timer{timestamp=1644897305245, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=1644998232084, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=1645730447266, key=(FFX1...), namespace=VoidNamespace}
Timer{timestamp=1645742358651, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=1645743288774, key=(FFX22...), namespace=VoidNamespace}
...

As you can see, the priorityQueue has some negative values (there was a bug
in our code at some point that added these negative values). Could this be
the root cause of why the timer is not getting triggered?

Thanks!

On Fri, Mar 18, 2022 at 6:50 PM Binil Benjamin  wrote:

> Hi,
>
> Parallelism is currently set to 9 and it appears to be occurring for all
> subtasks.
>
> We did put logs to see the various timestamps. The following logs are from
> the last 5 days.
>
> - logs from processElement() - logged immediately after timer registration:
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647229483281, Current
> step duration=6, Current processing time=1647229483281, Next trigger
> time=1647229543281,
> CurrentKey=(FFX22OJAEAA,d7d337b6-9dbc-4898-9001-4d10f2cd9796e1b9ba96-b4b6-3472-94ae-8a1d3e3d9ce1:::d7d337b6-9dbc-4898-9001-4d10f2cd9796:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (7/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647229512107, Current
> step duration=6, Current processing time=1647229512107, Next trigger
> time=1647229572107,
> CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a44bbe719-2ed4-33c9-aeb7-e7cb5f61dd91:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (2/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647229543475, Current
> step duration=6, Current processing time=1647229543475, Next trigger
> time=1647229603475,
> CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a0bea4f14-9f23-3368-b74f-531a67fbe9f0:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::d11f4623-dab0-3232-92aa-341628e96330)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (8/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647229633747, Current
> step duration=6, Current processing time=1647229633747, Next trigger
> time=1647229693746,
> CurrentKey=(FFX22OJAEAA,0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd50fe795a7-64db-3350-b56e-37400b19ae07:::0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd5:::743f32f2-4a6c-315b-9850-9992b88f2b67)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (9/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647236501795, Current
> step duration=6, Current processing time=1647236501795, Next trigger
> time=1647236561795,
> CurrentKey=(FFX22OJAEAA,4b6fbc31-5f41-45c3-aa08-4f865062e2a2dae46709-ff86-35d1-a830-1c01888a4cde:::4b6fbc31-5f41-45c3-aa08-4f865062e2a2:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (3/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647236513004, Current
> step duration=6, Current processing time=1647236513004, Next trigger
> time=1647236573004,
> CurrentKey=(FFX22OJAEAA,90ba0c88-0a1e-43b5-8e3a-65613ccd7943e4c9234f-5f83-3ef6-8b22-28224d070404:::90ba0c88-0a1e-43b5-8e3a-65613ccd7943:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (2/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647236561848, Current
> step duration=6, Current processing time=1647236561848, Next trigger
> time=1647236621848,
> CurrentKey=(FFX22OJAEAA,b3b6bb3f-3c72-4378-bd94-a28aee3da1f9d0a3e195-56b2-3242-9d03-326ccbfbc040:::b3b6bb3f-3c72-4378-bd94-a28aee3da1f9:::service:none:)",
> "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
> (4/9)#0",
> "message": "FunctionName=WfProcessFunction::processElement,
> FunctionMessage=\"Time values\", Current system time=1647236591875, Current
> step duration=6, 

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Binil Benjamin
Hi,

Parallelism is currently set to 9 and it appears to be occurring for all
subtasks.

We did put logs to see the various timestamps. The following logs are from
the last 5 days.

- logs from processElement() - logged immediately after timer registration:
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647229483281, Current
step duration=6, Current processing time=1647229483281, Next trigger
time=1647229543281,
CurrentKey=(FFX22OJAEAA,d7d337b6-9dbc-4898-9001-4d10f2cd9796e1b9ba96-b4b6-3472-94ae-8a1d3e3d9ce1:::d7d337b6-9dbc-4898-9001-4d10f2cd9796:::service:none:)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(7/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647229512107, Current
step duration=6, Current processing time=1647229512107, Next trigger
time=1647229572107,
CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a44bbe719-2ed4-33c9-aeb7-e7cb5f61dd91:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::service:none:)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(2/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647229543475, Current
step duration=6, Current processing time=1647229543475, Next trigger
time=1647229603475,
CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a0bea4f14-9f23-3368-b74f-531a67fbe9f0:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::d11f4623-dab0-3232-92aa-341628e96330)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(8/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647229633747, Current
step duration=6, Current processing time=1647229633747, Next trigger
time=1647229693746,
CurrentKey=(FFX22OJAEAA,0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd50fe795a7-64db-3350-b56e-37400b19ae07:::0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd5:::743f32f2-4a6c-315b-9850-9992b88f2b67)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(9/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647236501795, Current
step duration=6, Current processing time=1647236501795, Next trigger
time=1647236561795,
CurrentKey=(FFX22OJAEAA,4b6fbc31-5f41-45c3-aa08-4f865062e2a2dae46709-ff86-35d1-a830-1c01888a4cde:::4b6fbc31-5f41-45c3-aa08-4f865062e2a2:::service:none:)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(3/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647236513004, Current
step duration=6, Current processing time=1647236513004, Next trigger
time=1647236573004,
CurrentKey=(FFX22OJAEAA,90ba0c88-0a1e-43b5-8e3a-65613ccd7943e4c9234f-5f83-3ef6-8b22-28224d070404:::90ba0c88-0a1e-43b5-8e3a-65613ccd7943:::service:none:)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(2/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647236561848, Current
step duration=6, Current processing time=1647236561848, Next trigger
time=1647236621848,
CurrentKey=(FFX22OJAEAA,b3b6bb3f-3c72-4378-bd94-a28aee3da1f9d0a3e195-56b2-3242-9d03-326ccbfbc040:::b3b6bb3f-3c72-4378-bd94-a28aee3da1f9:::service:none:)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(4/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647236591875, Current
step duration=6, Current processing time=1647236591875, Next trigger
time=1647236651875,
CurrentKey=(FFX22OJAEAA,b3b6bb3f-3c72-4378-bd94-a28aee3da1f9eb796957-ef3a-3b67-8e63-8ba136e1b86d:::b3b6bb3f-3c72-4378-bd94-a28aee3da1f9:::45f60cdb-7cc9-3e5a-ace7-b0ca50b6c230)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(4/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647236761584, Current
step duration=6, Current processing time=1647236761584, Next trigger
time=1647236821584,
CurrentKey=(FFX22OJAEAA,585594a0-9421-4719-97bd-34920582cd260fea860d-93a3-3514-b470-4bd49670a298:::585594a0-9421-4719-97bd-34920582cd26:::e9e49df8-30d4-3dc1-93df-64024609acc3)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(2/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647241184750, Current
step duration=6, Current processing time=1647241184750, Next trigger
time=1647241244750,
CurrentKey=(FFX22OJAEAA,45d4124e-675d-4d5c-a8fe-715038032bd8e920e91b-e4c2-310f-82d5-a64768f32035:::45d4124e-675d-4d5c-a8fe-715038032bd8:::9634ad13-c121-33b8-87b0-b71e8dfe4f77)",

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Yun Gao
Hi Binil,

I think the code itself also looks good to me. May I have a double confirmation 
on the 
details of the issue:
1. What is the parallelism of this operator, and does the issues occurs for all 
the subtasks?
2. Have we already added some logs in the processElement and onTimer to print 
the time of 
registered processing timer and the time of the callbacks? Could you also share 
this part of result?

Best,
Yun Gao


--
From:Binil Benjamin 
Send Time:2022 Mar. 18 (Fri.) 16:07
To:"yu'an huang" 
Cc:user 
Subject:Re: onTimer() of a KeyedProcessFunction stops getting triggered after a 
while

Hi,

Unfortunately, I cannot share the entire code, but the class roughly looks like 
this:

public class WfProcessFunction extends KeyedProcessFunction, Map, Map> {

@Override
public void processElement(Map inputRecord,
Context context, Collector> collector) throws 
Exception {
...

context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime()
 + 5 * TimeUnit.SECONDS.toMillis(1L));
...
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, 
Collector> out) throws Exception {
...
} 
}
Thanks!
On Thu, Mar 17, 2022 at 9:24 PM yu'an huang  wrote:
[ External sender. Exercise caution. ]

 Hi, can you share your code so we can check whether it is written correctly.



 > On 18 Mar 2022, at 7:54 AM, Binil Benjamin  wrote:
 > 
 > Hi,
 > 
 > We have a class that extends KeyedProcessFunction and overrides onTimer() 
 > method. During processElement(), we register a timer callback using 
 > context.timerService().registerProcessingTimeTimer(). For 
 > a while, we see that the onTimer() method is getting called back and 
 > everything works as expected; however, after a while, the onTimer() stops 
 > getting any callbacks from Flink (the registration of the timer via. 
 > registerProcessingTimeTimer() is working just fine). Does anyone know what 
 > could be wrong here and how we can debug this?
 > 
 > Flink version is 1.13.2 (running on AWS KDA)
 > 
 > Thanks!





Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Binil Benjamin
Hi,

Unfortunately, I cannot share the entire code, but the class roughly looks
like this:

 public class WfProcessFunction extends KeyedProcessFunction, Map, Map> {

@Override
public void processElement(Map inputRecord,
Context context, Collector> collector) throws
Exception {
...

context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime()
+ 5 * TimeUnit.SECONDS.toMillis(1L));
...
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector> out) throws Exception {
...
}
}

Thanks!

On Thu, Mar 17, 2022 at 9:24 PM yu'an huang  wrote:

> [ External sender. Exercise caution. ]
>
> Hi, can you share your code so we can check whether it is written
> correctly.
>
>
>
> > On 18 Mar 2022, at 7:54 AM, Binil Benjamin  wrote:
> >
> > Hi,
> >
> > We have a class that extends KeyedProcessFunction and overrides
> onTimer() method. During processElement(), we register a timer callback
> using
> context.timerService().registerProcessingTimeTimer(). For
> a while, we see that the onTimer() method is getting called back and
> everything works as expected; however, after a while, the onTimer() stops
> getting any callbacks from Flink (the registration of the timer via.
> registerProcessingTimeTimer() is working just fine). Does anyone know what
> could be wrong here and how we can debug this?
> >
> > Flink version is 1.13.2 (running on AWS KDA)
> >
> > Thanks!
>
>
>


Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-17 Thread yu'an huang
Hi, can you share your code so we can check whether it is written correctly.



> On 18 Mar 2022, at 7:54 AM, Binil Benjamin  wrote:
> 
> Hi,
> 
> We have a class that extends KeyedProcessFunction and overrides onTimer() 
> method. During processElement(), we register a timer callback using 
> context.timerService().registerProcessingTimeTimer(). For a 
> while, we see that the onTimer() method is getting called back and everything 
> works as expected; however, after a while, the onTimer() stops getting any 
> callbacks from Flink (the registration of the timer via. 
> registerProcessingTimeTimer() is working just fine). Does anyone know what 
> could be wrong here and how we can debug this?
> 
> Flink version is 1.13.2 (running on AWS KDA)
> 
> Thanks!



onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-17 Thread Binil Benjamin
Hi,

We have a class that extends KeyedProcessFunction and overrides onTimer()
method. During processElement(), we register a timer callback using
context.timerService().registerProcessingTimeTimer(). For
a while, we see that the onTimer() method is getting called back and
everything works as expected; however, after a while, the onTimer() stops
getting any callbacks from Flink (the registration of the timer via.
registerProcessingTimeTimer() is working just fine). Does anyone know what
could be wrong here and how we can debug this?

Flink version is 1.13.2 (running on AWS KDA)

Thanks!


flink KeyedProcessFunction ????????

2021-06-16 Thread ????
??KeyedProcessFunctionprocessElementKeyBy??processElement100

RE: Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi Zhang,

Please find the code snippet.

private ReducingState aggrRecord; // record being aggregated

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception {
// FIXME timer is not working? or re-registration not working?
NatLogData event = aggrRecord.get();   // Always get null value.


Thanks,
Suchithra


From: JING ZHANG 
Sent: Wednesday, June 9, 2021 2:20 PM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: user@flink.apache.org; Jash, Shaswata (Nokia - IN/Bangalore) 

Subject: Re: Issue with onTimer method of KeyedProcessFunction

Hi Suchithra,
Would you please provide more information in detail or paste the main code?

Best regards,
JING ZHANG

V N, Suchithra (Nokia - IN/Bangalore) 
mailto:suchithra@nokia.com>> 于2021年6月9日周三 下午3:42写道:
Hello,

We are using apache flink 1.7 and now trying to upgrade to flink 1.12.3 
version. After upgrading to 1.12.3 version,  the onTimer method of  
KeyedProcessFunction is not behaving correctly, the value of ReducingState and 
ValueState always return null.

Could you please help in debugging the issue.

Thanks,
Suchithra



RE: Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi Zhang,

Please find the code snippet.

private ReducingState aggrRecord; // record being aggregated

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector 
out) throws Exception {
// FIXME timer is not working? or re-registration not working?
NatLogData event = aggrRecord.get();   // Always get null value.


Thanks,
Suchithra

From: JING ZHANG 
Sent: Wednesday, June 9, 2021 2:20 PM
To: V N, Suchithra (Nokia - IN/Bangalore) 
Cc: user@flink.apache.org; Jash, Shaswata (Nokia - IN/Bangalore) 

Subject: Re: Issue with onTimer method of KeyedProcessFunction

Hi Suchithra,
Would you please provide more information in detail or paste the main code?

Best regards,
JING ZHANG

V N, Suchithra (Nokia - IN/Bangalore) 
mailto:suchithra@nokia.com>> 于2021年6月9日周三 下午3:42写道:
Hello,

We are using apache flink 1.7 and now trying to upgrade to flink 1.12.3 
version. After upgrading to 1.12.3 version,  the onTimer method of  
KeyedProcessFunction is not behaving correctly, the value of ReducingState and 
ValueState always return null.

Could you please help in debugging the issue.

Thanks,
Suchithra



Re: Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread JING ZHANG
Hi Suchithra,
Would you please provide more information in detail or paste the main code?

Best regards,
JING ZHANG

V N, Suchithra (Nokia - IN/Bangalore) 
于2021年6月9日周三 下午3:42写道:

> Hello,
>
>
>
> We are using apache flink 1.7 and now trying to upgrade to flink 1.12.3
> version. After upgrading to 1.12.3 version,  the onTimer method of
>  KeyedProcessFunction is not behaving correctly, the value of ReducingState
> and ValueState always return null.
>
>
>
> Could you please help in debugging the issue.
>
>
>
> Thanks,
>
> Suchithra
>
>
>


Issue with onTimer method of KeyedProcessFunction

2021-06-09 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hello,

We are using apache flink 1.7 and now trying to upgrade to flink 1.12.3 
version. After upgrading to 1.12.3 version,  the onTimer method of  
KeyedProcessFunction is not behaving correctly, the value of ReducingState and 
ValueState always return null.

Could you please help in debugging the issue.

Thanks,
Suchithra



Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-10 Thread Miguel Araújo
I've realized this is not such a big issue because it's also upper bounded
by the number of watermarks received, and it won't be one per event.

Miguel Araújo  escreveu no dia segunda,
10/05/2021 à(s) 09:39:

> Thanks Dawid, having a look at CepOperator was useful. I implemented
> something with one difference I feel might be important:
>
> I noticed that in the CepOperator the timer is being registered for
> currentWatermark+1, instead of using the event's timestamp. Is there a
> reason for this? I think this implies a quadratic number of triggers, on
> the number of keys with events that arrived after the current watermark.
> For instance, if you have 1000 events per second on different keys (and
> different timestamps), a watermark that is delayed 1 second will fire ~1
> million times. Is this a requirement to the NFA implementation? Would this
> not be a problem?
>
> Thanks, once again.
>
> Dawid Wysakowicz  escreveu no dia segunda,
> 10/05/2021 à(s) 09:13:
>
>> Hey Miguel,
>>
>> I think you could take a look at the CepOperator which does pretty much
>> what you are describing.
>>
>> As for more direct answers for your questions. If you use
>> KeyedProcessFunction it is always scoped to a single Key. There is no way
>> to process events from other keys. If you want to have more control over
>> state and e.g. use PriorityQueue which would be snapshotted on checkpoint
>> you could look into using Operator API. Bare in mind it is a semi-public
>> API. It is very low level and subject to change rather frequently. Another
>> thing to consider is that if you use PriorityQueue instead of e.g. MapState
>> for buffering and ordering events you are constrained by the available
>> memory. We used PriorityQueue in the past in the CepOperator but migrated
>> it to MapState.
>>
>> It is possible that events in downstream operators can become late. It
>> all depends on the timestamp of the events you emit from the "sorting"
>> operator. If you emit records with timestamps larger than the Watermark
>> that "triggered" its generation it can become late.
>>
>> Hope those tips could help you a bit.
>>
>> Best,
>>
>> Dawid
>> On 04/05/2021 14:49, Miguel Araújo wrote:
>>
>> Hi Timo,
>>
>> Thanks for your answer. I think I wasn't clear enough in my initial
>> message, so let me give more details.
>>
>> The stream is not keyed by timestamp, it's keyed by a custom field (e.g.,
>> user-id) and then fed into a KeyedProcessFunction. I want to process all
>> events for a given user in order, before sending them downstream for
>> further processing in other operators. I don't want to hold events longer
>> than needed, hence using the watermark to signal which events can be
>> processed.
>> I don't think your suggestion of using a ListState would work, because we
>> would effectively have one list per user. That would imply (among other
>> things) that an event could only be processed when a new event for the same
>> user arrives, which would not only imply a (potentially) huge latency, but
>> also data leakage. Not to mention that the events being sent could easily
>> be considered late-events to the downstream operators.
>> The idea of keying by timestamp was an "evolution" of the ListState
>> suggestion, where events-to-be-later-processed would be kept sorted in the
>> map (which is what would be keyed by timestamp). We could iterate the map
>> to process the events, instead of fetching the full list and sorting it to
>> process the events in order. I don't think this solves any of the problems
>> mentioned above, so I think that mentioning it only raised confusion.
>>
>> Regarding global event-time order, that's not really what I'm after. I
>> only need event-time order per key, but I want to process the event as soon
>> as possible, constrained by knowing that it is "safe" to do so because no
>> event with a smaller timestamp for this key is yet to come.
>>
>> So, rephrasing my question as I'm not sure that part was clear in the
>> initial message, here is the idea:
>> - keeping one priority queue (ordered by timestamp) in each
>> KeyedProcessFunction instance. Therefore, each priority queue would store
>> events for multiple keys.
>> - when an event arrives, we push it to the queue and then process events
>> (updating state and sending them downstream) while their timestamp is lower
>> than the current watermark.
>>
>> The question is:
>> - is this fault tolerant? The priority queue is not state that is managed
>> 

Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-10 Thread Miguel Araújo
Thanks Dawid, having a look at CepOperator was useful. I implemented
something with one difference I feel might be important:

I noticed that in the CepOperator the timer is being registered for
currentWatermark+1, instead of using the event's timestamp. Is there a
reason for this? I think this implies a quadratic number of triggers, on
the number of keys with events that arrived after the current watermark.
For instance, if you have 1000 events per second on different keys (and
different timestamps), a watermark that is delayed 1 second will fire ~1
million times. Is this a requirement to the NFA implementation? Would this
not be a problem?

Thanks, once again.

Dawid Wysakowicz  escreveu no dia segunda,
10/05/2021 à(s) 09:13:

> Hey Miguel,
>
> I think you could take a look at the CepOperator which does pretty much
> what you are describing.
>
> As for more direct answers for your questions. If you use
> KeyedProcessFunction it is always scoped to a single Key. There is no way
> to process events from other keys. If you want to have more control over
> state and e.g. use PriorityQueue which would be snapshotted on checkpoint
> you could look into using Operator API. Bare in mind it is a semi-public
> API. It is very low level and subject to change rather frequently. Another
> thing to consider is that if you use PriorityQueue instead of e.g. MapState
> for buffering and ordering events you are constrained by the available
> memory. We used PriorityQueue in the past in the CepOperator but migrated
> it to MapState.
>
> It is possible that events in downstream operators can become late. It all
> depends on the timestamp of the events you emit from the "sorting"
> operator. If you emit records with timestamps larger than the Watermark
> that "triggered" its generation it can become late.
>
> Hope those tips could help you a bit.
>
> Best,
>
> Dawid
> On 04/05/2021 14:49, Miguel Araújo wrote:
>
> Hi Timo,
>
> Thanks for your answer. I think I wasn't clear enough in my initial
> message, so let me give more details.
>
> The stream is not keyed by timestamp, it's keyed by a custom field (e.g.,
> user-id) and then fed into a KeyedProcessFunction. I want to process all
> events for a given user in order, before sending them downstream for
> further processing in other operators. I don't want to hold events longer
> than needed, hence using the watermark to signal which events can be
> processed.
> I don't think your suggestion of using a ListState would work, because we
> would effectively have one list per user. That would imply (among other
> things) that an event could only be processed when a new event for the same
> user arrives, which would not only imply a (potentially) huge latency, but
> also data leakage. Not to mention that the events being sent could easily
> be considered late-events to the downstream operators.
> The idea of keying by timestamp was an "evolution" of the ListState
> suggestion, where events-to-be-later-processed would be kept sorted in the
> map (which is what would be keyed by timestamp). We could iterate the map
> to process the events, instead of fetching the full list and sorting it to
> process the events in order. I don't think this solves any of the problems
> mentioned above, so I think that mentioning it only raised confusion.
>
> Regarding global event-time order, that's not really what I'm after. I
> only need event-time order per key, but I want to process the event as soon
> as possible, constrained by knowing that it is "safe" to do so because no
> event with a smaller timestamp for this key is yet to come.
>
> So, rephrasing my question as I'm not sure that part was clear in the
> initial message, here is the idea:
> - keeping one priority queue (ordered by timestamp) in each
> KeyedProcessFunction instance. Therefore, each priority queue would store
> events for multiple keys.
> - when an event arrives, we push it to the queue and then process events
> (updating state and sending them downstream) while their timestamp is lower
> than the current watermark.
>
> The question is:
> - is this fault tolerant? The priority queue is not state that is managed
> by flink, but it should be recoverable on replay.
> - is it possible that the events I'm sending downstream become late-events
> for a different operator, for some reason? Will they always be sent before
> the watermark of the event that originated the processElement() call?
> - I would effectively be processing multiple elements (from multiple keys)
> in the same call to processElement(). Is there a way to access the state of
> different keys?
>
> This doesn't feel like the right approach. Is there an operator more
> suitable than a KeyedProces

Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-10 Thread Dawid Wysakowicz
Hey Miguel,

I think you could take a look at the CepOperator which does pretty much
what you are describing.

As for more direct answers for your questions. If you use
KeyedProcessFunction it is always scoped to a single Key. There is no
way to process events from other keys. If you want to have more control
over state and e.g. use PriorityQueue which would be snapshotted on
checkpoint you could look into using Operator API. Bare in mind it is a
semi-public API. It is very low level and subject to change rather
frequently. Another thing to consider is that if you use PriorityQueue
instead of e.g. MapState for buffering and ordering events you are
constrained by the available memory. We used PriorityQueue in the past
in the CepOperator but migrated it to MapState.

It is possible that events in downstream operators can become late. It
all depends on the timestamp of the events you emit from the "sorting"
operator. If you emit records with timestamps larger than the Watermark
that "triggered" its generation it can become late.

Hope those tips could help you a bit.

Best,

Dawid

On 04/05/2021 14:49, Miguel Araújo wrote:
> Hi Timo,
>
> Thanks for your answer. I think I wasn't clear enough in my initial
> message, so let me give more details.
>
> The stream is not keyed by timestamp, it's keyed by a custom field
> (e.g., user-id) and then fed into a KeyedProcessFunction. I want to
> process all events for a given user in order, before sending them
> downstream for further processing in other operators. I don't want to
> hold events longer than needed, hence using the watermark to signal
> which events can be processed.
> I don't think your suggestion of using a ListState would work, because
> we would effectively have one list per user. That would imply (among
> other things) that an event could only be processed when a new event
> for the same user arrives, which would not only imply a (potentially)
> huge latency, but also data leakage. Not to mention that the events
> being sent could easily be considered late-events to the downstream
> operators.
> The idea of keying by timestamp was an "evolution" of the ListState
> suggestion, where events-to-be-later-processed would be kept sorted in
> the map (which is what would be keyed by timestamp). We could iterate
> the map to process the events, instead of fetching the full list and
> sorting it to process the events in order. I don't think this solves
> any of the problems mentioned above, so I think that mentioning it
> only raised confusion.
>
> Regarding global event-time order, that's not really what I'm after. I
> only need event-time order per key, but I want to process the event as
> soon as possible, constrained by knowing that it is "safe" to do so
> because no event with a smaller timestamp for this key is yet to come.
>
> So, rephrasing my question as I'm not sure that part was clear in the
> initial message, here is the idea:
> - keeping one priority queue (ordered by timestamp) in each
> KeyedProcessFunction instance. Therefore, each priority queue would
> store events for multiple keys.
> - when an event arrives, we push it to the queue and then process
> events (updating state and sending them downstream) while their
> timestamp is lower than the current watermark.
>
> The question is:
> - is this fault tolerant? The priority queue is not state that is
> managed by flink, but it should be recoverable on replay.
> - is it possible that the events I'm sending downstream become
> late-events for a different operator, for some reason? Will they
> always be sent before the watermark of the event that originated the
> processElement() call?
> - I would effectively be processing multiple elements (from multiple
> keys) in the same call to processElement(). Is there a way to access
> the state of different keys?
>
> This doesn't feel like the right approach. Is there an operator more
> suitable than a KeyedProcessFunction which would allow me to handle
> the state for multiple keys in this task manager? Should I register a
> timer to trigger on the event timestamp instead? I believe timers
> trigger on watermarks, so that could theoretically work, although it
> feels a little weird. After all, what I want is just to buffer events
> so that they are only processed when the watermark has caught up to them.
>
> Thanks
>
> Timo Walther mailto:twal...@apache.org>> escreveu
> no dia sexta, 30/04/2021 à(s) 12:05:
>
> Hi Miguel,
>
> your initial idea sounds not too bad but why do you want to key by
> timestamp? Usually, you can simply key your stream by a custom key
> and
> store the events in a ListState until a watermark comes in.
>
> But if you really want t

Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-04 Thread Miguel Araújo
Hi Timo,

Thanks for your answer. I think I wasn't clear enough in my initial
message, so let me give more details.

The stream is not keyed by timestamp, it's keyed by a custom field (e.g.,
user-id) and then fed into a KeyedProcessFunction. I want to process all
events for a given user in order, before sending them downstream for
further processing in other operators. I don't want to hold events longer
than needed, hence using the watermark to signal which events can be
processed.
I don't think your suggestion of using a ListState would work, because we
would effectively have one list per user. That would imply (among other
things) that an event could only be processed when a new event for the same
user arrives, which would not only imply a (potentially) huge latency, but
also data leakage. Not to mention that the events being sent could easily
be considered late-events to the downstream operators.
The idea of keying by timestamp was an "evolution" of the ListState
suggestion, where events-to-be-later-processed would be kept sorted in the
map (which is what would be keyed by timestamp). We could iterate the map
to process the events, instead of fetching the full list and sorting it to
process the events in order. I don't think this solves any of the problems
mentioned above, so I think that mentioning it only raised confusion.

Regarding global event-time order, that's not really what I'm after. I only
need event-time order per key, but I want to process the event as soon as
possible, constrained by knowing that it is "safe" to do so because no
event with a smaller timestamp for this key is yet to come.

So, rephrasing my question as I'm not sure that part was clear in the
initial message, here is the idea:
- keeping one priority queue (ordered by timestamp) in each
KeyedProcessFunction instance. Therefore, each priority queue would store
events for multiple keys.
- when an event arrives, we push it to the queue and then process events
(updating state and sending them downstream) while their timestamp is lower
than the current watermark.

The question is:
- is this fault tolerant? The priority queue is not state that is managed
by flink, but it should be recoverable on replay.
- is it possible that the events I'm sending downstream become late-events
for a different operator, for some reason? Will they always be sent before
the watermark of the event that originated the processElement() call?
- I would effectively be processing multiple elements (from multiple keys)
in the same call to processElement(). Is there a way to access the state of
different keys?

This doesn't feel like the right approach. Is there an operator more
suitable than a KeyedProcessFunction which would allow me to handle the
state for multiple keys in this task manager? Should I register a timer to
trigger on the event timestamp instead? I believe timers trigger on
watermarks, so that could theoretically work, although it feels a little
weird. After all, what I want is just to buffer events so that they are
only processed when the watermark has caught up to them.

Thanks

Timo Walther  escreveu no dia sexta, 30/04/2021 à(s)
12:05:

> Hi Miguel,
>
> your initial idea sounds not too bad but why do you want to key by
> timestamp? Usually, you can simply key your stream by a custom key and
> store the events in a ListState until a watermark comes in.
>
> But if you really want to have some kind of global event-time order, you
> have two choices:
>
> - either a single operator with parallelism 1 that performs the ordering
> - or you send the every event to every operator using the broadcast
> state pattern [1]
>
> It is guaranteed that watermark will reach the downstream operator or
> sink after all events. Watermarks are synchronized across all parallel
> operator instances. You can store a Map uncheckpointed by this means
> that you have to ensure to initialize the map again during recovery.
>
> Regards,
> Timo
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>
> On 30.04.21 11:37, Miguel Araújo wrote:
> > Hi everyone,
> >
> > I have a KeyedProcessFunction whose events I would like to process in
> > event-time order.
> > My initial idea was to use a Map keyed by timestamp and, when a new
> > event arrives, iterate over the Map to process events older than the
> > current watermark.
> >
> > The issue is that I obviously can't use a MapState, as my stream is
> > keyed, so the map would be scoped to the current key.
> > Is using a "regular" (i.e., not checkpointed) Map an option, given that
> > its content will be recreated by the replay of the events on a restart?
> > Is it guaranteed that the watermark that triggered the processing of
> > multiple events (and their subsequent push downstream) is not sent
> > downstream before these events themselves?
> >
> > Thanks,
> > Miguel
>
>


Re: Guaranteeing event order in a KeyedProcessFunction

2021-04-30 Thread Timo Walther

Hi Miguel,

your initial idea sounds not too bad but why do you want to key by 
timestamp? Usually, you can simply key your stream by a custom key and 
store the events in a ListState until a watermark comes in.


But if you really want to have some kind of global event-time order, you 
have two choices:


- either a single operator with parallelism 1 that performs the ordering
- or you send the every event to every operator using the broadcast 
state pattern [1]


It is guaranteed that watermark will reach the downstream operator or 
sink after all events. Watermarks are synchronized across all parallel 
operator instances. You can store a Map uncheckpointed by this means 
that you have to ensure to initialize the map again during recovery.


Regards,
Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html


On 30.04.21 11:37, Miguel Araújo wrote:

Hi everyone,

I have a KeyedProcessFunction whose events I would like to process in 
event-time order.
My initial idea was to use a Map keyed by timestamp and, when a new 
event arrives, iterate over the Map to process events older than the 
current watermark.


The issue is that I obviously can't use a MapState, as my stream is 
keyed, so the map would be scoped to the current key.
Is using a "regular" (i.e., not checkpointed) Map an option, given that 
its content will be recreated by the replay of the events on a restart? 
Is it guaranteed that the watermark that triggered the processing of 
multiple events (and their subsequent push downstream) is not sent 
downstream before these events themselves?


Thanks,
Miguel




Guaranteeing event order in a KeyedProcessFunction

2021-04-30 Thread Miguel Araújo
Hi everyone,

I have a KeyedProcessFunction whose events I would like to process in
event-time order.
My initial idea was to use a Map keyed by timestamp and, when a new event
arrives, iterate over the Map to process events older than the current
watermark.

The issue is that I obviously can't use a MapState, as my stream is keyed,
so the map would be scoped to the current key.
Is using a "regular" (i.e., not checkpointed) Map an option, given that its
content will be recreated by the replay of the events on a restart? Is it
guaranteed that the watermark that triggered the processing of multiple
events (and their subsequent push downstream) is not sent downstream before
these events themselves?

Thanks,
Miguel


Re: KeyedProcessFunction

2021-03-11 Thread Maminspapin
I missed in documentation:

A KeyedProcessFunction is always a RichFunction. Therefore, access to the
RuntimeContext is always available and setup and teardown methods can be
implemented. See
RichFunction.open(org.apache.flink.configuration.Configuration) and
RichFunction.close().

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


KeyedProcessFunction

2021-03-11 Thread Maminspapin
Hello,

I'm learning State Processor API:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

There is example in this page with StatefulFunctionWithTime extends
KeyedProcessFunction. And here we can see method open() we need implement to
initialize state. But when I try KeyedProcessFunction abstract class in my
code there is no method open().

Using Flink 1.12.

What's wrong I do?

Thanks.
Yuri L.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-26 Thread Timo Walther
 below.
 >>>
 >>>     I managed to figure out the root cause is that Flink stream
 >>>     execution environment has a default parallelism as 8.*I
didn't
 >>>     notice in the doc, could the Community add this
explicitly into
 >>>     the official doc to avoid some confusion? Thanks.*
 >>>
 >>>  From my understanding, the watermark advances based on the
 >>>     lowest watermark among the 8, so I can not advance the
bound out
 >>>     of orderness watermark since I am only advancing 1 of the 8
 >>>     parallelisms. If I set the entire stream execution
environment
 >>>     to be of parallelism 1, it will reflect the watermark
in the
 >>>     context correctly. One more thing is that this behavior
is not
 >>>     reflected in the Flink Cluster web UI interface. I can
see the
 >>>     watermark is advancing, but it is not in reality. *That's
 >>>     causing the inconsistency problem I mentioned in the
other email
 >>>     I mentioned above. Will this be considered as a bug in
the UI?*
 >>>
 >>>     My current question is, since I have full outer join
operation
 >>>     before the KeyedProcessFunction here. How can I let the
bound of
 >>>     orderness watermark / punctuated watermark strategy
work if the
 >>>     parallelism > 1? It can only update one of the 8
parallelisms
 >>>     for the watermark for this onTimer operator. Is this
related to
 >>>     my Table full outer join operation before this step?
According
 >>>     to the doc,
 >>>

https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$

<https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$>

 >>>

<https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$

<https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$>

 >>> >
 >>>
 >>>     Default parallelism should be the same like the stream
 >>>     environment. Why can't I update the watermarks for all 8
 >>>     parallelisms? What should I do to enable this function with
 >>>     Parallelism larger than 1? Thanks.
 >>>
 >>>     First round: (Note the first column of each log row is the
 >>>     timelag strategy, it is getting updated correctly for all 8
 >>>     parallelism, but the other two strategies I mentioned above
 >>>     can't do that..)
 >>>
 >>>     14:28:01,199 INFO
 >>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
 >>>     - Emit Watermark: watermark based on system time:
1605047266198,
 >>>     periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
 >>>     14:28:01,199 INFO
 >>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
 >>>     - Emit Watermark: watermark based on system time:
1605047266199,
 >>>     periodicEmitWatermarkTime: 1605047172881,
currentMaxTimestamp:
 >>>     1605047187881 (only one of the 8 parallelism for bound
out of
 >>>     orderness is getting my new watermark)
 >>>     14:28:01,199 INFO
 >>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
 >>>     - Emit Watermark: watermark based on system time:
1605047266199,
 >>>     periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
 >>>     14:28:01,199 INFO
 >>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
 >>>     - Emit Watermark: watermark based on system time:
1605047266198,
 >>>    

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-25 Thread Fuyao Li
/ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$
> >>> >
> >>>
> >>> Maybe that is the root cause of my problem here, with less than 8
> >>> partitions (only 1 partition in my case), using the default
> >>> parallelism of 8 will cause this wrong behavior. This is my guess, it
> >>> takes a while to test it out... What's your opinion on this? Thanks!
> >>>
> >>> Best,
> >>>
> >>> Fuyao
> >>>
> >>>
> >>> On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li  >>> <mailto:fuyaoli2...@gmail.com>> wrote:
> >>>
> >>> Hi Matthias,
> >>>
> >>> One more question regarding Flink table parallelism, is it possible
> >>> to configure the parallelism for Table operation at operator level,
> >>> it seems we don't have such API available, right? Thanks!
> >>>
> >>> Best,
> >>> Fuyao
> >>>
> >>> On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li  >>> <mailto:fuyaoli2...@gmail.com>> wrote:
> >>>
> >>> Hi Matthias,
> >>>
> >>> Thanks for your information. I have managed to figure out the
> >>> first issue you mentioned. Regarding the second issue. I have
> >>> got some progress on it.
> >>>
> >>> I have sent another email with the title 'BoundedOutOfOrderness
> >>> Watermark Generator is NOT making the event time to advance'
> >>> using another email of mine, fuyao...@oracle.com
> >>> <mailto:fuyao...@oracle.com>. That email contains some more
> >>> context on my issue. Please take a look. I have made some
> >>> progress after sending that new email.
> >>>
> >>> Previously, I had managed to make timelag watermark strategy
> >>> working in my code, but my bound out of orderness strategy or
> >>> punctuated watermark strategy doesn't work well. It produces 8
> >>> watermarks each time. Two cycles are shown below.
> >>>
> >>> I managed to figure out the root cause is that Flink stream
> >>> execution environment has a default parallelism as 8.*I didn't
> >>> notice in the doc, could the Community add this explicitly into
> >>> the official doc to avoid some confusion? Thanks.*
> >>>
> >>>  From my understanding, the watermark advances based on the
> >>> lowest watermark among the 8, so I can not advance the bound
> out
> >>> of orderness watermark since I am only advancing 1 of the 8
> >>> parallelisms. If I set the entire stream execution environment
> >>> to be of parallelism 1, it will reflect the watermark in the
> >>> context correctly. One more thing is that this behavior is not
> >>> reflected in the Flink Cluster web UI interface. I can see the
> >>> watermark is advancing, but it is not in reality. *That's
> >>> causing the inconsistency problem I mentioned in the other
> email
> >>> I mentioned above. Will this be considered as a bug in the UI?*
> >>>
> >>> My current question is, since I have full outer join operation
> >>> before the KeyedProcessFunction here. How can I let the bound
> of
> >>> orderness watermark / punctuated watermark strategy work if the
> >>> parallelism > 1? It can only update one of the 8 parallelisms
> >>> for the watermark for this onTimer operator. Is this related to
> >>> my Table full outer join operation before this step? According
> >>> to the doc,
> >>>
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$
> >>> <
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$
> >>> >
> >>>
>

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-24 Thread Timo Walther
elisms. If I set the entire stream execution environment
    to be of parallelism 1, it will reflect the watermark in the
    context correctly. One more thing is that this behavior is not
    reflected in the Flink Cluster web UI interface. I can see the
    watermark is advancing, but it is not in reality. *That's
    causing the inconsistency problem I mentioned in the other email
    I mentioned above. Will this be considered as a bug in the UI?*

    My current question is, since I have full outer join operation
    before the KeyedProcessFunction here. How can I let the bound of
    orderness watermark / punctuated watermark strategy work if the
    parallelism > 1? It can only update one of the 8 parallelisms
    for the watermark for this onTimer operator. Is this related to
    my Table full outer join operation before this step? According
    to the doc,
https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$ 
<https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$ 
>


    Default parallelism should be the same like the stream
    environment. Why can't I update the watermarks for all 8
    parallelisms? What should I do to enable this function with
    Parallelism larger than 1? Thanks.

    First round: (Note the first column of each log row is the
    timelag strategy, it is getting updated correctly for all 8
    parallelism, but the other two strategies I mentioned above
    can't do that..)

    14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047266198,
    periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047266199,
    periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
    1605047187881 (only one of the 8 parallelism for bound out of
    orderness is getting my new watermark)
    14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047266199,
    periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047266198,
    periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047266198,
    periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047266198,
    periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047266198,
    periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047266198,
    periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000

    Second round: (I set the autoWatermark interval to be 5 seconds)
    14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047271200,
    periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047271200,
    periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047271200,
    periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
    1605047187881
    14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emit Watermark: watermark based on system time: 1605047271200,
    periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
    14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
    - Emi

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-23 Thread Jark Wu
ed if you can use an interval join instead of a full join
> with state retention? Table/SQL pipelines that don't preserve a time
> attribute in the end might also erase the underlying watermarks. Thus,
> event time triggers will not work after your join.
>
> "Why can't I update the watermarks for all 8 parallelisms?"
>
> You could play around with idleness for your source [2]. Or you set the
> source parallelism to 1 (while keeping the rest of the pipeline globally
> set to 8), would that be an option?
>
> "Some type cast behavior of retracted streams I can't explain."
>
> toAppendStream/toRetractStream still need an update to the new type
> system. This is explained in FLIP-136 which will be part of Flink 1.13 [3].
>
> I hope I could help a bit.
>
> Regards,
> Timo
>
>
> [1]
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-113*3A*Supports*Dynamic*Table*Options*for*Flink*SQL__;JSsrKysrKys!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J6qWrWNk$
> [2]
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html*dealing-with-idle-sources__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JMW06Who$
> [3]
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-136*3A**AImprove*interoperability*between*DataStream*and*Table*API__;JSsrKysrKysr!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JfAjyGyQ$
> On 13.11.20 21:39, Fuyao Li wrote:
>
> Hi Matthias,
>
> Just to provide more context on this problem. I only have 1 partition per
> each Kafka Topic at the beginning before the join operation. After reading
> the doc:
> https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$
> >
> <https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$>
>
> Maybe that is the root cause of my problem here, with less than 8
> partitions (only 1 partition in my case), using the default parallelism of
> 8 will cause this wrong behavior. This is my guess, it takes a while to
> test it out... What's your opinion on this? Thanks!
>
> Best,
>
> Fuyao
>
>
> On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li  <mailto:fuyaoli2...@gmail.com> > wrote:
>
> Hi Matthias,
>
> One more question regarding Flink table parallelism, is it possible
> to configure the parallelism for Table operation at operator level,
> it seems we don't have such API available, right? Thanks!
>
> Best,
> Fuyao
>
> On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li  <mailto:fuyaoli2...@gmail.com> > wrote:
>
> Hi Matthias,
>
> Thanks for your information. I have managed to figure out the
> first issue you mentioned. Regarding the second issue. I have
> got some progress on it.
>
> I have sent another email with the title 'BoundedOutOfOrderness
> Watermark Generator is NOT making the event time to advance'
> using another email of mine, fuyao...@oracle.com
> <mailto:fuyao...@oracle.com> . That email
> contains some more
> context on my issue. Please take a look. I have made some
> progress after sending that new email.
>
> Previously, I had managed to make timelag watermark strategy
> working in my code, but my bound out of orderness strategy or
> punctuated watermark strategy doesn't work well. It produces 8
> watermarks each time. Two cycles are shown below.
>
> I managed to figure out the root cause is that Flink stream
> execution environment has a default parallelism as 8.*I didn't
> notice in the doc, could the Community add this explicitly into
> the official doc to avoid some confusion? Thanks.*
>
>  From my understanding, the watermark advances based on the
> lowest watermark among the 8, so I can not advance the bound out
> of orderness watermark since I am only advancing 1 of the 8
> parallelisms. If I set the entire stream ex

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread fuyao . li
ctStream still need an update to the new type 
system. This is explained in FLIP-136 which will be part of Flink 
1.13 [3].


I hope I could help a bit.

Regards,
Timo


[1] 
https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-113*3A*Supports*Dynamic*Table*Options*for*Flink*SQL__;JSsrKysrKys!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J6qWrWNk$ 
[2] 
https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html*dealing-with-idle-sources__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JMW06Who$ 
[3] 
https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-136*3A**AImprove*interoperability*between*DataStream*and*Table*API__;JSsrKysrKysr!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JfAjyGyQ$ 


On 13.11.20 21:39, Fuyao Li wrote:

Hi Matthias,

Just to provide more context on this problem. I only have 1 
partition per each Kafka Topic at the beginning before the join 
operation. After reading the doc: 
https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$ 
<https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$ 
>


Maybe that is the root cause of my problem here, with less than 8 
partitions (only 1 partition in my case), using the default 
parallelism of 8 will cause this wrong behavior. This is my guess, 
it takes a while to test it out... What's your opinion on this? Thanks!


Best,

Fuyao


On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li <mailto:fuyaoli2...@gmail.com>> wrote:


    Hi Matthias,

    One more question regarding Flink table parallelism, is it possible
    to configure the parallelism for Table operation at operator level,
    it seems we don't have such API available, right? Thanks!

    Best,
    Fuyao

    On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li mailto:fuyaoli2...@gmail.com>> wrote:

    Hi Matthias,

    Thanks for your information. I have managed to figure out the
    first issue you mentioned. Regarding the second issue. I have
    got some progress on it.

    I have sent another email with the title 'BoundedOutOfOrderness
    Watermark Generator is NOT making the event time to advance'
    using another email of mine, fuyao...@oracle.com
<mailto:fuyao...@oracle.com>. That email contains some more
    context on my issue. Please take a look. I have made some
    progress after sending that new email.

    Previously, I had managed to make timelag watermark strategy
    working in my code, but my bound out of orderness strategy or
    punctuated watermark strategy doesn't work well. It produces 8
    watermarks each time. Two cycles are shown below.

    I managed to figure out the root cause is that Flink stream
    execution environment has a default parallelism as 8.*I didn't
    notice in the doc, could the Community add this explicitly into
    the official doc to avoid some confusion? Thanks.*

 From my understanding, the watermark advances based on the
    lowest watermark among the 8, so I can not advance the bound 
out

    of orderness watermark since I am only advancing 1 of the 8
    parallelisms. If I set the entire stream execution environment
    to be of parallelism 1, it will reflect the watermark in the
    context correctly. One more thing is that this behavior is not
    reflected in the Flink Cluster web UI interface. I can see the
    watermark is advancing, but it is not in reality. *That's
    causing the inconsistency problem I mentioned in the other 
email

    I mentioned above. Will this be considered as a bug in the UI?*

    My current question is, since I have full outer join operation
    before the KeyedProcessFunction here. How can I let the 
bound of

    orderness watermark / punctuated watermark strategy work if the
    parallelism > 1? It can only update one of the 8 parallelisms
    for the watermark for this onTimer operator. Is this related to
    my Table full outer join operation before this step? According
    to the doc,
https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$ 
<https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/tab

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread fuyao . li
ts*Dynamic*Table*Options*for*Flink*SQL__;JSsrKysrKys!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J6qWrWNk$ 
[2] 
https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html*dealing-with-idle-sources__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JMW06Who$ 
[3] 
https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-136*3A**AImprove*interoperability*between*DataStream*and*Table*API__;JSsrKysrKysr!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JfAjyGyQ$ 


On 13.11.20 21:39, Fuyao Li wrote:

Hi Matthias,

Just to provide more context on this problem. I only have 1 partition 
per each Kafka Topic at the beginning before the join operation. 
After reading the doc: 
https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$ 
<https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html*kafka-consumers-and-timestamp-extractionwatermark-emission__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7JnAwo_lc$ 
>


Maybe that is the root cause of my problem here, with less than 8 
partitions (only 1 partition in my case), using the default 
parallelism of 8 will cause this wrong behavior. This is my guess, it 
takes a while to test it out... What's your opinion on this? Thanks!


Best,

Fuyao


On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li <mailto:fuyaoli2...@gmail.com>> wrote:


    Hi Matthias,

    One more question regarding Flink table parallelism, is it possible
    to configure the parallelism for Table operation at operator level,
    it seems we don't have such API available, right? Thanks!

    Best,
    Fuyao

    On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li mailto:fuyaoli2...@gmail.com>> wrote:

    Hi Matthias,

    Thanks for your information. I have managed to figure out the
    first issue you mentioned. Regarding the second issue. I have
    got some progress on it.

    I have sent another email with the title 'BoundedOutOfOrderness
    Watermark Generator is NOT making the event time to advance'
    using another email of mine, fuyao...@oracle.com
    <mailto:fuyao...@oracle.com>. That email contains some more
    context on my issue. Please take a look. I have made some
    progress after sending that new email.

    Previously, I had managed to make timelag watermark strategy
    working in my code, but my bound out of orderness strategy or
    punctuated watermark strategy doesn't work well. It produces 8
    watermarks each time. Two cycles are shown below.

    I managed to figure out the root cause is that Flink stream
    execution environment has a default parallelism as 8.*I didn't
    notice in the doc, could the Community add this explicitly into
    the official doc to avoid some confusion? Thanks.*

 From my understanding, the watermark advances based on the
    lowest watermark among the 8, so I can not advance the bound out
    of orderness watermark since I am only advancing 1 of the 8
    parallelisms. If I set the entire stream execution environment
    to be of parallelism 1, it will reflect the watermark in the
    context correctly. One more thing is that this behavior is not
    reflected in the Flink Cluster web UI interface. I can see the
    watermark is advancing, but it is not in reality. *That's
    causing the inconsistency problem I mentioned in the other email
    I mentioned above. Will this be considered as a bug in the UI?*

    My current question is, since I have full outer join operation
    before the KeyedProcessFunction here. How can I let the bound of
    orderness watermark / punctuated watermark strategy work if the
    parallelism > 1? It can only update one of the 8 parallelisms
    for the watermark for this onTimer operator. Is this related to
    my Table full outer join operation before this step? According
    to the doc,
https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$ 
<https://urldefense.com/v3/__https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html*table-exec-resource-default-parallelism__;Iw!!GqivPVa7Brio!ItHlGfYT1dLQeAolQoFNfXPN876842lnF4hOE7cxmmTJY4tJkXUmkz7J4wxLjc0$ 
>


    Default parallelism should be the same like the stream
    environment. Why can't I update the watermarks for all 8
    parallelisms? What should I do to enable 

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread Timo Walther
e. Will this be considered as a bug in the UI?*

My current question is, since I have full outer join operation
before the KeyedProcessFunction here. How can I let the bound of
orderness watermark / punctuated watermark strategy work if the
parallelism > 1? It can only update one of the 8 parallelisms
for the watermark for this onTimer operator. Is this related to
my Table full outer join operation before this step? According
to the doc,

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism

<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism>

Default parallelism should be the same like the stream
environment. Why can't I update the watermarks for all 8
parallelisms? What should I do to enable this function with
Parallelism larger than 1? Thanks.

First round: (Note the first column of each log row is the
timelag strategy, it is getting updated correctly for all 8
parallelism, but the other two strategies I mentioned above
can't do that..)

14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047266199,
periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
1605047187881 (only one of the 8 parallelism for bound out of
orderness is getting my new watermark)
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047266199,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000

Second round: (I set the autoWatermark interval to be 5 seconds)
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
1605047187881
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator
- Emit Watermark: watermark based on system time: 1605047271200,
  

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Fuyao Li
Hi Matthias,

Just to provide more context on this problem. I only have 1 partition per
each Kafka Topic at the beginning before the join operation. After reading
the doc:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

Maybe that is the root cause of my problem here, with less than 8
partitions (only 1 partition in my case), using the default parallelism of
8 will cause this wrong behavior. This is my guess, it takes a while to
test it out... What's your opinion on this? Thanks!

Best,

Fuyao

On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li  wrote:

> Hi Matthias,
>
> One more question regarding Flink table parallelism, is it possible to
> configure the parallelism for Table operation at operator level, it seems
> we don't have such API available, right? Thanks!
>
> Best,
> Fuyao
>
> On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li  wrote:
>
>> Hi Matthias,
>>
>> Thanks for your information. I have managed to figure out the first issue
>> you mentioned. Regarding the second issue. I have got some progress on it.
>>
>> I have sent another email with the title 'BoundedOutOfOrderness Watermark
>> Generator is NOT making the event time to advance' using another email of
>> mine, fuyao...@oracle.com. That email contains some more context on my
>> issue. Please take a look. I have made some progress after sending that new
>> email.
>>
>> Previously, I had managed to make timelag watermark strategy working in
>> my code, but my bound out of orderness strategy or punctuated watermark
>> strategy doesn't work well. It produces 8 watermarks each time. Two cycles
>> are shown below.
>>
>> I managed to figure out the root cause is that Flink stream execution
>> environment has a default parallelism as 8.* I didn't notice in the doc,
>> could the Community add this explicitly into the official doc to avoid some
>> confusion? Thanks.*
>>
>> From my understanding, the watermark advances based on the lowest
>> watermark among the 8, so I can not advance the bound out of orderness
>> watermark since I am only advancing 1 of the 8 parallelisms. If I set the
>> entire stream execution environment to be of parallelism 1, it will reflect
>> the watermark in the context correctly. One more thing is that this
>> behavior is not reflected in the Flink Cluster web UI interface. I can see
>> the watermark is advancing, but it is not in reality. *That's causing
>> the inconsistency problem I mentioned in the other email I mentioned above.
>> Will this be considered as a bug in the UI?*
>>
>> My current question is, since I have full outer join operation before the
>> KeyedProcessFunction here. How can I let the bound of orderness watermark /
>> punctuated watermark strategy work if the parallelism > 1? It can only
>> update one of the 8 parallelisms for the watermark for this onTimer
>> operator. Is this related to my Table full outer join operation before this
>> step? According to the doc,
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism
>>
>> Default parallelism should be the same like the stream environment. Why
>> can't I update the watermarks for all 8 parallelisms? What should I do to
>> enable this function with Parallelism larger than 1? Thanks.
>>
>> First round: (Note the first column of each log row is the timelag
>> strategy, it is getting updated correctly for all 8 parallelism, but the
>> other two strategies I mentioned above can't do that..)
>>
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266198,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266199,
>> periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
>> 1605047187881 (only one of the 8 parallelism for bound out of orderness is
>> getting my new watermark)
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266199,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 INFO
>> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
>> Watermark: watermark based on system time: 1605047266198,
>> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>> 14:28:01,199 

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Fuyao Li
Hi Matthias,

One more question regarding Flink table parallelism, is it possible to
configure the parallelism for Table operation at operator level, it seems
we don't have such API available, right? Thanks!

Best,
Fuyao

On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li  wrote:

> Hi Matthias,
>
> Thanks for your information. I have managed to figure out the first issue
> you mentioned. Regarding the second issue. I have got some progress on it.
>
> I have sent another email with the title 'BoundedOutOfOrderness Watermark
> Generator is NOT making the event time to advance' using another email of
> mine, fuyao...@oracle.com. That email contains some more context on my
> issue. Please take a look. I have made some progress after sending that new
> email.
>
> Previously, I had managed to make timelag watermark strategy working in my
> code, but my bound out of orderness strategy or punctuated watermark
> strategy doesn't work well. It produces 8 watermarks each time. Two cycles
> are shown below.
>
> I managed to figure out the root cause is that Flink stream execution
> environment has a default parallelism as 8.* I didn't notice in the doc,
> could the Community add this explicitly into the official doc to avoid some
> confusion? Thanks.*
>
> From my understanding, the watermark advances based on the lowest
> watermark among the 8, so I can not advance the bound out of orderness
> watermark since I am only advancing 1 of the 8 parallelisms. If I set the
> entire stream execution environment to be of parallelism 1, it will reflect
> the watermark in the context correctly. One more thing is that this
> behavior is not reflected in the Flink Cluster web UI interface. I can see
> the watermark is advancing, but it is not in reality. *That's causing the
> inconsistency problem I mentioned in the other email I mentioned above.
> Will this be considered as a bug in the UI?*
>
> My current question is, since I have full outer join operation before the
> KeyedProcessFunction here. How can I let the bound of orderness watermark /
> punctuated watermark strategy work if the parallelism > 1? It can only
> update one of the 8 parallelisms for the watermark for this onTimer
> operator. Is this related to my Table full outer join operation before this
> step? According to the doc,
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism
>
> Default parallelism should be the same like the stream environment. Why
> can't I update the watermarks for all 8 parallelisms? What should I do to
> enable this function with Parallelism larger than 1? Thanks.
>
> First round: (Note the first column of each log row is the timelag
> strategy, it is getting updated correctly for all 8 parallelism, but the
> other two strategies I mentioned above can't do that..)
>
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266199,
> periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
> 1605047187881 (only one of the 8 parallelism for bound out of orderness is
> getting my new watermark)
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266199,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
> 14:28:01,199 INFO
> org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
> Watermark: watermark based on system time: 1605047266198,
> periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
>
> Second round: (I set the autoWatermark interval 

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Fuyao Li
Hi Matthias,

Thanks for your information. I have managed to figure out the first issue
you mentioned. Regarding the second issue. I have got some progress on it.

I have sent another email with the title 'BoundedOutOfOrderness Watermark
Generator is NOT making the event time to advance' using another email of
mine, fuyao...@oracle.com. That email contains some more context on my
issue. Please take a look. I have made some progress after sending that new
email.

Previously, I had managed to make timelag watermark strategy working in my
code, but my bound out of orderness strategy or punctuated watermark
strategy doesn't work well. It produces 8 watermarks each time. Two cycles
are shown below.

I managed to figure out the root cause is that Flink stream execution
environment has a default parallelism as 8.* I didn't notice in the doc,
could the Community add this explicitly into the official doc to avoid some
confusion? Thanks.*

>From my understanding, the watermark advances based on the lowest watermark
among the 8, so I can not advance the bound out of orderness watermark
since I am only advancing 1 of the 8 parallelisms. If I set the entire
stream execution environment to be of parallelism 1, it will reflect the
watermark in the context correctly. One more thing is that this behavior is
not reflected in the Flink Cluster web UI interface. I can see the
watermark is advancing, but it is not in reality. *That's causing the
inconsistency problem I mentioned in the other email I mentioned above.
Will this be considered as a bug in the UI?*

My current question is, since I have full outer join operation before the
KeyedProcessFunction here. How can I let the bound of orderness watermark /
punctuated watermark strategy work if the parallelism > 1? It can only
update one of the 8 parallelisms for the watermark for this onTimer
operator. Is this related to my Table full outer join operation before this
step? According to the doc,
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/config.html#table-exec-resource-default-parallelism

Default parallelism should be the same like the stream environment. Why
can't I update the watermarks for all 8 parallelisms? What should I do to
enable this function with Parallelism larger than 1? Thanks.

First round: (Note the first column of each log row is the timelag
strategy, it is getting updated correctly for all 8 parallelism, but the
other two strategies I mentioned above can't do that..)

14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047266199,
periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp:
1605047187881 (only one of the 8 parallelism for bound out of orderness is
getting my new watermark)
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047266199,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:01,199 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047266198,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000

Second round: (I set the autoWatermark interval to be 5 seconds)
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 0, currentMaxTimestamp: 15000
14:28:06,200 INFO
org.myorg.quickstart.operator.PeriodicTableOutputWatermarkGenerator - Emit
Watermark: watermark based on system time: 1605047271200,
periodicEmitWatermarkTime: 1605047172881, currentMaxTimestamp: 1605047187881
14:28:06,20

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-13 Thread Matthias Pohl
Sink")
> .setParallelism(1);
>
> Best regards,
> Fuyao
>
> On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li  wrote:
>
>> Hi Flink Community,
>>
>> I am doing some research work on Flink Datastream and Table API and I
>> meet two major problems. I am using Flink 1.11.2, scala version 2.11, java
>> 8. My use case looks like this. I plan to write a data processing pipeline
>> with two stages. My goal is to construct a business object containing
>> information from several Kafka streams with a primary key and emit the
>> complete business object if such primary key doesn't  appear in the
>> pipeline for 10 seconds.
>>
>> In the first stage, I first consume three Kafka streams and transform it
>> to Flink Datastream using a deserialization schema containing some type and
>> date format transformation, and then I register these data streams as Table
>> and do a full outer join one by one using Table API. I also add query
>> configuration for this to avoid excessive state. The primary key is also
>> the join key.
>>
>> In the second stage, I transform the joined table to a retracted stream
>> and put it into KeyedProcessFunction to generate the business object if the
>> business object's primary key is inactive for 10 second.
>>
>> Is this way of handling the data the suggested approach? (I understand I
>> can directly consume kafka data in Table API. I haven't tried that yet,
>> maybe that's better?) Any suggestion is welcomed. During implementing this,
>> I meet two major problems and several smaller questions under each problem.
>>
>>
>> 1. Some type cast behavior of retracted streams I can't explain.
>>
>> (1) In the initial stage, I registered some field as *java.sql.Date* or
>> *java.sql.timestamp* following the examples at (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction)
>> . After join and transform to retracted stream, it becomes
>> *java.time.LocalDate* and *java.time.LocalDateTime* instead.
>>
>> For example, when first ingesting the Kafka streams, I registerd a
>> attribute in java.sql.Timestamp type.
>>
>>  @JsonAlias("ATTRIBUTE1")
>>  private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo =
>> java.sql.Timestamp.class) Timestamp ATTRIBUTE1;
>>
>> When I tried to cast the type information back after the retracted
>> stream, the code gives me error information below.
>>
>>  java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to
>> java.sql.Timestamp
>>
>> Maybe I should use toAppendStream instead since append stream could
>> register type information, but toRetractedStream can't do that? (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>> )
>>
>> My work around is to cast it to LocalDateTime first and extract the epoch
>> time, this doesn't seem to be a final solution.
>>
>> (2) During timestamp conversion, the Flink to retracted stream seems to
>> lost the AM/PM information in the stream and causing a 12 hour difference
>> if it is PM.
>>
>> I use joda time to do some timestamp conversion in the first
>> deserialization stage, my pattern looks like this. "a" means AM/PM
>> information
>>
>>  DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy
>> HH.mm.ss.SS a").withZone(DateTimeZone.getDefault());
>>
>> After the retracted stream, the AM/PM information is not preserved.
>>
>>
>> 2. My onTimer method in KeyedProcessFunction can not be triggered when I
>> scheduled a event timer timer.
>>
>> I am using event time in my code. I am new to configure watermarks and I
>> might miss something to configure it correctly. I also tried to register a
>> processing time, it could enter and produce some results.
>>
>> I am trying to follow the example here:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>>
>> My onTimer method looks like this and the scheduled event doesn't happen..
>>
>> In processElement():
>>
>> context.timerService().registerEventTimeTimer(current.getLastModifiedTime()
>> + 1);
>>
>> My onTimer function
>>
>>   @Override
>> public void onTimer(long timestamp, OnTimerContext ctx,
>> Collector collector) throws Exception {
>> TestBusinessObjectState result = testBusinessObjectState.value();
>> log.info("Inside o

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-04 Thread Fuyao Li
Hi Flink Users and Community,

For the first part of the question, the 12 hour time difference is caused
by a time extraction bug myself. I can get the time translated correctly
now. The type cast problem does have some workarounds to solve it..

My major blocker right now is the onTimer part is not properly triggered. I
guess it is caused by failing to configure the correct watermarks &
timestamp assigners. Please give me some insights.

1. If I don't chain the assignTimestampsAndWatermarks() method in together
with keyedBy().. and process().. method. The context.timestamp() in my
processElement() function will be null. Is this some expected behavior? The
Flink examples didn't chain it together. (see example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#using-watermark-strategies
)
2. If I use registerEventTimeTimer() in processElement(). The onTimer
method will not be triggered. However, I can trigger the onTimer method if
I simply change it to registerProcessingTimeTimer(). I am using the
settings below in the stream env.

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);

My code for method the process chain:
retractStream

.assignTimestampsAndWatermarks(WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((booleanRowTuple2,
timestamp) -> {
Row rowData = booleanRowTuple2.f1;
LocalDateTime headerTime =
(LocalDateTime)rowData.getField(3);
LocalDateTime linesTime =
(LocalDateTime)rowData.getField(7);

LocalDateTime latestDBUpdateTime = null;
if (headerTime != null && linesTime != null) {
latestDBUpdateTime =
headerTime.isAfter(linesTime) ? headerTime : linesTime;
}
else {
latestDBUpdateTime = (headerTime != null) ?
headerTime : linesTime;
}
if (latestDBUpdateTime != null) {
return
latestDBUpdateTime.atZone(ZoneId.of("America/Los_Angeles")).toInstant().toEpochMilli();
}
// In the worst case, we use system time
instead, which should never be reached.
return System.currentTimeMillis();
}))
//  .assignTimestampsAndWatermarks(new MyWaterStrategy())  //
second way to create watermark, doesn't work
.keyBy(value -> {
// There could be null fields for header invoice_id
field
String invoice_id_key = (String)value.f1.getField(0);
if (invoice_id_key == null) {
invoice_id_key = (String)value.f1.getField(4);
}
return invoice_id_key;
})
.process(new TableOutputProcessFunction())
.name("ProcessTableOutput")
.uid("ProcessTableOutput")
.addSink(businessObjectSink)
.name("businessObjectSink")
.uid("businessObjectSink")
.setParallelism(1);

Best regards,
Fuyao

On Mon, Nov 2, 2020 at 4:53 PM Fuyao Li  wrote:

> Hi Flink Community,
>
> I am doing some research work on Flink Datastream and Table API and I meet
> two major problems. I am using Flink 1.11.2, scala version 2.11, java 8. My
> use case looks like this. I plan to write a data processing pipeline with
> two stages. My goal is to construct a business object containing
> information from several Kafka streams with a primary key and emit the
> complete business object if such primary key doesn't  appear in the
> pipeline for 10 seconds.
>
> In the first stage, I first consume three Kafka streams and transform it
> to Flink Datastream using a deserialization schema containing some type and
> date format transformation, and then I register these data streams as Table
> and do a full outer join one by one using Table API. I also add query
> configuration for this to avoid excessive state. The primary key is also
> the join key.
>
> In the second stage, I transform the joined table to a retracted stream
> and put it into KeyedProcessFunction to generate the business object if the
> business object's primary key is inactive for 10 second.
>
> Is this way of handling the data the suggested approach? (I understand I
> can directly consume kafka data in Table API. I haven't tried that yet,
> maybe that's better?) Any suggestion is welcomed. During implementing this,
> I meet two major problems and several smaller question

Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-02 Thread Fuyao Li
Hi Flink Community,

I am doing some research work on Flink Datastream and Table API and I meet
two major problems. I am using Flink 1.11.2, scala version 2.11, java 8. My
use case looks like this. I plan to write a data processing pipeline with
two stages. My goal is to construct a business object containing
information from several Kafka streams with a primary key and emit the
complete business object if such primary key doesn't  appear in the
pipeline for 10 seconds.

In the first stage, I first consume three Kafka streams and transform it to
Flink Datastream using a deserialization schema containing some type and
date format transformation, and then I register these data streams as Table
and do a full outer join one by one using Table API. I also add query
configuration for this to avoid excessive state. The primary key is also
the join key.

In the second stage, I transform the joined table to a retracted stream and
put it into KeyedProcessFunction to generate the business object if the
business object's primary key is inactive for 10 second.

Is this way of handling the data the suggested approach? (I understand I
can directly consume kafka data in Table API. I haven't tried that yet,
maybe that's better?) Any suggestion is welcomed. During implementing this,
I meet two major problems and several smaller questions under each problem.


1. Some type cast behavior of retracted streams I can't explain.

(1) In the initial stage, I registered some field as *java.sql.Date* or
*java.sql.timestamp* following the examples at (
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#data-type-extraction)
. After join and transform to retracted stream, it becomes
*java.time.LocalDate* and *java.time.LocalDateTime* instead.

For example, when first ingesting the Kafka streams, I registerd a
attribute in java.sql.Timestamp type.

 @JsonAlias("ATTRIBUTE1")
 private @DataTypeHint(value = "TIMESTAMP(6)", bridgedTo =
java.sql.Timestamp.class) Timestamp ATTRIBUTE1;

When I tried to cast the type information back after the retracted stream,
the code gives me error information below.

 java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to
java.sql.Timestamp

Maybe I should use toAppendStream instead since append stream could
register type information, but toRetractedStream can't do that? (
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
)

My work around is to cast it to LocalDateTime first and extract the epoch
time, this doesn't seem to be a final solution.

(2) During timestamp conversion, the Flink to retracted stream seems to
lost the AM/PM information in the stream and causing a 12 hour difference
if it is PM.

I use joda time to do some timestamp conversion in the first
deserialization stage, my pattern looks like this. "a" means AM/PM
information

 DateTimeFormatter format3 = DateTimeFormat.forPattern("dd-MMM-yy
HH.mm.ss.SS a").withZone(DateTimeZone.getDefault());

After the retracted stream, the AM/PM information is not preserved.


2. My onTimer method in KeyedProcessFunction can not be triggered when I
scheduled a event timer timer.

I am using event time in my code. I am new to configure watermarks and I
might miss something to configure it correctly. I also tried to register a
processing time, it could enter and produce some results.

I am trying to follow the example here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example

My onTimer method looks like this and the scheduled event doesn't happen..

In processElement():

context.timerService().registerEventTimeTimer(current.getLastModifiedTime()
+ 1);

My onTimer function

  @Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector collector) throws Exception {
TestBusinessObjectState result = testBusinessObjectState.value();
log.info("Inside onTimer Method, current key: {}, timestamp: {},
last modified time: {}", ctx.getCurrentKey(), timestamp,
result.getLastModifiedTime());

// check if this is an outdated timer or the latest timer
if (timestamp >= result.getLastModifiedTime() + 1) {
// emit the state on timeout
log.info("Collecting a business object, {}",
result.getBusinessObject().toString());
collector.collect(result.getBusinessObject());

cleanUp(ctx);
}
}

private void cleanUp(Context ctx) throws Exception {
Long timer = testBusinessObjectState.value().getLastModifiedTime();
ctx.timerService().deleteEventTimeTimer(timer);
testBusinessObjectState.clear();
}


(1) When I assign the timestamp and watermarks outside the process() method
chain. The "context.timestamp()" will be null. If I put it inside the
chain, it won't be null. Is this the expected b

Re: Unit Test for KeyedProcessFunction with out-of-core state

2020-09-17 Thread Alexey Trenikhun
Unfortunately it looks like impossible to change backend of 
AbstractStreamOperatorTestHarness without resorting to reflection, stateBackend 
 initialized in constructor as `this.stateBackend = new MemoryStateBackend();`, 
since it is protected, I can change it in derived class, but checkpointStorage 
already initialized using original backend `this.checkpointStorage = 
this.stateBackend.createCheckpointStorage(new JobID());`, and checkpointStorage 
 is private

Thanks,
Alexey

From: Arvid Heise 
Sent: Monday, September 14, 2020 11:14 PM
To: Alexey Trenikhun 
Cc: Dawid Wysakowicz ; Flink User Mail List 

Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state

The new backend would be for unit tests (instead of a RocksDB mock). It's kind 
of the mock for out-of-core behavior that you initially requested.

To use rocksDB in an IT Case with multiple task managers, you would adjust the 
configuration in the usual minicluster setup, for example [1].

Note that you can do the same with the test harness [2], but I'd recommend the 
test harness only for testing new operators or complex ProcessFunctions (e.g., 
using timers) and not just for a map. Test harness is non-public API and we 
need to adjust it from time to time to reflect refactoring on the operators.

[1] 
https://github.com/apache/flink/blob/4518d18a726b35de9ff802d155fd8100dc711a63/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java#L132
[2] 
https://github.com/apache/flink/blob/5acbfedf754fa4d063931ea30432716374c2f8cf/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java#L143


On Tue, Sep 15, 2020 at 4:18 AM Alexey Trenikhun 
mailto:yen...@msn.com>> wrote:
Thank you for ideas.
Do you suggest to use new backend with unit test or integration test?

Thanks,
Alexey


From: Arvid Heise mailto:ar...@ververica.com>>
Sent: Monday, September 14, 2020 4:26:47 AM
To: Dawid Wysakowicz mailto:dwysakow...@apache.org>>
Cc: Alexey Trenikhun mailto:yen...@msn.com>>; Flink User Mail 
List mailto:user@flink.apache.org>>
Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state

Hi Alexey,

Definition of test levels are always a bit blurry when writing tests for a data 
processing framework, but I'm convinced that in your case, you should rather 
think in terms of integration tests than unit tests:
* Unit test should really just be about business logic
* If it's about specific implementation details of other components, it should 
rather go in an integration test.

You can still structure your code that only half of the pipeline or even just 
one step is executed in an ITCase, but it's much harder to do all the mocking 
than simply executing a small Flink program with a local runner. ITCases are 
really fast and will not limit the portability of your program to newer Flink 
version (which mocking of components usually do).

Another idea for your specific use case would be to implement a backend that 
delegates to HeapMemory but copies all values on retrieval.

On Mon, Sep 7, 2020 at 5:49 PM Dawid Wysakowicz 
mailto:dwysakow...@apache.org>> wrote:

Hi Alexey,

There is no mock for RocksDB. Moreover I am not sure what would be the use case 
for one. If you want to test specifically against RocksDB then you can use it 
in the test harness Gordon mentioned.

On 04/09/2020 16:31, Alexey Trenikhun wrote:
Hi Gordon,
We already use [1]. Unfortunately it doesn’t allow to detect out-of-core 
specific bugs like this:
POJO v = myMapState.get(myKey):
v.setStatus(1);
return;
// missing myMapState.put(myKey, v);

Thanks,
Alexey


From: Tzu-Li (Gordon) Tai <mailto:tzuli...@apache.org>
Sent: Friday, September 4, 2020 12:35:48 AM
To: Alexey Trenikhun <mailto:yen...@msn.com>
Cc: Flink User Mail List <mailto:user@flink.apache.org>
Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state

Hi Alexey,

Is there a specific reason why you want to test against RocksDB?

Otherwise, in Flink tests we use a `KeyedOneInputStreamOperatorTestHarness` [1] 
that allows you to wrap a user function and eliminate the need to worry about 
setting up heavy runtime context / dependencies such as the state backend.
As a unit test, this should be sufficient for you to implement basic test 
scenarios for your function, such as expected output given inputs, state etc.
Does this provide what you are looking for?

Cheers,
Gordon

[1] 
https://github.com/apache/flink/blob/1d5f44710270d1c615537f0d05ab49e699d3a6e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java

On Fri, Sep 4, 2020 at 2:28 AM Alexey Trenikhun 
mailto:yen...@msn.com>> wrote:
Hello,
I want to unit test KeyedProcessFunction which uses with out-of-core state 
(like rocksdb).
Does Flink has mock fo

Re: Unit Test for KeyedProcessFunction with out-of-core state

2020-09-15 Thread Arvid Heise
The new backend would be for unit tests (instead of a RocksDB mock). It's
kind of the mock for out-of-core behavior that you initially requested.

To use rocksDB in an IT Case with multiple task managers, you would adjust
the configuration in the usual minicluster setup, for example [1].

Note that you can do the same with the test harness [2], but I'd recommend
the test harness only for testing new operators or complex ProcessFunctions
(e.g., using timers) and not just for a map. Test harness is non-public API
and we need to adjust it from time to time to reflect refactoring on the
operators.

[1]
https://github.com/apache/flink/blob/4518d18a726b35de9ff802d155fd8100dc711a63/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java#L132
[2]
https://github.com/apache/flink/blob/5acbfedf754fa4d063931ea30432716374c2f8cf/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java#L143


On Tue, Sep 15, 2020 at 4:18 AM Alexey Trenikhun  wrote:

> Thank you for ideas.
> Do you suggest to use new backend with unit test or integration test?
>
> Thanks,
> Alexey
>
> --
> *From:* Arvid Heise 
> *Sent:* Monday, September 14, 2020 4:26:47 AM
> *To:* Dawid Wysakowicz 
> *Cc:* Alexey Trenikhun ; Flink User Mail List <
> user@flink.apache.org>
> *Subject:* Re: Unit Test for KeyedProcessFunction with out-of-core state
>
> Hi Alexey,
>
> Definition of test levels are always a bit blurry when writing tests for a
> data processing framework, but I'm convinced that in your case, you should
> rather think in terms of integration tests than unit tests:
> * Unit test should really just be about business logic
> * If it's about specific implementation details of other components, it
> should rather go in an integration test.
>
> You can still structure your code that only half of the pipeline or even
> just one step is executed in an ITCase, but it's much harder to do all the
> mocking than simply executing a small Flink program with a local runner.
> ITCases are really fast and will not limit the portability of your program
> to newer Flink version (which mocking of components usually do).
>
> Another idea for your specific use case would be to implement a backend
> that delegates to HeapMemory but copies all values on retrieval.
>
> On Mon, Sep 7, 2020 at 5:49 PM Dawid Wysakowicz 
> wrote:
>
> Hi Alexey,
>
> There is no mock for RocksDB. Moreover I am not sure what would be the use
> case for one. If you want to test specifically against RocksDB then you can
> use it in the test harness Gordon mentioned.
> On 04/09/2020 16:31, Alexey Trenikhun wrote:
>
> Hi Gordon,
> We already use [1]. Unfortunately it doesn’t allow to detect out-of-core
> specific bugs like this:
> POJO v = myMapState.get(myKey):
> v.setStatus(1);
> return;
> // missing myMapState.put(myKey, v);
>
> Thanks,
> Alexey
>
> --
> *From:* Tzu-Li (Gordon) Tai  
> *Sent:* Friday, September 4, 2020 12:35:48 AM
> *To:* Alexey Trenikhun  
> *Cc:* Flink User Mail List  
> *Subject:* Re: Unit Test for KeyedProcessFunction with out-of-core state
>
> Hi Alexey,
>
> Is there a specific reason why you want to test against RocksDB?
>
> Otherwise, in Flink tests we use a
> `KeyedOneInputStreamOperatorTestHarness` [1] that allows you to wrap a user
> function and eliminate the need to worry about setting up heavy runtime
> context / dependencies such as the state backend.
> As a unit test, this should be sufficient for you to implement basic test
> scenarios for your function, such as expected output given inputs, state
> etc.
> Does this provide what you are looking for?
>
> Cheers,
> Gordon
>
> [1]
> https://github.com/apache/flink/blob/1d5f44710270d1c615537f0d05ab49e699d3a6e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
>
> On Fri, Sep 4, 2020 at 2:28 AM Alexey Trenikhun  wrote:
>
> Hello,
> I want to unit test KeyedProcessFunction which uses with out-of-core state
> (like rocksdb).
> Does Flink has mock for rocksdb, which can be used in unit tests ?
>
> Thanks,
> Alexey
>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Ste

Re: Unit Test for KeyedProcessFunction with out-of-core state

2020-09-14 Thread Alexey Trenikhun
Thank you for ideas.
Do you suggest to use new backend with unit test or integration test?

Thanks,
Alexey


From: Arvid Heise 
Sent: Monday, September 14, 2020 4:26:47 AM
To: Dawid Wysakowicz 
Cc: Alexey Trenikhun ; Flink User Mail List 

Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state

Hi Alexey,

Definition of test levels are always a bit blurry when writing tests for a data 
processing framework, but I'm convinced that in your case, you should rather 
think in terms of integration tests than unit tests:
* Unit test should really just be about business logic
* If it's about specific implementation details of other components, it should 
rather go in an integration test.

You can still structure your code that only half of the pipeline or even just 
one step is executed in an ITCase, but it's much harder to do all the mocking 
than simply executing a small Flink program with a local runner. ITCases are 
really fast and will not limit the portability of your program to newer Flink 
version (which mocking of components usually do).

Another idea for your specific use case would be to implement a backend that 
delegates to HeapMemory but copies all values on retrieval.

On Mon, Sep 7, 2020 at 5:49 PM Dawid Wysakowicz 
mailto:dwysakow...@apache.org>> wrote:

Hi Alexey,

There is no mock for RocksDB. Moreover I am not sure what would be the use case 
for one. If you want to test specifically against RocksDB then you can use it 
in the test harness Gordon mentioned.

On 04/09/2020 16:31, Alexey Trenikhun wrote:
Hi Gordon,
We already use [1]. Unfortunately it doesn’t allow to detect out-of-core 
specific bugs like this:
POJO v = myMapState.get(myKey):
v.setStatus(1);
return;
// missing myMapState.put(myKey, v);

Thanks,
Alexey


From: Tzu-Li (Gordon) Tai <mailto:tzuli...@apache.org>
Sent: Friday, September 4, 2020 12:35:48 AM
To: Alexey Trenikhun <mailto:yen...@msn.com>
Cc: Flink User Mail List <mailto:user@flink.apache.org>
Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state

Hi Alexey,

Is there a specific reason why you want to test against RocksDB?

Otherwise, in Flink tests we use a `KeyedOneInputStreamOperatorTestHarness` [1] 
that allows you to wrap a user function and eliminate the need to worry about 
setting up heavy runtime context / dependencies such as the state backend.
As a unit test, this should be sufficient for you to implement basic test 
scenarios for your function, such as expected output given inputs, state etc.
Does this provide what you are looking for?

Cheers,
Gordon

[1] 
https://github.com/apache/flink/blob/1d5f44710270d1c615537f0d05ab49e699d3a6e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java

On Fri, Sep 4, 2020 at 2:28 AM Alexey Trenikhun 
mailto:yen...@msn.com>> wrote:
Hello,
I want to unit test KeyedProcessFunction which uses with out-of-core state 
(like rocksdb).
Does Flink has mock for rocksdb, which can be used in unit tests ?

Thanks,
Alexey


--

Arvid Heise | Senior Java Developer

[https://lh5.googleusercontent.com/ODbO0aq1IqKMfuoy_pw2YH8r6dqDRTq37rg3ytg11FCGJx12jJ1ff_SANPBxTHzSJTUQY9JLuoXq4NB7Om7j6Vq1lg6jIOKz8S5g2VKDGwicbj5fbY09PVb6mD5TdRuWEUvEMZTG]<https://www.ververica.com/>


Follow us @VervericaData

--

Join Flink Forward<https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng


Re: Unit Test for KeyedProcessFunction with out-of-core state

2020-09-14 Thread Arvid Heise
Hi Alexey,

Definition of test levels are always a bit blurry when writing tests for a
data processing framework, but I'm convinced that in your case, you should
rather think in terms of integration tests than unit tests:
* Unit test should really just be about business logic
* If it's about specific implementation details of other components, it
should rather go in an integration test.

You can still structure your code that only half of the pipeline or even
just one step is executed in an ITCase, but it's much harder to do all the
mocking than simply executing a small Flink program with a local runner.
ITCases are really fast and will not limit the portability of your program
to newer Flink version (which mocking of components usually do).

Another idea for your specific use case would be to implement a backend
that delegates to HeapMemory but copies all values on retrieval.

On Mon, Sep 7, 2020 at 5:49 PM Dawid Wysakowicz 
wrote:

> Hi Alexey,
>
> There is no mock for RocksDB. Moreover I am not sure what would be the use
> case for one. If you want to test specifically against RocksDB then you can
> use it in the test harness Gordon mentioned.
> On 04/09/2020 16:31, Alexey Trenikhun wrote:
>
> Hi Gordon,
> We already use [1]. Unfortunately it doesn’t allow to detect out-of-core
> specific bugs like this:
> POJO v = myMapState.get(myKey):
> v.setStatus(1);
> return;
> // missing myMapState.put(myKey, v);
>
> Thanks,
> Alexey
>
> --
> *From:* Tzu-Li (Gordon) Tai  
> *Sent:* Friday, September 4, 2020 12:35:48 AM
> *To:* Alexey Trenikhun  
> *Cc:* Flink User Mail List  
> *Subject:* Re: Unit Test for KeyedProcessFunction with out-of-core state
>
> Hi Alexey,
>
> Is there a specific reason why you want to test against RocksDB?
>
> Otherwise, in Flink tests we use a
> `KeyedOneInputStreamOperatorTestHarness` [1] that allows you to wrap a user
> function and eliminate the need to worry about setting up heavy runtime
> context / dependencies such as the state backend.
> As a unit test, this should be sufficient for you to implement basic test
> scenarios for your function, such as expected output given inputs, state
> etc.
> Does this provide what you are looking for?
>
> Cheers,
> Gordon
>
> [1]
> https://github.com/apache/flink/blob/1d5f44710270d1c615537f0d05ab49e699d3a6e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
>
> On Fri, Sep 4, 2020 at 2:28 AM Alexey Trenikhun  wrote:
>
> Hello,
> I want to unit test KeyedProcessFunction which uses with out-of-core state
> (like rocksdb).
> Does Flink has mock for rocksdb, which can be used in unit tests ?
>
> Thanks,
> Alexey
>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Unit Test for KeyedProcessFunction with out-of-core state

2020-09-04 Thread Alexey Trenikhun
Hi Gordon,
We already use [1]. Unfortunately it doesn’t allow to detect out-of-core 
specific bugs like this:
POJO v = myMapState.get(myKey):
v.setStatus(1);
return;
// missing myMapState.put(myKey, v);

Thanks,
Alexey


From: Tzu-Li (Gordon) Tai 
Sent: Friday, September 4, 2020 12:35:48 AM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: Unit Test for KeyedProcessFunction with out-of-core state

Hi Alexey,

Is there a specific reason why you want to test against RocksDB?

Otherwise, in Flink tests we use a `KeyedOneInputStreamOperatorTestHarness` [1] 
that allows you to wrap a user function and eliminate the need to worry about 
setting up heavy runtime context / dependencies such as the state backend.
As a unit test, this should be sufficient for you to implement basic test 
scenarios for your function, such as expected output given inputs, state etc.
Does this provide what you are looking for?

Cheers,
Gordon

[1] 
https://github.com/apache/flink/blob/1d5f44710270d1c615537f0d05ab49e699d3a6e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java

On Fri, Sep 4, 2020 at 2:28 AM Alexey Trenikhun 
mailto:yen...@msn.com>> wrote:
Hello,
I want to unit test KeyedProcessFunction which uses with out-of-core state 
(like rocksdb).
Does Flink has mock for rocksdb, which can be used in unit tests ?

Thanks,
Alexey


Re: Unit Test for KeyedProcessFunction with out-of-core state

2020-09-04 Thread Tzu-Li (Gordon) Tai
Hi Alexey,

Is there a specific reason why you want to test against RocksDB?

Otherwise, in Flink tests we use a `KeyedOneInputStreamOperatorTestHarness`
[1] that allows you to wrap a user function and eliminate the need to worry
about setting up heavy runtime context / dependencies such as the state
backend.
As a unit test, this should be sufficient for you to implement basic test
scenarios for your function, such as expected output given inputs, state
etc.
Does this provide what you are looking for?

Cheers,
Gordon

[1]
https://github.com/apache/flink/blob/1d5f44710270d1c615537f0d05ab49e699d3a6e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java

On Fri, Sep 4, 2020 at 2:28 AM Alexey Trenikhun  wrote:

> Hello,
> I want to unit test KeyedProcessFunction which uses with out-of-core state
> (like rocksdb).
> Does Flink has mock for rocksdb, which can be used in unit tests ?
>
> Thanks,
> Alexey
>


Unit Test for KeyedProcessFunction with out-of-core state

2020-09-03 Thread Alexey Trenikhun
Hello,
I want to unit test KeyedProcessFunction which uses with out-of-core state 
(like rocksdb).
Does Flink has mock for rocksdb, which can be used in unit tests ?

Thanks,
Alexey


Re: 如何在KeyedProcessFunction中获取processingTime

2020-08-17 Thread shizk233
ctx.timestamp()其实就是获取的StreamRecord的时间戳,也就是事件被提取出来的时间戳。
这个方法一般需要使用event time,并且在数据流上assign过timestamp和watermark。

ゞ野蠻遊戲χ  于2020年8月16日周日 下午7:57写道:

> 大家好
>
>   
> 当我在KeyedProcessFunction的processElement方法中获取processingTime,就像这样ctx.timestamp(),返回为null,我改如何在processElement中获取processingTime?
>
>
> 谢谢!
> 嘉治


Re: 如何在KeyedProcessFunction中获取processingTime

2020-08-16 Thread Zhao,Yi(SEC)
根据Context获取timerService,然后获取处理时间即可。


在 2020/8/16 下午7:57,“ゞ野蠻遊戲χ” 写入:

大家好

   
当我在KeyedProcessFunction的processElement方法中获取processingTime,就像这样ctx.timestamp(),返回为null,我改如何在processElement中获取processingTime?


谢谢!
嘉治



Re: Question about Watermarks within a KeyedProcessFunction

2020-06-27 Thread David Anderson
With an AscendingTimestampExtractor, watermarks are not created for every
event, and as your job starts up, some events will be processed before the
first watermark is generated.

The impossible value you see is an initial value that's in place until the
first real watermark is available. On the other hand, onTimer can not be
called until some timer is triggered by the arrival of a watermark, at
which point the watermark will have a reasonable value.

On Sat, Jun 27, 2020 at 2:37 AM Marco Villalobos 
wrote:

>
> My source is a Kafka topic.
> I am using Event Time.
> I assign the event time with an AscendingTimestampExtractor
>
> I noticed when debugging that in the KeyedProcessFunction that
> after my highest known event time of:  2020-06-23T00:46:30.000Z
>
> the processElement method had a watermark with an impossible date of:
> -292275055-05-16T16:47:04.192Z
>
> but in the onTimer method it had a more reasonable value that trails the
> highest known event time by 1 millisecond, which is this
> value:  2020-06-23T00:46:29.999Z
>
> I want to know, why does the processElement method have an impossible
> watermark value?
>
>
>


Question about Watermarks within a KeyedProcessFunction

2020-06-26 Thread Marco Villalobos
My source is a Kafka topic.
I am using Event Time.
I assign the event time with an AscendingTimestampExtractor

I noticed when debugging that in the KeyedProcessFunction that
after my highest known event time of:  2020-06-23T00:46:30.000Z

the processElement method had a watermark with an impossible date of:
-292275055-05-16T16:47:04.192Z

but in the onTimer method it had a more reasonable value that trails the
highest known event time by 1 millisecond, which is this
value:  2020-06-23T00:46:29.999Z

I want to know, why does the processElement method have an impossible
watermark value?


Re: KeyedStream and keyedProcessFunction

2020-06-09 Thread 1048262223
Hi


+1. Because there is no need to generate an instance for each key, flink just 
maintain the key collection in one instance. Imagine what would happen if the 
number of keys were unlimited.



Best,
Yichao Yang




--Original--
From:"Tzu-Li (Gordon) Tai"http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: KeyedStream and keyedProcessFunction

2020-06-09 Thread Tzu-Li (Gordon) Tai
Hi,

Records with the same key will be processed by the same partition.
Note there isn't an instance of a keyed process function for each key.
There is a single instance per partition, and all keys that are distributed
to the same partition will get processed by the same keyed process function
instance.

Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


KeyedStream and keyedProcessFunction

2020-06-09 Thread Jaswin Shah
Hi All,

I have a keyed data stream and calling a keyedProcessFunction after keyBy 
operation on datastream. Till now my understanding was, "For all different n- 
elements in keyed stream if their keys are same, same instance of 
keyedProcessFunction is called and for another elements with different keyes, 
another instances are invoked. But, I am observing some different , different 
behaviours. Can anyone please correct me on this?

Thanks,
Jaswin


Re: Why the current time of my window never reaches the end time when I use KeyedProcessFunction?

2019-06-18 Thread Felipe Gutierrez
I achieved some enhancement based on [1]. My code is here [2]. Basically I
am using "ctx.timerService().registerProcessingTimeTimer(timeoutTime);"
inside the processElement method to trigger the onTimer method. And when
the onTimer method is triggered I clean the state using
"hllStateTwitter.clear();". However, I still have a question. I set the
time out to 5000 miliseconds and the onTimer method is triggered slightly
different. Why is it happening?

process: 1560850703025 - 1560850708025
onTimer: 1560850708025 - 1560850713017 = 4992
3> estimate cardinality: 544
process: 1560850709019 - 1560850714019
onTimer: 1560850714019 - 1560850718942 = 4923
3> estimate cardinality: 485
process: 1560850714027 - 1560850719027
onTimer: 1560850719027 - 1560850723936 = 4909
3> estimate cardinality: 438
process: 1560850719035 - 1560850724035

[1] https://stackoverflow.com/a/53646529/2096986
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Tue, Jun 18, 2019 at 11:15 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> I am sorry, I wanted to point this reference
> https://stackoverflow.com/a/47071833/2096986 which implements a window on
> a ProcessFunction in Flink.
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Tue, Jun 18, 2019 at 9:22 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Isn’t your problem that the source is constantly emitting the data and
>> bumping your timers? Keep in mind that the code that you are basing on has
>> the following characteristic:
>>
>> > In the following example a KeyedProcessFunction maintains counts per
>> key, and emits a key/count pair whenever a *minute passes without an
>> update for that key*
>>
>> Piotrek
>>
>> On 17 Jun 2019, at 15:51, Felipe Gutierrez 
>> wrote:
>>
>> Hi,
>>
>> I used this example of KeyedProcessFunction from the FLink website [1]
>> and I have implemented my own KeyedProcessFunction to process some
>> approximation counting [2]. This worked very well. Then I switched the data
>> source to consume strings from Twitter [3]. The data source is consuming
>> the strings because I can see it when I debug. However, the time comparison
>> is always different on the onTimer() method, and I never get the results of
>> the window processing. I don't know the exact reason that this is
>> happening. I guess it is because my state is too heavy. But, still
>> shouldn't the time be correct at some point to finish the evaluation of my
>> window?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>> [2]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
>> [3]
>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>>
>> Kind Regards,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>>
>> *-- skype: felipe.o.gutierrez*
>> *--* *https://felipeogutierrez.blogspot.com
>> <https://felipeogutierrez.blogspot.com/>*
>>
>>
>>


Re: Why the current time of my window never reaches the end time when I use KeyedProcessFunction?

2019-06-18 Thread Felipe Gutierrez
I am sorry, I wanted to point this reference
https://stackoverflow.com/a/47071833/2096986 which implements a window on a
ProcessFunction in Flink.
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Tue, Jun 18, 2019 at 9:22 AM Piotr Nowojski  wrote:

> Hi,
>
> Isn’t your problem that the source is constantly emitting the data and
> bumping your timers? Keep in mind that the code that you are basing on has
> the following characteristic:
>
> > In the following example a KeyedProcessFunction maintains counts per
> key, and emits a key/count pair whenever a *minute passes without an
> update for that key*
>
> Piotrek
>
> On 17 Jun 2019, at 15:51, Felipe Gutierrez 
> wrote:
>
> Hi,
>
> I used this example of KeyedProcessFunction from the FLink website [1] and
> I have implemented my own KeyedProcessFunction to process some
> approximation counting [2]. This worked very well. Then I switched the data
> source to consume strings from Twitter [3]. The data source is consuming
> the strings because I can see it when I debug. However, the time comparison
> is always different on the onTimer() method, and I never get the results of
> the window processing. I don't know the exact reason that this is
> happening. I guess it is because my state is too heavy. But, still
> shouldn't the time be correct at some point to finish the evaluation of my
> window?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>
> Kind Regards,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com/>*
>
>
>


Re: Why the current time of my window never reaches the end time when I use KeyedProcessFunction?

2019-06-18 Thread Piotr Nowojski
Hi,

Isn’t your problem that the source is constantly emitting the data and bumping 
your timers? Keep in mind that the code that you are basing on has the 
following characteristic:

> In the following example a KeyedProcessFunction maintains counts per key, and 
> emits a key/count pair whenever a minute passes without an update for that key

Piotrek

> On 17 Jun 2019, at 15:51, Felipe Gutierrez  
> wrote:
> 
> Hi,
> 
> I used this example of KeyedProcessFunction from the FLink website [1] and I 
> have implemented my own KeyedProcessFunction to process some approximation 
> counting [2]. This worked very well. Then I switched the data source to 
> consume strings from Twitter [3]. The data source is consuming the strings 
> because I can see it when I debug. However, the time comparison is always 
> different on the onTimer() method, and I never get the results of the window 
> processing. I don't know the exact reason that this is happening. I guess it 
> is because my state is too heavy. But, still shouldn't the time be correct at 
> some point to finish the evaluation of my window?
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>  
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example>
> [2] 
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
>  
> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java>
> [3] 
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>  
> <https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java>
> 
> Kind Regards,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com 
> <https://felipeogutierrez.blogspot.com/>


Why the current time of my window never reaches the end time when I use KeyedProcessFunction?

2019-06-17 Thread Felipe Gutierrez
Hi,

I used this example of KeyedProcessFunction from the FLink website [1] and
I have implemented my own KeyedProcessFunction to process some
approximation counting [2]. This worked very well. Then I switched the data
source to consume strings from Twitter [3]. The data source is consuming
the strings because I can see it when I debug. However, the time comparison
is always different on the onTimer() method, and I never get the results of
the window processing. I don't know the exact reason that this is
happening. I guess it is because my state is too heavy. But, still
shouldn't the time be correct at some point to finish the evaluation of my
window?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
[3]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java

Kind Regards,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


Re: Is KeyedProcessFunction available in Flink 1.4?

2018-07-20 Thread anna stax
Thanks Bowen.

On Thu, Jul 19, 2018 at 4:45 PM, Bowen Li  wrote:

> Hi Anna,
>
> KeyedProcessFunction is only available starting from Flink 1.5. The doc is
> here
> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction>.
> It extends ProcessFunction and shares the same functionalities except
> giving more access to timers' key, thus you can refer to examples of
> ProcessFunction in that document.
>
> Bowen
>
>
> On Thu, Jul 19, 2018 at 3:26 PM anna stax  wrote:
>
>> Hello all,
>> I am using Flink 1.4 because thats the version provided by the latest AWS
>> EMR.
>> Is KeyedProcessFunction available in Flink 1.4?
>>
>> Also please share any links to good examples on using
>> KeyedProcessFunction .
>>
>> Thanks
>>
>


Re: Is KeyedProcessFunction available in Flink 1.4?

2018-07-19 Thread Bowen Li
Hi Anna,

KeyedProcessFunction is only available starting from Flink 1.5. The doc is
here
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction>.
It extends ProcessFunction and shares the same functionalities except
giving more access to timers' key, thus you can refer to examples of
ProcessFunction in that document.

Bowen


On Thu, Jul 19, 2018 at 3:26 PM anna stax  wrote:

> Hello all,
> I am using Flink 1.4 because thats the version provided by the latest AWS
> EMR.
> Is KeyedProcessFunction available in Flink 1.4?
>
> Also please share any links to good examples on using KeyedProcessFunction
> .
>
> Thanks
>


Is KeyedProcessFunction available in Flink 1.4?

2018-07-19 Thread anna stax
Hello all,
I am using Flink 1.4 because thats the version provided by the latest AWS
EMR.
Is KeyedProcessFunction available in Flink 1.4?

Also please share any links to good examples on using KeyedProcessFunction
.

Thanks