Hi Avi,

Just to verify your ITCase, I wrote the following dummy example and it
seems to be "working" (ie. I can see non null timestamps and timers firing).


StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setParallelism(1);

env
      .addSource(new LongSource())
      .keyBy(elmnt -> elmnt)
      .process(new KeyedProcessFunction<Long, Long, Long>() {

         @Override
         public void processElement(Long value, Context ctx,
Collector<Long> out) throws Exception {


            long timestamp = ctx.timestamp();
            long timerTimestamp = timestamp + Time.seconds(10).toMilliseconds();

            System.out.println(ctx.timestamp() + " " + timerTimestamp);

            ctx.timerService().registerProcessingTimeTimer(timerTimestamp);
         }

         @Override
         public void onTimer(long timestamp, OnTimerContext ctx,
Collector<Long> out) throws Exception {
            System.out.println("TIMER: " + timestamp +" "+ ctx.timeDomain());
         }
      }).print();
env.execute();

The source is:

private static final class LongSource implements SourceFunction<Long> {

   private volatile boolean running = true;

   private long element = 0L;

   @Override
   public void run(SourceContext<java.lang.Long> ctx) throws Exception {
      while (running) {
         ctx.collect(element++ % 10);
         Thread.sleep(10L);
      }
   }

   @Override
   public void cancel() {

   }
}


Could you provide more details on how your usecase differs from the above
dummy example so that we can pin down the problem?

As a side-note, Ingestion time is essentially event time, with the only
difference that the timestamp assigner in the beginning gives each element
the timestamp System.currentTimeMillis. So in this case, maybe you could
also consider setting event time timers but keep in mind then your
Watermark emission interval.

In addition, if you want to simply check processing time processing of you
operator (not the whole pipeline), then you could make use of the
OneInputStreamTaskTestHarness or its keyed variant. This allows you to
provide your own processing time provider thus allow you to
deterministically
test processing time behaviour.

Cheers,
Kostas



On Sat, Mar 23, 2019 at 9:32 AM Avi Levi <avi.l...@bluevoyant.com> wrote:

> Any idea what should I do to overcome this?
>
> On Wed, Mar 20, 2019 at 7:17 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Hi Andrey,
>> I am testing a Filter operator that receives a key from the stream and
>> checks if it is a new one or not. if it is new it keeps it in state and
>> fire a timer all that is done using the ProcessFunction.
>> The testing is using some CollectSink as described here
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing>
>>  and
>> the source is implementation of the SourceFunction that accepts a
>> collection of values and adds it to ctx.collect .
>> The ctx.timestamp() is null, BUT even if I set the timer to sometime in
>> the future ctx.timerService.registerProcessingTimeTimer(currenttimestamp +
>> x) the timer is fired immediately.
>>
>>
>> On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin <and...@ververica.com>
>> wrote:
>>
>>> Hi Avi,
>>>
>>> what is the structure of your unit test? do you create some source and
>>> then apply function or you test only ProcessFunction methods in isolation?
>>> does ctx.timestamp() return zero or which value?
>>>
>>> Best,
>>> Andrey
>>>
>>>
>>> On Tue, Mar 19, 2019 at 9:19 PM Avi Levi <avi.l...@bluevoyant.com>
>>> wrote:
>>>
>>>> Hi Andrey ,
>>>> I'm using IngestionTime
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>>
>>>> This is my timer in the processElement:
>>>>    val nextTime: Long = ctx.timestamp()  + daysInMilliseconds(14)
>>>>    ctx.timerService.registerProcessingTimeTimer(nextTim)
>>>>
>>>> The problem is how do I use it in my unit tests ? since there is no
>>>> IngestionTime and timers are fired immediately so the timers actions (such
>>>> as state cleanup) are fired before time and causing the tests to fail .
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Mar 19, 2019 at 7:47 PM Andrey Zagrebin <and...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Avi,
>>>>>
>>>>> do you use processing time timer
>>>>> (timerService().registerProcessingTimeTimer)?
>>>>> why do you need ingestion time? do you
>>>>> set TimeCharacteristic.IngestionTime?
>>>>>
>>>>> Best,
>>>>> Andrey
>>>>>
>>>>> On Tue, Mar 19, 2019 at 1:11 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> Our stream is not based on time sequence and we do not use time based
>>>>>> operations. we do want to clean the state after x days hence we fire 
>>>>>> timer
>>>>>> event. My problem is that our unit test fires the event immediately 
>>>>>> (there
>>>>>> is no ingestion time) how can I inject ingestion time ?
>>>>>>
>>>>>> Cheers
>>>>>> Avi
>>>>>>
>>>>>>

Reply via email to