Hi Piotr, Thanks, and darn it, that’s something I should have noticed.
— Ken > On Aug 16, 2018, at 4:37 AM, Piotr Nowojski <pi...@data-artisans.com> wrote: > > Hi, > > You made a small mistake when restoring from state using the test harness, one that I > myself have also made in the past. The problem is the ordering of these calls: > > result.open(); > if (savedState != null) { > result.initializeState(savedState); > } > > Open is supposed to be called after initializeState, and if you look into the > code of AbstractStreamOperatorTestHarness#open, if it is called before > initialize, it will initialize the harness without any state. > > Unfortunately, this is implicit behaviour that doesn’t throw any error > (the test harness is not part of Flink’s public API). I will try to fix this: > https://issues.apache.org/jira/browse/FLINK-10159 > <https://issues.apache.org/jira/browse/FLINK-10159> > > Piotrek > >> On 16 Aug 2018, at 00:24, Ken Krugler <kkrugler_li...@transpac.com >> <mailto:kkrugler_li...@transpac.com>> wrote: >> >> Hi all, >> >> It looks to me like the OperatorSubtaskState returned from >> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that >> had been registered via registerProcessingTimeTimer but had not yet fired >> when the snapshot was saved. >> >> Is this a known limitation of OneInputStreamOperatorTestHarness? >> >> If not, is there anything special I need to do when setting up the test >> harness to ensure that timers are saved? >> >> Below is the unit test, which shows how the test harness is being set up and >> run. >> >> The TimerFunction used in this test does seem to be doing the right thing, >> as using it in a simple job on a local Flink cluster works as expected when >> creating & then restarting from a savepoint. 
>> >> Thanks, >> >> — Ken >> >> ================================================================================================== >> TimerTest.java >> ================================================================================================== >> package com.scaleunlimited.flinkcrawler.functions; >> >> import static org.junit.Assert.assertTrue; >> >> import java.util.ArrayList; >> import java.util.List; >> >> import org.apache.flink.api.common.typeinfo.BasicTypeInfo; >> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; >> import org.apache.flink.streaming.api.operators.KeyedProcessOperator; >> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; >> import >> org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; >> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; >> import org.junit.Before; >> import org.junit.Test; >> import org.slf4j.Logger; >> import org.slf4j.LoggerFactory; >> >> import com.scaleunlimited.flinkcrawler.tools.TimerTool; >> >> public class TimerTest { >> public static final Logger LOGGER = >> LoggerFactory.getLogger(TimerTest.class); >> >> private List<Long> _firedTimers = new ArrayList<Long>(); >> >> @Before >> public void setUp() throws Exception { >> } >> >> @Test >> public void testTimerSaving() throws Throwable { >> >> // This operator doesn't really do much at all, but the first element >> // it processes will create a timer for (timestamp+1). >> // Whenever that timer fires, it will create another timer for >> // (timestamp+1). >> KeyedProcessOperator<Integer, Integer, Integer> operator = >> new KeyedProcessOperator<>(new TimerFunction(_firedTimers)); >> >> // Create a test harness from scratch >> OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = >> makeTestHarness(operator, null); >> >> // We begin at time zero >> testHarness.setProcessingTime(0); >> >> // Process some elements, which should also create a timer for time >> 1. 
>> int inputs[] = new int[] {1, 2, 3}; >> for (int input : inputs) { >> testHarness.processElement(new StreamRecord<>(input)); >> } >> >> // Force some time to pass, which should keep moving the timer ahead, >> // finally leaving it set for time 10. >> for (long i = 1; i < 10; i++) { >> testHarness.setProcessingTime(i); >> } >> >> // Save the state, which we assume should include the timer we set >> for >> // time 10. >> OperatorSubtaskState savedState = >> testHarness.snapshot(0L, testHarness.getProcessingTime()); >> >> // Close the first test harness >> testHarness.close(); >> >> // Create a new test harness using the saved state (which we assume >> // includes the timer for time 10). >> testHarness = makeTestHarness(operator, savedState); >> >> // Force more time to pass, which should keep moving the timer ahead. >> for (long i = 10; i < 20; i++) { >> testHarness.setProcessingTime(i); >> } >> >> // Close the second test harness and make sure all the timers we >> expect >> // actually fired. >> testHarness.close(); >> for (long i = 1; i < 20; i++) { >> >> // TODO This expectation currently fails, since Timers don't >> // seem to be included in the snapshot, at least the one >> produced by >> // the test harness. 
>> assertTrue(_firedTimers.contains(i)); >> } >> } >> >> private OneInputStreamOperatorTestHarness<Integer, Integer> >> makeTestHarness( >> KeyedProcessOperator<Integer, Integer, Integer> operator, >> OperatorSubtaskState savedState) >> throws Exception { >> OneInputStreamOperatorTestHarness<Integer, Integer> result; >> result = >> new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, >> Integer>( >> operator, >> new TimerTool.IdentityKeySelector<Integer>(), >> BasicTypeInfo.INT_TYPE_INFO); >> result.setup(); >> result.open(); >> if (savedState != null) { >> result.initializeState(savedState); >> } >> return result; >> } >> } >> >> >> ================================================================================================== >> TimerFunction.java >> ================================================================================================== >> package com.scaleunlimited.flinkcrawler.functions; >> >> import java.util.List; >> >> import org.apache.flink.streaming.api.functions.KeyedProcessFunction; >> import org.apache.flink.util.Collector; >> import org.slf4j.Logger; >> import org.slf4j.LoggerFactory; >> >> @SuppressWarnings("serial") >> public class TimerFunction >> extends KeyedProcessFunction<Integer, Integer, Integer> { >> static final Logger LOGGER = >> LoggerFactory.getLogger(TimerFunction.class); >> >> List<Long> _firedTimers; >> long _period; >> >> public TimerFunction(List<Long> firedTimers) { >> this(firedTimers, 1); >> } >> >> public TimerFunction(List<Long> firedTimers, long period) { >> super(); >> _firedTimers = firedTimers; >> _period = period; >> } >> >> @Override >> public void onTimer(long timestamp, >> KeyedProcessFunction<Integer, Integer, >> Integer>.OnTimerContext context, >> Collector<Integer> out) throws Exception { >> super.onTimer(timestamp, context, out); >> _firedTimers.add(timestamp); >> long nextTimestamp = timestamp + _period; >> LOGGER.info("Firing at {}; Setting new timer >> for {}", >> 
timestamp, >> nextTimestamp); >> context.timerService().registerProcessingTimeTimer(nextTimestamp); >> } >> >> @Override >> public void processElement( Integer input, >> KeyedProcessFunction<Integer, Integer, >> Integer>.Context context, >> Collector<Integer> out) >> throws Exception { >> >> LOGGER.info("Processing input {}", input); >> if (_firedTimers.isEmpty()) { >> long firstTimestamp = >> context.timerService().currentProcessingTime() + _period; >> LOGGER.info("Setting initial timer for {}", >> firstTimestamp); >> >> context.timerService().registerProcessingTimeTimer(firstTimestamp); >> } >> >> out.collect(input); >> } >> } >> >> >> >> -------------------------- >> Ken Krugler >> +1 530-210-6378 >> http://www.scaleunlimited.com <http://www.scaleunlimited.com/> >> Custom big data solutions & training >> Flink, Solr, Hadoop, Cascading & Cassandra >> > -------------------------- Ken Krugler +1 530-210-6378 http://www.scaleunlimited.com Custom big data solutions & training Flink, Solr, Hadoop, Cascading & Cassandra