[ 
https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng updated FLINK-9201:
---------------------------
    Description: 
Consider a sum over session windows, with a session gap of 3 seconds and an allowed lateness of 60 seconds:
 * w1, TimeWindow[1,9], contains the elements 1, 2, 3 and 6; it fires when the watermark reaches 9.
 * A late element arrives whose own window w2 is TimeWindow[7,10], while the watermark is already at 11.
 * w1 and w2 are merged into a new window w3, TimeWindow[1,10], and a new timer is registered for w3 by calling triggerContext.onMerge(mergedWindows). w3 fires for the first time when triggerContext.onElement(element) is called, because the watermark has already passed the end of w3. It then fires a second time because its timer is less than the current watermark.

That means w3 is fired twice, because the watermark has already passed the newly merged window w3.
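To make the call sequence above easier to follow, here is a small, dependency-free Java model of it. It is not Flink code: the Window class and the onMerge/onElement/advanceWatermark methods are hypothetical stand-ins for TimeWindow and the event-time trigger, and the guard flag is only one possible way to suppress the second firing, assumed here for illustration.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of the call sequence in this issue. NOT Flink code:
// Window and the trigger methods below are illustrative stand-ins.
class DoubleFireSketch {

	// Window [start, end) whose last valid timestamp is end - 1,
	// mirroring TimeWindow.maxTimestamp().
	static final class Window {
		final long start;
		final long end;

		Window(long start, long end) {
			this.start = start;
			this.end = end;
		}

		long maxTimestamp() {
			return end - 1;
		}

		// Smallest window covering both, i.e. the result of a session merge.
		Window cover(Window other) {
			return new Window(Math.min(start, other.start), Math.max(end, other.end));
		}
	}

	private final TreeSet<Long> timers = new TreeSet<>();
	private final List<Window> fired = new ArrayList<>();
	private long watermark = Long.MIN_VALUE;

	// Registers a timer for the merged window. With guard = true the timer is
	// skipped when the watermark is already past the window (assumed fix).
	void onMerge(Window merged, boolean guard) {
		if (!guard || merged.maxTimestamp() > watermark) {
			timers.add(merged.maxTimestamp());
		}
	}

	// Fires at once if the watermark already passed the window, else sets a timer.
	void onElement(Window window) {
		if (window.maxTimestamp() <= watermark) {
			fired.add(window);
		} else {
			timers.add(window.maxTimestamp());
		}
	}

	// Advances the watermark and fires `window` once per expired timer.
	void advanceWatermark(long newWatermark, Window window) {
		watermark = newWatermark;
		while (!timers.isEmpty() && timers.first() <= newWatermark) {
			timers.pollFirst();
			fired.add(window);
		}
	}

	// Replays the scenario from the description and counts firings of w3.
	static int firingsOfMergedWindow(boolean guard) {
		DoubleFireSketch sim = new DoubleFireSketch();
		Window w1 = new Window(1, 9);   // elements 1, 2, 3, 6 with gap 3
		sim.onElement(w1);              // watermark not yet at w1: timer at 8
		sim.advanceWatermark(11, w1);   // w1 fires once, on time
		sim.fired.clear();              // from here on, count only w3

		Window w2 = new Window(7, 10);  // late element at 7 (watermark is 11)
		Window w3 = w1.cover(w2);       // merged window [1, 10)
		sim.onMerge(w3, guard);         // unguarded: timer at 9, already expired
		sim.onElement(w3);              // 9 <= 11, so w3 fires immediately
		sim.advanceWatermark(12, w3);   // the expired merge timer fires w3 again
		return sim.fired.size();
	}

	public static void main(String[] args) {
		System.out.println("without guard: " + firingsOfMergedWindow(false)); // 2
		System.out.println("with guard:    " + firingsOfMergedWindow(true));  // 1
	}
}
{code}

Without the guard the merged window fires twice (once from onElement, once from the already expired merge timer); with the guard it fires once.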

Example

{code}

@Test
@SuppressWarnings("unchecked")
public void testSessionWindowsFiredTwice() throws Exception {
	closeCalled.set(0);

	final int sessionSize = 3;

	TypeInformation<Tuple2<String, Integer>> inputType =
		TypeInfoParser.parse("Tuple2<String, Integer>");

	ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>(
		"window-contents",
		inputType.createSerializer(new ExecutionConfig()));

	WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator =
		new WindowOperator<>(
			EventTimeSessionWindows.withGap(Time.seconds(sessionSize)),
			new TimeWindow.Serializer(),
			new TupleKeySelector(),
			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
			stateDesc,
			new InternalIterableWindowFunction<>(new SessionWindowFunction()),
			EventTimeTrigger.create(),
			60000, // allowed lateness in milliseconds
			null /* late data output tag */);

	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
		createTestHarness(operator);

	ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

	testHarness.open();

	// add elements out-of-order
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));

	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
	testHarness.processWatermark(new Watermark(5500));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), 5499));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), 3999));
	expectedOutput.add(new Watermark(5500));

	// do a snapshot, close and restore again
	OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
		expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
	testHarness.close();

	testHarness = createTestHarness(operator);
	testHarness.setup();
	testHarness.initializeState(snapshot);
	testHarness.open();
	expectedOutput.clear();

	// suppose the watermark has already reached 10000
	testHarness.processWatermark(new Watermark(10000));

	// a late element with timestamp 4500 arrives; the merged session window [0, 7500]
	// is still valid because its cleanup time (maxTimestamp 7499 + allowed lateness 60000)
	// is later than the current watermark, and it is fired immediately
	testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), 4500));
	expectedOutput.add(new Watermark(10000));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));

	// the bug: when the next watermark arrives, the same TimeWindow[0, 7500] is fired again
	testHarness.processWatermark(new Watermark(11000));
	expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), 7499));
	expectedOutput.add(new Watermark(11000));

	TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
		expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
	testHarness.close();
}

{code}
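The comments in the test rely on a little arithmetic for key2. A minimal sketch of it follows, assuming a window is dropped as late only when maxTimestamp + allowedLateness is at or before the current watermark, and that the event-time trigger fires on element once the watermark has passed maxTimestamp. The class and method names are hypothetical, not Flink API.

{code:java}
// Hypothetical arithmetic check for the key2 windows in the test above.
// NOT Flink code; it only reproduces the numbers the test comments rely on.
class LateSessionArithmetic {
	static final long GAP_MS = 3_000;               // Time.seconds(sessionSize)
	static final long ALLOWED_LATENESS_MS = 60_000; // allowed lateness passed to WindowOperator

	// Session window [start, end) with the last valid timestamp at end - 1.
	static final class Window {
		final long start;
		final long end;

		Window(long start, long end) {
			this.start = start;
			this.end = end;
		}

		long maxTimestamp() {
			return end - 1;
		}

		// Assumed: window state is kept until maxTimestamp + allowed lateness.
		long cleanupTime() {
			return maxTimestamp() + ALLOWED_LATENESS_MS;
		}

		Window cover(Window other) {
			return new Window(Math.min(start, other.start), Math.max(end, other.end));
		}

		@Override
		public String toString() {
			return "[" + start + ", " + end + ")";
		}
	}

	// Existing key2 session (elements at 0, 1000, 2500) merged with the late element at 4500.
	static Window mergedKey2Window() {
		Window existing = new Window(0, 2_500 + GAP_MS);
		Window late = new Window(4_500, 4_500 + GAP_MS);
		return existing.cover(late);
	}

	public static void main(String[] args) {
		long watermark = 10_000;
		Window merged = mergedKey2Window();
		System.out.println("merged window:    " + merged);                                // [0, 7500)
		System.out.println("maxTimestamp:     " + merged.maxTimestamp());                 // 7499
		System.out.println("cleanup time:     " + merged.cleanupTime());                  // 67499
		System.out.println("dropped as late:  " + (merged.cleanupTime() <= watermark));   // false
		System.out.println("fires on element: " + (merged.maxTimestamp() <= watermark));  // true
	}
}
{code}

Under these assumptions the merged window is not dropped (67499 > 10000) but is already past its end (7499 <= 10000), which is exactly the state in which both onElement and the merge timer fire it.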

 

 

 

> Same merged window is fired twice if the watermark has already passed the new
> merged window
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-9201
>                 URL: https://issues.apache.org/jira/browse/FLINK-9201
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.3.3
>            Reporter: yuemeng
>            Assignee: yuemeng
>            Priority: Blocker
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
