http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java new file mode 100644 index 0000000..cddcc42 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -0,0 +1,115 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * A {@link WindowOperator} that applies an {@link Evictor} to the buffered elements of a window
+ * right before the window function is evaluated.
+ *
+ * <p>The buffers created by the {@link WindowBufferFactory} must be {@link EvictingWindowBuffer}s
+ * so that elements can be removed from them.
+ */
+public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, OUT, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(EvictingWindowOperator.class);
+
+	private final Evictor<? super IN, ? super W> evictor;
+
+	/**
+	 * Creates a new evicting window operator.
+	 *
+	 * @param windowAssigner assigns each element to its windows
+	 * @param keySelector extracts the key by which windows are kept apart
+	 * @param windowBufferFactory creates the (evicting) buffer that stores the elements of one window
+	 * @param windowFunction the function evaluated when a window is emitted
+	 * @param trigger prototype trigger that decides when a window fires
+	 * @param evictor decides how many elements are removed from a window before it is evaluated
+	 */
+	public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			KeySelector<IN, K> keySelector,
+			WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory,
+			KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+			Trigger<? super IN, ? super W> trigger,
+			Evictor<? super IN, ? super W> evictor) {
+		super(windowAssigner, keySelector, windowBufferFactory, windowFunction, trigger);
+		this.evictor = evictor;
+	}
+
+	/**
+	 * Lets the evictor remove elements from the window's buffer, then evaluates the window
+	 * function on the remaining elements.
+	 *
+	 * @param key the key of the window
+	 * @param window the window to emit
+	 * @param purge if {@code true}, the window's buffer and trigger context are removed from the
+	 *              operator state before evaluation
+	 */
+	@Override
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	protected void emitWindow(K key, W window, boolean purge) throws Exception {
+
+		timestampedCollector.setTimestamp(window.getEnd());
+
+		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+		if (keyWindows == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
+
+		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger =
+				purge ? keyWindows.remove(window) : keyWindows.get(window);
+		if (bufferAndTrigger == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
+
+		EvictingWindowBuffer<IN> windowBuffer = (EvictingWindowBuffer<IN>) bufferAndTrigger.f0;
+
+		int size = windowBuffer.size();
+		if (size > 0) {
+			// the evictor's generic signature does not line up with the buffer's record iterable,
+			// hence the raw cast
+			int toEvict = evictor.evict((Iterable) windowBuffer.getElements(), size, window);
+			// never ask the buffer to drop more elements than it holds (removing past the end of
+			// a HeapWindowBuffer would throw NoSuchElementException)
+			windowBuffer.removeElements(Math.min(toEvict, size));
+		}
+
+		userFunction.evaluate(key,
+				window,
+				windowBuffer.getUnpackedElements(),
+				timestampedCollector);
+
+		// drop the key entirely once its last window has been purged
+		if (keyWindows.isEmpty()) {
+			windows.remove(key);
+		}
+	}
+
+	@Override
+	public EvictingWindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
+		super.enableSetProcessingTime(setProcessingTime);
+		return this;
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Getters for testing
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public Evictor<? super IN, ? super W> getEvictor() {
+		return evictor;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
index b1ff7e2..880c85c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyToOperator.java
@@ -18,15 +18,41 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.DeltaEvictor;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Count;
+import org.apache.flink.streaming.api.windowing.windowpolicy.Delta;
 import org.apache.flink.streaming.api.windowing.windowpolicy.EventTime;
 import org.apache.flink.streaming.api.windowing.windowpolicy.ProcessingTime;
 import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 
 /**
  * This class implements the conversion from window policies to concrete operator
@@ -55,7 +81,7 @@ public class PolicyToOperator {
 				@SuppressWarnings("unchecked")
 				OneInputStreamOperator<IN, OUT> op = (OneInputStreamOperator<IN, OUT>)
-						new AggregatingProcessingTimeWindowOperator<KEY, IN>(
+						new AggregatingProcessingTimeWindowOperator<>(
 								reducer, keySelector,
 								windowLength, windowSlide);
 				return op;
 			}
@@ -63,17 +89,160 @@ public class PolicyToOperator {
 				@SuppressWarnings("unchecked")
 				KeyedWindowFunction<IN, OUT, KEY, Window> wf = (KeyedWindowFunction<IN, OUT, KEY, Window>) function;
 
-				return new AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>(
+				return new AccumulatingProcessingTimeWindowOperator<>(
 						wf, keySelector, windowLength, windowSlide);
 			}
 		}
 
 		// -- case 2: both policies are event time policies
 		if (window instanceof EventTime && (slide == null || slide instanceof EventTime)) {
-			// add event time implementation
+			final long windowLength = ((EventTime) window).toMilliseconds();
+			final long windowSlide = slide == null ? windowLength : ((EventTime) slide).toMilliseconds();
+
+			WindowAssigner<? super IN, TimeWindow> assigner;
+			if (windowSlide == windowLength) {
+				assigner = TumblingTimeWindows.of(windowLength);
+			} else {
+				assigner = SlidingTimeWindows.of(windowLength, windowSlide);
+			}
+			WindowBufferFactory<IN, ? extends WindowBuffer<IN>> windowBuffer;
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
+				function = new ReduceWindowFunction<>(reducer);
+				windowBuffer = new PreAggregatingHeapWindowBuffer.Factory<>(reducer);
+			} else {
+				windowBuffer = new HeapWindowBuffer.Factory<>();
+			}
+			if (!(function instanceof KeyedWindowFunction)) {
+				throw new IllegalStateException("Windowing function is not of type KeyedWindowFunction.");
+			}
+			@SuppressWarnings("unchecked")
+			KeyedWindowFunction<IN, OUT, KEY, TimeWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, TimeWindow>) function;
+
+			return new WindowOperator<>(
+					assigner,
+					keySelector,
+					windowBuffer,
+					windowFunction,
+					WatermarkTrigger.create());
 		}
-		
-		throw new UnsupportedOperationException("The windowing mechanism does not yet support " + window.toString(slide));
+
+		// -- case 3: arbitrary trigger, no eviction
+		if (slide == null) {
+			Trigger<? super IN, GlobalWindow> trigger = policyToTrigger(window);
+			// we need to make them purging triggers because the trigger/eviction policy model
+			// expects that the window is purged when no slide is used
+			Trigger<? super IN, GlobalWindow> purgingTrigger = PurgingTrigger.of(trigger);
+
+			WindowBufferFactory<IN, ? extends WindowBuffer<IN>> windowBuffer;
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
+				function = new ReduceWindowFunction<>(reducer);
+				windowBuffer = new PreAggregatingHeapWindowBuffer.Factory<>(reducer);
+			} else {
+				windowBuffer = new HeapWindowBuffer.Factory<>();
+			}
+
+			if (!(function instanceof KeyedWindowFunction)) {
+				throw new IllegalStateException("Windowing function is not of type KeyedWindowFunction.");
+			}
+			@SuppressWarnings("unchecked")
+			KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
+
+			return new WindowOperator<>(
+					GlobalWindows.<IN>create(),
+					keySelector,
+					windowBuffer,
+					windowFunction,
+					purgingTrigger);
+		}
+
+		// -- case 4: arbitrary trigger, arbitrary eviction
+		Trigger<? super IN, GlobalWindow> trigger = policyToTrigger(slide);
+		Evictor<? super IN, GlobalWindow> evictor = policyToEvictor(window);
+
+		// no pre-aggregation here: the evictor needs to see the individual elements
+		WindowBufferFactory<IN, ? extends EvictingWindowBuffer<IN>> windowBuffer = new HeapWindowBuffer.Factory<>();
+		if (function instanceof ReduceFunction) {
+			@SuppressWarnings("unchecked")
+			ReduceFunction<IN> reducer = (ReduceFunction<IN>) SerializationUtils.clone(function);
+			function = new ReduceWindowFunction<>(reducer);
+		}
+
+		if (!(function instanceof KeyedWindowFunction)) {
+			throw new IllegalStateException("Windowing function is not of type KeyedWindowFunction.");
+		}
+
+		@SuppressWarnings("unchecked")
+		KeyedWindowFunction<IN, OUT, KEY, GlobalWindow> windowFunction = (KeyedWindowFunction<IN, OUT, KEY, GlobalWindow>) function;
+
+		EvictingWindowOperator<KEY, IN, OUT, GlobalWindow> op = new EvictingWindowOperator<>(
+				GlobalWindows.<IN>create(),
+				keySelector,
+				windowBuffer,
+				windowFunction,
+				trigger,
+				evictor);
+
+		if (window instanceof ProcessingTime) {
+			// special case, we need to instruct the window operator to store the processing time in
+			// the elements so that the evictor can work on that
+			op.enableSetProcessingTime(true);
+		}
+
+		return op;
+	}
+
+	/**
+	 * Translates a window policy into a trigger for {@link GlobalWindow}s.
+	 *
+	 * @param policy the window policy (event time, processing time, count or delta)
+	 * @throws UnsupportedOperationException if the policy type is not supported
+	 */
+	private static <IN> Trigger<? super IN, GlobalWindow> policyToTrigger(WindowPolicy policy) {
+		if (policy instanceof EventTime) {
+			EventTime eventTime = (EventTime) policy;
+			return ContinuousWatermarkTrigger.of(eventTime.getSize());
+		} else if (policy instanceof ProcessingTime) {
+			ProcessingTime processingTime = (ProcessingTime) policy;
+			return ContinuousProcessingTimeTrigger.of(processingTime.getSize());
+		} else if (policy instanceof Count) {
+			Count count = (Count) policy;
+			return CountTrigger.of(count.getSize());
+		} else if (policy instanceof Delta) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			Delta<IN> delta = (Delta) policy;
+			return DeltaTrigger.of(delta.getThreshold(), delta.getDeltaFunction());
+		}
+
+		throw new UnsupportedOperationException("Unsupported policy " + policy);
+	}
+
+	/**
+	 * Translates a window policy into an evictor for {@link GlobalWindow}s.
+	 *
+	 * @param policy the window policy (event time, processing time, count or delta)
+	 * @throws UnsupportedOperationException if the policy type is not supported
+	 */
+	private static <IN> Evictor<? super IN, GlobalWindow> policyToEvictor(WindowPolicy policy) {
+		if (policy instanceof EventTime) {
+			EventTime eventTime = (EventTime) policy;
+			return TimeEvictor.of(eventTime.getSize());
+		} else if (policy instanceof ProcessingTime) {
+			ProcessingTime processingTime = (ProcessingTime) policy;
+			return TimeEvictor.of(processingTime.getSize());
+		} else if (policy instanceof Count) {
+			Count count = (Count) policy;
+			return CountEvictor.of(count.getSize());
+		} else if (policy instanceof Delta) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			Delta<IN> delta = (Delta) policy;
+			return DeltaEvictor.of(delta.getThreshold(), delta.getDeltaFunction());
+		}
+
+		throw new UnsupportedOperationException("Unsupported policy " + policy);
 	}
 
 	// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
new file mode 100644
index 0000000..cda4481
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -0,0 +1,320 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.
You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
+import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Window operator that assigns every incoming element to windows via a {@link WindowAssigner},
+ * buffers the elements per key and window in a {@link WindowBuffer}, and evaluates a
+ * {@link KeyedWindowFunction} whenever the trigger of a window fires.
+ *
+ * <p>Every (key, window) pair gets its own {@link Trigger#duplicate() copy} of the trigger,
+ * together with a {@link TriggerContext} through which that trigger can register watermark and
+ * processing-time timers with this operator.
+ */
+public class WindowOperator<K, IN, OUT, W extends Window>
+		extends AbstractUdfStreamOperator<OUT, KeyedWindowFunction<IN, OUT, K, W>>
+		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
+
+
+	private final WindowAssigner<? super IN, W> windowAssigner;
+	private final KeySelector<IN, K> keySelector;
+
+	/** Prototype trigger; every (key, window) pair works on its own duplicate. */
+	private final Trigger<? super IN, ? super W> triggerTemplate;
+	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
+
+	/** Per key: the open windows, each with its element buffer and trigger context. */
+	protected transient Map<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> windows;
+
+	/** Pending timers by timestamp; each set holds the trigger contexts to notify. */
+	private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+	private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+
+	protected transient TimestampedCollector<OUT> timestampedCollector;
+
+	/** If set, incoming elements get the current processing time as their timestamp. */
+	private boolean setProcessingTime = false;
+
+	private TypeSerializer<IN> inputSerializer;
+
+	/**
+	 * Creates a new window operator.
+	 *
+	 * @param windowAssigner assigns each element to a collection of windows
+	 * @param keySelector extracts the key by which windows are kept apart
+	 * @param windowBufferFactory creates the buffer that stores the elements of one window
+	 * @param windowFunction the function evaluated when a window is emitted
+	 * @param trigger prototype trigger, duplicated for every (key, window) pair
+	 */
+	public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
+			KeySelector<IN, K> keySelector,
+			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
+			KeyedWindowFunction<IN, OUT, K, W> windowFunction,
+			Trigger<? super IN, ? super W> trigger) {
+
+		super(windowFunction);
+
+		this.windowAssigner = windowAssigner;
+		this.keySelector = keySelector;
+
+		this.windowBufferFactory = windowBufferFactory;
+		this.triggerTemplate = trigger;
+
+		setChainingStrategy(ChainingStrategy.ALWAYS);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+		inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		windows = Maps.newHashMap();
+		watermarkTimers = Maps.newHashMap();
+		processingTimeTimers = Maps.newHashMap();
+		timestampedCollector = new TimestampedCollector<>(output);
+
+		if (inputSerializer == null) {
+			throw new IllegalStateException("Input serializer was not set.");
+		}
+
+		windowBufferFactory.setRuntimeContext(getRuntimeContext());
+		windowBufferFactory.open(parameters);
+	}
+
+	/**
+	 * Emits every window that is still buffered (without purging it), clears the window state,
+	 * then closes the buffer factory and finally calls {@code super.close()}.
+	 *
+	 * <p>The remaining windows are emitted <em>before</em> {@code super.close()}.
+	 * NOTE(review): {@code AbstractUdfStreamOperator#close} is expected to close the user
+	 * function, which must still be usable while the last windows are evaluated; confirm.
+	 */
+	@Override
+	public void close() throws Exception {
+		try {
+			// windows is null if open() failed early
+			if (windows != null) {
+				// iterate over snapshots so that emitWindow() may modify the window maps
+				for (K key : new ArrayList<>(windows.keySet())) {
+					Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+					if (keyWindows == null) {
+						continue;
+					}
+					for (W window : new ArrayList<>(keyWindows.keySet())) {
+						emitWindow(key, window, false);
+					}
+				}
+				windows.clear();
+			}
+		} finally {
+			try {
+				windowBufferFactory.close();
+			} finally {
+				super.close();
+			}
+		}
+	}
+
+	/**
+	 * Stores a copy of the element in every window it is assigned to and reports the element to
+	 * each window's trigger, emitting/purging the window if the trigger says so.
+	 */
+	@Override
+	@SuppressWarnings("unchecked")
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		if (setProcessingTime) {
+			element.replace(element.getValue(), System.currentTimeMillis());
+		}
+		Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
+
+		K key = keySelector.getKey(element.getValue());
+
+		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+		if (keyWindows == null) {
+			keyWindows = Maps.newHashMap();
+			windows.put(key, keyWindows);
+		}
+
+		for (W window: elementWindows) {
+			Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger = keyWindows.get(window);
+			if (bufferAndTrigger == null) {
+				// first element of this window: new buffer and a private copy of the trigger
+				bufferAndTrigger = new Tuple2<>();
+				bufferAndTrigger.f0 = windowBufferFactory.create();
+				bufferAndTrigger.f1 = new TriggerContext(key, window, triggerTemplate.duplicate());
+				keyWindows.put(window, bufferAndTrigger);
+			}
+			StreamRecord<IN> elementCopy = new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp());
+			bufferAndTrigger.f0.storeElement(elementCopy);
+			Trigger.TriggerResult triggerResult = bufferAndTrigger.f1.trigger.onElement(elementCopy.getValue(), elementCopy.getTimestamp(), window, bufferAndTrigger.f1);
+			processTriggerResult(triggerResult, key, window);
+		}
+	}
+
+	/**
+	 * Evaluates the window function on the contents of the given window.
+	 *
+	 * @param key the key of the window
+	 * @param window the window to emit
+	 * @param purge if {@code true}, the window's buffer and trigger context are removed from the
+	 *              operator state before evaluation
+	 */
+	protected void emitWindow(K key, W window, boolean purge) throws Exception {
+		timestampedCollector.setTimestamp(window.getEnd());
+
+		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
+
+		if (keyWindows == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
+
+		Tuple2<WindowBuffer<IN>, TriggerContext> bufferAndTrigger;
+		if (purge) {
+			bufferAndTrigger = keyWindows.remove(window);
+		} else {
+			bufferAndTrigger = keyWindows.get(window);
+		}
+
+		if (bufferAndTrigger == null) {
+			// e.g. a timer fired for a window that has already been purged
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
+
+
+		userFunction.evaluate(key,
+				window,
+				bufferAndTrigger.f0.getUnpackedElements(),
+				timestampedCollector);
+
+		// drop the key entirely once its last window has been purged
+		if (keyWindows.isEmpty()) {
+			windows.remove(key);
+		}
+	}
+
+	/** Emits (and possibly purges) the window according to the trigger's decision. */
+	private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
+		switch (triggerResult) {
+			case FIRE:
+				emitWindow(key, window, false);
+				break;
+
+			case FIRE_AND_PURGE:
+				emitWindow(key, window, true);
+				break;
+
+			case CONTINUE:
+				// ignore
+				break;
+		}
+	}
+
+	/** Fires all watermark timers that are due at the watermark, then forwards the watermark. */
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		fireTimers(watermarkTimers, mark.getTimestamp());
+		output.emitWatermark(mark);
+	}
+
+	/**
+	 * Callback for processing-time timers registered via
+	 * {@link TriggerContext#registerProcessingTimeTimer(long)}. A timer is due once {@code time}
+	 * has reached its timestamp, so timers registered for exactly {@code time} fire as well.
+	 */
+	@Override
+	public void trigger(long time) throws Exception {
+		fireTimers(processingTimeTimers, time);
+	}
+
+	/**
+	 * Fires every timer in {@code timers} whose timestamp is {@code <= time}, in ascending
+	 * timestamp order, and removes it from the map.
+	 *
+	 * <p>The due timestamps are collected up front, and each timer set is removed from the map
+	 * before its triggers run. Triggers may register new timers from {@code onTime}, and doing so
+	 * while the map (or set) is being iterated would throw a
+	 * {@link java.util.ConcurrentModificationException}. A timer registered during firing either
+	 * joins a due set that has not been processed yet or stays in the map for a later call.
+	 */
+	private void fireTimers(Map<Long, Set<TriggerContext>> timers, long time) throws Exception {
+		List<Long> dueTimestamps = new ArrayList<>();
+		for (Long timestamp : timers.keySet()) {
+			if (timestamp <= time) {
+				dueTimestamps.add(timestamp);
+			}
+		}
+		Collections.sort(dueTimestamps);
+
+		for (Long timestamp : dueTimestamps) {
+			Set<TriggerContext> contexts = timers.remove(timestamp);
+			if (contexts == null) {
+				continue;
+			}
+			for (TriggerContext context : contexts) {
+				Trigger.TriggerResult triggerResult = context.trigger.onTime(time, context);
+				processTriggerResult(triggerResult, context.key, context.window);
+			}
+		}
+	}
+
+	/**
+	 * Per-(key, window) context handed to the trigger. It holds that window's trigger copy and
+	 * lets the trigger register watermark and processing-time timers with this operator.
+	 */
+	protected class TriggerContext implements Trigger.TriggerContext {
+		Trigger<? super IN, ? super W> trigger;
+		K key;
+		W window;
+
+		public TriggerContext(K key, W window, Trigger<? super IN, ? super W> trigger) {
+			this.key = key;
+			this.window = window;
+			this.trigger = trigger;
+		}
+
+		@Override
+		public void registerProcessingTimeTimer(long time) {
+			Set<TriggerContext> triggers = processingTimeTimers.get(time);
+			if (triggers == null) {
+				// only the first registration for a timestamp schedules a runtime timer
+				getRuntimeContext().registerTimer(time, WindowOperator.this);
+				triggers = Sets.newHashSet();
+				processingTimeTimers.put(time, triggers);
+			}
+			triggers.add(this);
+		}
+
+		@Override
+		public void registerWatermarkTimer(long time) {
+			Set<TriggerContext> triggers = watermarkTimers.get(time);
+			if (triggers == null) {
+				triggers = Sets.newHashSet();
+				watermarkTimers.put(time, triggers);
+			}
+			triggers.add(this);
+		}
+	}
+
+	/**
+	 * When this flag is enabled the current processing time is set as the timestamp of elements
+	 * upon arrival. This must be used, for example, when using the
+	 * {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor} with processing
+	 * time semantics.
+	 */
+	public WindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
+		this.setProcessingTime = setProcessingTime;
+		return this;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Getters for testing
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public Trigger<? super IN, ? super W> getTriggerTemplate() {
+		return triggerTemplate;
+	}
+
+	@VisibleForTesting
+	public KeySelector<IN, K> getKeySelector() {
+		return keySelector;
+	}
+
+	@VisibleForTesting
+	public WindowAssigner<? super IN, W> getWindowAssigner() {
+		return windowAssigner;
+	}
+
+	@VisibleForTesting
+	public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
+		return windowBufferFactory;
+	}
+
+	@VisibleForTesting
+	public boolean isSetProcessingTime() {
+		return setProcessingTime;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
new file mode 100644
index 0000000..50e392b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+/**
+ * A {@link WindowBuffer} from which buffered elements can be removed again, as needed by
+ * window operators that apply an evictor.
+ *
+ * @param <T> the type of the buffered elements
+ */
+public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
+
+	/**
+	 * Removes {@code count} elements from the buffer (NOTE(review): which ones is up to the
+	 * implementation; {@code HeapWindowBuffer} drops the oldest).
+	 *
+	 * @param count the number of elements to remove
+	 * @return an implementation-specific flag. NOTE(review): semantics are undocumented;
+	 *         {@code HeapWindowBuffer} always returns {@code false}
+	 */
+	boolean removeElements(int count);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
new file mode 100644
index 0000000..092718a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayDeque;
+
+/**
+ * An {@link EvictingWindowBuffer} that keeps every stored record, in the order it was stored,
+ * in an on-heap {@link ArrayDeque}.
+ *
+ * @param <T> the type of the buffered elements
+ */
+public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
+	private static final long serialVersionUID = 1L;
+
+	/** Buffered records, oldest first. */
+	private ArrayDeque<StreamRecord<T>> elements;
+
+	protected HeapWindowBuffer() {
+		elements = new ArrayDeque<>();
+	}
+
+	/** Appends the record to the end of the buffer. */
+	@Override
+	public void storeElement(StreamRecord<T> element) {
+		elements.add(element);
+	}
+
+	/**
+	 * Drops the {@code count} oldest records; a non-positive {@code count} removes nothing.
+	 *
+	 * @return always {@code false}
+	 * @throws java.util.NoSuchElementException if {@code count} exceeds the number of buffered records
+	 */
+	@Override
+	public boolean removeElements(int count) {
+		// TODO determine if this can be done in a better way
+		int remaining = count;
+		while (remaining > 0) {
+			elements.removeFirst();
+			remaining--;
+		}
+		return false;
+	}
+
+	/** Returns a live view of the buffered records (not a copy). */
+	@Override
+	public Iterable<StreamRecord<T>> getElements() {
+		return elements;
+	}
+
+	/** Returns a lazy view of the buffered values, with the record wrappers stripped. */
+	@Override
+	public Iterable<T> getUnpackedElements() {
+		Function<StreamRecord<T>, T> unpack = new Function<StreamRecord<T>, T>() {
+			@Override
+			public T apply(StreamRecord<T> record) {
+				return record.getValue();
+			}
+		};
+		return FluentIterable.from(elements).transform(unpack);
+	}
+
+	@Override
+	public int size() {
+		return elements.size();
+	}
+
+	/** Creates empty {@link HeapWindowBuffer}s; it needs no runtime context and holds no resources. */
+	public static class Factory<T> implements WindowBufferFactory<T, HeapWindowBuffer<T>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public HeapWindowBuffer<T> create() {
+			return new HeapWindowBuffer<>();
+		}
+
+		@Override
+		public void setRuntimeContext(RuntimeContext ctx) {
+			// nothing to configure
+		}
+
+		@Override
+		public void open(Configuration config) {
+			// nothing to open
+		}
+
+		@Override
+		public void close() {
+			// nothing to release
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java new file mode 100644 index 0000000..85f90b0 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.operators.windowing.buffers; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Collections; + +public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> { + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + private transient StreamRecord<T> data; + + protected PreAggregatingHeapWindowBuffer(ReduceFunction<T> reduceFunction) { + this.reduceFunction = reduceFunction; + } + + @Override + public void storeElement(StreamRecord<T> element) throws Exception { + if (data == null) { + data = new StreamRecord<>(element.getValue(), element.getTimestamp()); + } else { + data.replace(reduceFunction.reduce(data.getValue(), element.getValue())); + } + } + + @Override + public Iterable<StreamRecord<T>> getElements() { + return Collections.singleton(data); + } + + @Override + public Iterable<T> getUnpackedElements() { + return Collections.singleton(data.getValue()); + } + + @Override + public int size() { + return 1; + } + + public static class Factory<T> implements WindowBufferFactory<T, PreAggregatingHeapWindowBuffer<T>> { + private static final long serialVersionUID = 1L; + + private final ReduceFunction<T> reduceFunction; + + public Factory(ReduceFunction<T> reduceFunction) { + this.reduceFunction = reduceFunction; + } + + @Override + public void setRuntimeContext(RuntimeContext ctx) { + FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx); + } + + @Override + public void open(Configuration config) throws Exception { + FunctionUtils.openFunction(reduceFunction, config); + } + + @Override + public void close() throws Exception { + FunctionUtils.closeFunction(reduceFunction); 
+ } + + @Override + public PreAggregatingHeapWindowBuffer<T> create() { + return new PreAggregatingHeapWindowBuffer<T>(reduceFunction); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java new file mode 100644 index 0000000..8c891d5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.operators.windowing.buffers; + + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.Serializable; + +public interface WindowBuffer<T> extends Serializable { + + public void storeElement(StreamRecord<T> element) throws Exception; + + public Iterable<StreamRecord<T>> getElements(); + + public Iterable<T> getUnpackedElements(); + + public int size(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java new file mode 100644 index 0000000..4a7f6df --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing.buffers; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; + +import java.io.Serializable; + +public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends Serializable { + void setRuntimeContext(RuntimeContext ctx); + void open(Configuration config) throws Exception; + void close() throws Exception; + B create(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java new file mode 100644 index 0000000..3d9605e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; +import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; +import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Comparator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +public class EvictingWindowOperatorTest { + + // For counting if close() is called the correct number of times on the SumReducer + + @Test + @SuppressWarnings("unchecked") + public void testCountTrigger() throws Exception { + AtomicInteger closeCalled = new 
AtomicInteger(0); + + final int WINDOW_SIZE = 4; + final int WINDOW_SLIDE = 2; + + EvictingWindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new EvictingWindowOperator<>( + GlobalWindows.create(), + new TupleKeySelector(), + new HeapWindowBuffer.Factory<Tuple2<String, Integer>>(), + new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer(closeCalled)), + CountTrigger.of(WINDOW_SLIDE), + CountEvictor.of(WINDOW_SIZE)); + + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + long initialTime = 0L; + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.open(); + + // The global window actually ignores these timestamps... + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999)); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000)); + + + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), Long.MAX_VALUE)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 
Long.MAX_VALUE)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000)); + + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + + testHarness.close(); + + Assert.assertEquals("Close was not called.", 1, closeCalled.get()); + + + } + + // ------------------------------------------------------------------------ + // UDFs + // ------------------------------------------------------------------------ + + public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + private boolean openCalled = false; + + private AtomicInteger closeCalled; + + public SumReducer(AtomicInteger closeCalled) { + this.closeCalled = closeCalled; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + openCalled = true; + } + + @Override + public void close() throws Exception { + super.close(); + closeCalled.incrementAndGet(); + } + + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) throws Exception { + if (!openCalled) { + Assert.fail("Open was not called"); + } + return new Tuple2<>(value2.f0, value1.f1 + value2.f1); + } + } + + @SuppressWarnings("unchecked") + private static class ResultSortComparator implements Comparator<Object> { + @Override + public int compare(Object o1, Object o2) { + if (o1 instanceof Watermark || o2 instanceof Watermark) { + return 0; + } else { 
+ StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1; + StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2; + if (sr0.getTimestamp() != sr1.getTimestamp()) { + return (int) (sr0.getTimestamp() - sr1.getTimestamp()); + } + int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0); + if (comparison != 0) { + return comparison; + } else { + return sr0.getValue().f1 - sr1.getValue().f1; + } + } + } + } + + private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Tuple2<String, Integer> value) throws Exception { + return value.f0; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java new file mode 100644 index 0000000..6f42514 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PolicyWindowTranslationTest.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger; +import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; +import org.apache.flink.streaming.api.windowing.windowpolicy.Count; +import org.apache.flink.streaming.api.windowing.windowpolicy.Delta; +import org.apache.flink.streaming.api.windowing.windowpolicy.Time; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; +import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +/** + * These tests verify that the api calls on + * {@link org.apache.flink.streaming.api.datastream.KeyedWindowDataStream} instantiate + * the correct window operator. + */ +public class PolicyWindowTranslationTest extends StreamingMultipleProgramsTestBase { + + /** + * These tests ensure that the fast aligned time windows operator is used if the + * conditions are right. + */ + @Test + public void testFastTimeWindows() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS)) + .reduceWindow(reducer); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator); + + DataStream<Tuple2<String, Integer>> window2 = source + .keyBy(0) + .window(Time.of(1000, TimeUnit.MILLISECONDS)) + .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, Window>() { + private static final long serialVersionUID = 1L; + + @Override + public void evaluate(Tuple tuple, + Window window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) 
throws Exception { + + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); + Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator); + } + + @Test + @SuppressWarnings("rawtypes") + public void testNonEvicting() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(Count.of(200)) + .reduceWindow(reducer); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof WindowOperator); + WindowOperator winOperator1 = (WindowOperator) operator1; + Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof PurgingTrigger); + Assert.assertTrue(((PurgingTrigger)winOperator1.getTriggerTemplate()).getNestedTrigger() instanceof CountTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof GlobalWindows); + Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); + + DataStream<Tuple2<String, Integer>> window2 = source + .keyBy(0) + .window(Delta.of(15.0, new DeltaFunction<Object>() { + @Override + public double getDelta(Object oldDataPoint, Object newDataPoint) { + return 0; + } + })) + .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>, Tuple, Window>() { + private static final long serialVersionUID = 1L; + + @Override + public void evaluate(Tuple tuple, + Window window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); + Assert.assertTrue(operator2 instanceof WindowOperator); + WindowOperator winOperator2 = (WindowOperator) operator2; + Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof PurgingTrigger); + Assert.assertTrue(((PurgingTrigger)winOperator2.getTriggerTemplate()).getNestedTrigger() instanceof DeltaTrigger); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof GlobalWindows); + Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + } + + @Test + @SuppressWarnings("rawtypes") + public void testEvicting() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(Time.of(1000, TimeUnit.MICROSECONDS), Count.of(100)) + .reduceWindow(reducer); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof EvictingWindowOperator); + EvictingWindowOperator winOperator1 = 
(EvictingWindowOperator) operator1; + // ensure that the operator sets the current processing time as timestamp + Assert.assertTrue(winOperator1.isSetProcessingTime()); + Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof GlobalWindows); + Assert.assertTrue(winOperator1.getEvictor() instanceof TimeEvictor); + Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + + DataStream<Tuple2<String, Integer>> window2 = source + .keyBy(0) + .window(Count.of(1000), Delta.of(1.0, new DeltaFunction<Object>() { + @Override + public double getDelta(Object oldDataPoint, Object newDataPoint) { + return 0; + } + })) + .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, Window>() { + private static final long serialVersionUID = 1L; + + @Override + public void evaluate(Tuple tuple, + Window window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); + Assert.assertTrue(operator2 instanceof EvictingWindowOperator); + EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2; + Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof DeltaTrigger); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof GlobalWindows); + Assert.assertTrue(winOperator2.getEvictor() instanceof CountEvictor); + Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + } + + // ------------------------------------------------------------------------ + // UDFs + // 
------------------------------------------------------------------------ + + public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { + return value1; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/dd51c977/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java new file mode 100644 index 0000000..5078c8c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerWindowTranslationTest.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; +import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +/** + * These tests verify that the api calls on + * {@link org.apache.flink.streaming.api.datastream.KeyedTriggerWindowDataStream} instantiate + * the correct window operator. 
+ */ +public class TriggerWindowTranslationTest extends StreamingMultipleProgramsTestBase { + + /** + * These tests ensure that the fast aligned time windows operator is used if the + * conditions are right. + */ + @Test + public void testFastTimeWindows() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(SlidingProcessingTimeWindows.of(1000, 100)) + .reduceWindow(reducer); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator); + + DataStream<Tuple2<String, Integer>> window2 = source + .keyBy(0) + .window(SlidingProcessingTimeWindows.of(1000, 100)) + .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void evaluate(Tuple tuple, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); + Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator); + } + + @Test + @SuppressWarnings("rawtypes") + public void 
testNonEvicting() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(SlidingProcessingTimeWindows.of(1000, 100)) + .trigger(CountTrigger.of(100)) + .reduceWindow(reducer); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof WindowOperator); + WindowOperator winOperator1 = (WindowOperator) operator1; + Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows); + Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); + + DataStream<Tuple2<String, Integer>> window2 = source + .keyBy(0) + .window(TumblingProcessingTimeWindows.of(1000)) + .trigger(CountTrigger.of(100)) + .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void evaluate(Tuple tuple, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); + 
Assert.assertTrue(operator2 instanceof WindowOperator); + WindowOperator winOperator2 = (WindowOperator) operator2; + Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + } + + @Test + @SuppressWarnings("rawtypes") + public void testEvicting() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + + DummyReducer reducer = new DummyReducer(); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(SlidingProcessingTimeWindows.of(1000, 100)) + .evictor(CountEvictor.of(100)) + .reduceWindow(reducer); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator(); + Assert.assertTrue(operator1 instanceof EvictingWindowOperator); + EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1; + Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger); + Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows); + Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); + Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + + DataStream<Tuple2<String, Integer>> window2 = source + .keyBy(0) + .window(TumblingProcessingTimeWindows.of(1000)) + .trigger(CountTrigger.of(100)) + .evictor(TimeEvictor.of(100)) + .mapWindow(new KeyedWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, 
TimeWindow>() { + private static final long serialVersionUID = 1L; + + @Override + public void evaluate(Tuple tuple, + TimeWindow window, + Iterable<Tuple2<String, Integer>> values, + Collector<Tuple2<String, Integer>> out) throws Exception { + + } + }); + + OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator(); + Assert.assertTrue(operator2 instanceof EvictingWindowOperator); + EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2; + Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger); + Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows); + Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor); + Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + } + + // ------------------------------------------------------------------------ + // UDFs + // ------------------------------------------------------------------------ + + public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { + return value1; + } + } +}
