Hi Piotr,

Thanks, and darn it, that’s something I should have noticed.

— Ken


> On Aug 16, 2018, at 4:37 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> Hi,
> 
> You made a small mistake when restoring from state with the test harness, one
> that I have also made myself in the past. The problem is the ordering of these calls:
> 
>         result.open();
>         if (savedState != null) {
>             result.initializeState(savedState);
>         }
> 
> You need to call open() after initializeState(). If you look at the code of
> AbstractStreamOperatorTestHarness#open, you will see that if it is called before
> initializeState(), it initializes the harness without any state.
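In makeTestHarness, the fix is to move result.open() below the `if (savedState != null) { result.initializeState(savedState); }` block, so the calls run as setup(), initializeState(), open(). The real harness needs the Flink runtime, so the following is only a stdlib-only model of the pitfall under stated assumptions. ToyHarness and HarnessOrdering are hypothetical names, not Flink classes. Also, unlike the behaviour Piotr describes, the toy version throws when initializeState() follows open() instead of silently running without the restored state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Entry point listed first so `java HarnessOrdering.java` runs this class.
class HarnessOrdering {
    // Correct order: restore the saved state first, then open.
    static List<Long> restoreThenOpen(List<Long> saved) {
        ToyHarness harness = new ToyHarness();
        harness.initializeState(saved);
        harness.open();
        return harness.state();
    }

    // Wrong order (as in makeTestHarness): open first, then try to restore.
    // Returns true if the toy harness rejects the late restore.
    static boolean openThenRestoreIsRejected(List<Long> saved) {
        ToyHarness harness = new ToyHarness();
        harness.open();
        try {
            harness.initializeState(saved);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Long> saved = Arrays.asList(10L);
        System.out.println(restoreThenOpen(saved));           // prints [10]
        System.out.println(openThenRestoreIsRejected(saved)); // prints true
    }
}

// Hypothetical stand-in for a harness lifecycle; NOT Flink's
// AbstractStreamOperatorTestHarness. open() falls back to empty state when
// nothing has been restored yet, mirroring the implicit behaviour above.
class ToyHarness {
    private final List<Long> state = new ArrayList<>();
    private boolean initialized = false;

    void initializeState(List<Long> snapshot) {
        if (initialized) {
            // The toy version fails loudly instead of silently dropping the saved state.
            throw new IllegalStateException(
                "Already initialized: was open() called before initializeState()?");
        }
        state.addAll(snapshot);
        initialized = true;
    }

    void open() {
        if (!initialized) {
            // Nothing restored yet: start from empty state.
            initialized = true;
        }
    }

    List<Long> state() {
        return state;
    }
}
```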
> 
> Unfortunately, this is implicit behaviour that doesn’t throw any error (the
> test harness is not part of Flink’s public API). I will try to fix this:
> https://issues.apache.org/jira/browse/FLINK-10159
> 
> Piotrek
> 
>> On 16 Aug 2018, at 00:24, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>> 
>> Hi all,
>> 
>> It looks to me like the OperatorSubtaskState returned from 
>> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that 
>> had been registered via registerProcessingTimeTimer but had not yet fired 
>> when the snapshot was saved.
>> 
>> Is this a known limitation of OneInputStreamOperatorTestHarness?
>> 
>> If not, is there anything special I need to do when setting up the test 
>> harness to ensure that timers are saved?
>> 
>> Below is the unit test, which shows how the test harness is being set up and 
>> run.
>> 
>> The TimerFunction used in this test does seem to be doing the right thing:
>> using it in a simple job on a local Flink cluster works as expected when
>> creating and then restarting from a savepoint.
>> 
>> Thanks,
>> 
>> — Ken
>> 
>> ==================================================================================================
>> TimerTest.java
>> ==================================================================================================
>> package com.scaleunlimited.flinkcrawler.functions;
>> 
>> import static org.junit.Assert.assertTrue;
>> 
>> import java.util.ArrayList;
>> import java.util.List;
>> 
>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
>> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
>> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
>> import org.junit.Before;
>> import org.junit.Test;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
>> 
>> public class TimerTest {
>>     public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);
>> 
>>     private List<Long> _firedTimers = new ArrayList<Long>();
>> 
>>     @Before
>>     public void setUp() throws Exception {
>>     }
>>     
>>     @Test
>>     public void testTimerSaving() throws Throwable {
>>         
>>         // This operator doesn't really do much at all, but the first element
>>         // it processes will create a timer for (timestamp+1).
>>         // Whenever that timer fires, it will create another timer for 
>>         // (timestamp+1).
>>         KeyedProcessOperator<Integer, Integer, Integer> operator = 
>>             new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>>         
>>         // Create a test harness from scratch
>>         OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
>>             makeTestHarness(operator, null);
>>         
>>         // We begin at time zero
>>         testHarness.setProcessingTime(0);
>> 
>>         // Process some elements, which should also create a timer for time 1.
>>         int inputs[] = new int[] {1, 2, 3};
>>         for (int input : inputs) {
>>             testHarness.processElement(new StreamRecord<>(input));
>>         }
>>         
>>         // Force some time to pass, which should keep moving the timer ahead,
>>         // finally leaving it set for time 10.
>>         for (long i = 1; i < 10; i++) {
>>             testHarness.setProcessingTime(i);
>>         }
>>         
>>         // Save the state, which we assume should include the timer we set for
>>         // time 10.
>>         OperatorSubtaskState savedState = 
>>             testHarness.snapshot(0L, testHarness.getProcessingTime());
>>         
>>         // Close the first test harness
>>         testHarness.close();
>>         
>>         // Create a new test harness using the saved state (which we assume
>>         // includes the timer for time 10).
>>         testHarness = makeTestHarness(operator, savedState);
>>         
>>         // Force more time to pass, which should keep moving the timer ahead.
>>         for (long i = 10; i < 20; i++) {
>>             testHarness.setProcessingTime(i);
>>         }
>>         
>>         // Close the second test harness and make sure all the timers we expect
>>         // actually fired.
>>         testHarness.close();
>>         for (long i = 1; i < 20; i++) {
>>             
>>             // TODO This expectation currently fails, since Timers don't
>>             // seem to be included in the snapshot, at least the one produced by
>>             // the test harness.
>>             assertTrue(_firedTimers.contains(i));
>>         }
>>     }
>> 
>>     private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
>>             KeyedProcessOperator<Integer, Integer, Integer> operator,
>>             OperatorSubtaskState savedState) 
>>             throws Exception {
>>         OneInputStreamOperatorTestHarness<Integer, Integer> result;
>>         result = 
>>             new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
>>                     operator,
>>                     new TimerTool.IdentityKeySelector<Integer>(),
>>                     BasicTypeInfo.INT_TYPE_INFO);
>>         result.setup();
>>         result.open();
>>         if (savedState != null) {
>>             result.initializeState(savedState);
>>         }
>>         return result;
>>     }
>> }
>> 
>> 
>> ==================================================================================================
>> TimerFunction.java
>> ==================================================================================================
>> package com.scaleunlimited.flinkcrawler.functions;
>> 
>> import java.util.List;
>> 
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>> import org.apache.flink.util.Collector;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> @SuppressWarnings("serial")
>> public class TimerFunction
>>     extends KeyedProcessFunction<Integer, Integer, Integer> {
>>     static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
>>    
>>     List<Long> _firedTimers;
>>     long _period;
>> 
>>     public TimerFunction(List<Long> firedTimers) {
>>         this(firedTimers, 1);
>>     }
>> 
>>     public TimerFunction(List<Long> firedTimers, long period) {
>>         super();
>>         _firedTimers = firedTimers;
>>         _period = period;
>>     }
>> 
>>     @Override
>>     public void onTimer(long timestamp,
>>                         KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
>>                         Collector<Integer> out) throws Exception {
>>         super.onTimer(timestamp, context, out);
>>         _firedTimers.add(timestamp);
>>         long nextTimestamp = timestamp + _period;
>>         LOGGER.info("Firing at {}; Setting new timer for {}",
>>                     timestamp,
>>                     nextTimestamp);
>>         context.timerService().registerProcessingTimeTimer(nextTimestamp);
>>     }
>> 
>>     @Override
>>     public void processElement(Integer input,
>>                                KeyedProcessFunction<Integer, Integer, Integer>.Context context,
>>                                Collector<Integer> out)
>>         throws Exception {
>>         
>>         LOGGER.info("Processing input {}", input);
>>         if (_firedTimers.isEmpty()) {
>>             long firstTimestamp = 
>>                 context.timerService().currentProcessingTime() + _period;
>>             LOGGER.info("Setting initial timer for {}", firstTimestamp);
>>             context.timerService().registerProcessingTimeTimer(firstTimestamp);
>>         }
>>         
>>         out.collect(input);
>>     }
>> }
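The behaviour the test expects can be modeled without Flink. The model uses a self-rescheduling processing-time timer (as in TimerFunction.onTimer with a period of 1), takes a snapshot at time 9 whose pending timer is 10, and restores into a fresh timer service. This is a minimal sketch under stated assumptions: ToyTimerService and TimerSnapshotDemo are hypothetical names, a single key is assumed (the real test registers timers for keys 1, 2 and 3), and the snapshot is just the list of pending timestamps rather than an OperatorSubtaskState. Passing restore = false mimics the open()-before-initializeState() ordering, where the second harness starts with no timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Entry point listed first so `java TimerSnapshotDemo.java` runs this class.
class TimerSnapshotDemo {
    // Replays the test scenario for a single key: one self-rescheduling timer,
    // a snapshot at time 9, then a second "harness" advanced from 10 to 19.
    // restore=false mimics a restore that silently drops the saved timers.
    static List<Long> run(boolean restore) {
        List<Long> fired = new ArrayList<>();
        ToyTimerService first = new ToyTimerService(fired, 1);
        first.register(1); // like processElement at time 0 with a period of 1
        for (long t = 1; t < 10; t++) {
            first.advanceTo(t);
        }
        List<Long> snapshot = first.snapshot(); // holds the pending timer for 10

        ToyTimerService second = new ToyTimerService(fired, 1);
        if (restore) {
            second.restore(snapshot);
        }
        for (long t = 10; t < 20; t++) {
            second.advanceTo(t);
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // timestamps 1 through 19
        System.out.println(run(false)); // only timestamps 1 through 9
    }
}

// Hypothetical processing-time timer service; NOT Flink's internal timer service.
class ToyTimerService {
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private final List<Long> fired;
    private final long period;

    ToyTimerService(List<Long> fired, long period) {
        this.fired = fired;
        this.period = period;
    }

    void register(long timestamp) {
        pending.add(timestamp);
    }

    // Fires every timer due at or before `time`. Like TimerFunction.onTimer,
    // each firing records the timestamp and registers the next timer.
    void advanceTo(long time) {
        while (!pending.isEmpty() && pending.peek() <= time) {
            long timestamp = pending.poll();
            fired.add(timestamp);
            register(timestamp + period);
        }
    }

    // Pending (not yet fired) timers are part of the snapshot.
    List<Long> snapshot() {
        return new ArrayList<>(pending);
    }

    void restore(List<Long> timers) {
        pending.addAll(timers);
    }
}
```

With restore = true every timestamp from 1 to 19 fires, which is what the assertion loop in TimerTest checks; with restore = false only 1 through 9 fire, which matches the symptom reported above.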
>> 
>> 
>> 
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 

