http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java deleted file mode 100644 index fb8eb7c..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java +++ /dev/null @@ -1,1420 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.direct; - -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; - -import com.google.auto.value.AutoValue; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; -import com.google.common.collect.SortedMultiset; -import com.google.common.collect.TreeMultiset; - -import org.joda.time.Instant; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import java.util.Objects; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicReference; - -import javax.annotation.Nullable; - -/** - * Manages watermarks of {@link PCollection PCollections} and input and output watermarks of - * {@link AppliedPTransform AppliedPTransforms} to provide event-time and completion tracking for - * in-memory execution. 
{@link InMemoryWatermarkManager} is designed to update and return a - * consistent view of watermarks in the presence of concurrent updates. - * - * <p>An {@link InMemoryWatermarkManager} is provided with the collection of root - * {@link AppliedPTransform AppliedPTransforms} and a map of {@link PCollection PCollections} to - * all the {@link AppliedPTransform AppliedPTransforms} that consume them at construction time. - * - * <p>Whenever a root {@link AppliedPTransform transform} produces elements, the - * {@link InMemoryWatermarkManager} is provided with the produced elements and the output watermark - * of the producing {@link AppliedPTransform transform}. The - * {@link InMemoryWatermarkManager watermark manager} is responsible for computing the watermarks - * of all {@link AppliedPTransform transforms} that consume one or more - * {@link PCollection PCollections}. - * - * <p>Whenever a non-root {@link AppliedPTransform} finishes processing one or more in-flight - * elements (referred to as the input {@link CommittedBundle bundle}), the following occurs - * atomically: - * <ul> - * <li>All of the in-flight elements are removed from the collection of pending elements for the - * {@link AppliedPTransform}.</li> - * <li>All of the elements produced by the {@link AppliedPTransform} are added to the collection - * of pending elements for each {@link AppliedPTransform} that consumes them.</li> - * <li>The input watermark for the {@link AppliedPTransform} becomes the maximum value of - * <ul> - * <li>the previous input watermark</li> - * <li>the minimum of - * <ul> - * <li>the timestamps of all currently pending elements</li> - * <li>all input {@link PCollection} watermarks</li> - * </ul> - * </li> - * </ul> - * </li> - * <li>The output watermark for the {@link AppliedPTransform} becomes the maximum of - * <ul> - * <li>the previous output watermark</li> - * <li>the minimum of - * <ul> - * <li>the current input watermark</li> - * <li>the current watermark holds</li> - 
* </ul> - * </li> - * </ul> - * </li> - * <li>The watermark of the output {@link PCollection} can be advanced to the output watermark of - * the {@link AppliedPTransform}</li> - * <li>The watermark of all downstream {@link AppliedPTransform AppliedPTransforms} can be - * advanced.</li> - * </ul> - * - * <p>The watermark of a {@link PCollection} is equal to the output watermark of the - * {@link AppliedPTransform} that produces it. - * - * <p>The watermarks for a {@link PTransform} are updated as follows when output is committed:<pre> - * Watermark_In' = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection))) - * Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(StateHold))) - * Watermark_PCollection = Watermark_Out_ProducingPTransform - * </pre> - */ -public class InMemoryWatermarkManager { - /** - * The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a - * {@link PCollection}. - * - * <p>A watermark is a monotonically increasing value, which represents the point up to which the - * system believes it has received all of the data. Data that arrives with a timestamp that is - * before the watermark is considered late. {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is a special - * timestamp which indicates we have received all of the data and there will be no more on-time or - * late data. This value is represented by {@link InMemoryWatermarkManager#THE_END_OF_TIME}. - */ - private static interface Watermark { - /** - * Returns the current value of this watermark. - */ - Instant get(); - - /** - * Refreshes the value of this watermark from its input watermarks and watermark holds. - * - * @return true if the value of the watermark has changed (and thus dependent watermark must - * also be updated - */ - WatermarkUpdate refresh(); - } - - /** - * The result of computing a {@link Watermark}. - */ - private static enum WatermarkUpdate { - /** The watermark is later than the value at the previous time it was computed. 
*/ - ADVANCED(true), - /** The watermark is equal to the value at the previous time it was computed. */ - NO_CHANGE(false); - - private final boolean advanced; - - private WatermarkUpdate(boolean advanced) { - this.advanced = advanced; - } - - public boolean isAdvanced() { - return advanced; - } - - /** - * Returns the {@link WatermarkUpdate} that is a result of combining the two watermark updates. - * - * If either of the input {@link WatermarkUpdate WatermarkUpdates} were advanced, the result - * {@link WatermarkUpdate} has been advanced. - */ - public WatermarkUpdate union(WatermarkUpdate that) { - if (this.advanced) { - return this; - } - return that; - } - - /** - * Returns the {@link WatermarkUpdate} based on the former and current - * {@link Instant timestamps}. - */ - public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) { - if (currentTime.isAfter(oldTime)) { - return ADVANCED; - } - return NO_CHANGE; - } - } - - /** - * The input {@link Watermark} of an {@link AppliedPTransform}. - * - * <p>At any point, the value of an {@link AppliedPTransformInputWatermark} is equal to the - * minimum watermark across all of its input {@link Watermark Watermarks}, and the minimum - * timestamp of all of the pending elements, restricted to be monotonically increasing. - * - * <p>See {@link #refresh()} for more information. - */ - private static class AppliedPTransformInputWatermark implements Watermark { - private final Collection<? extends Watermark> inputWatermarks; - private final SortedMultiset<WindowedValue<?>> pendingElements; - private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers; - - private AtomicReference<Instant> currentWatermark; - - public AppliedPTransformInputWatermark(Collection<? 
extends Watermark> inputWatermarks) { - this.inputWatermarks = inputWatermarks; - this.pendingElements = TreeMultiset.create(new WindowedValueByTimestampComparator()); - this.objectTimers = new HashMap<>(); - currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); - } - - @Override - public Instant get() { - return currentWatermark.get(); - } - - /** - * {@inheritDoc}. - * - * <p>When refresh is called, the value of the {@link AppliedPTransformInputWatermark} becomes - * equal to the maximum value of - * <ul> - * <li>the previous input watermark</li> - * <li>the minimum of - * <ul> - * <li>the timestamps of all currently pending elements</li> - * <li>all input {@link PCollection} watermarks</li> - * </ul> - * </li> - * </ul> - */ - @Override - public synchronized WatermarkUpdate refresh() { - Instant oldWatermark = currentWatermark.get(); - Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (Watermark inputWatermark : inputWatermarks) { - minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get()); - } - if (!pendingElements.isEmpty()) { - minInputWatermark = INSTANT_ORDERING.min( - minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp()); - } - Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark); - currentWatermark.set(newWatermark); - return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark); - } - - private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) { - for (WindowedValue<?> pendingElement : newPending) { - pendingElements.add(pendingElement); - } - } - - private synchronized void removePendingElements( - Iterable<? 
extends WindowedValue<?>> finishedElements) { - for (WindowedValue<?> finishedElement : finishedElements) { - pendingElements.remove(finishedElement); - } - } - - private synchronized void updateTimers(TimerUpdate update) { - NavigableSet<TimerData> keyTimers = objectTimers.get(update.key); - if (keyTimers == null) { - keyTimers = new TreeSet<>(); - objectTimers.put(update.key, keyTimers); - } - for (TimerData timer : update.setTimers) { - if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - keyTimers.add(timer); - } - } - for (TimerData timer : update.deletedTimers) { - if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - keyTimers.remove(timer); - } - } - // We don't keep references to timers that have been fired and delivered via #getFiredTimers() - } - - private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() { - return extractFiredTimers(currentWatermark.get(), objectTimers); - } - - @Override - public synchronized String toString() { - return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class) - .add("pendingElements", pendingElements) - .add("currentWatermark", currentWatermark) - .toString(); - } - } - - /** - * The output {@link Watermark} of an {@link AppliedPTransform}. - * - * <p>The value of an {@link AppliedPTransformOutputWatermark} is equal to the minimum of the - * current watermark hold and the {@link AppliedPTransformInputWatermark} for the same - * {@link AppliedPTransform}, restricted to be monotonically increasing. See - * {@link #refresh()} for more information. 
- */ - private static class AppliedPTransformOutputWatermark implements Watermark { - private final Watermark inputWatermark; - private final PerKeyHolds holds; - private AtomicReference<Instant> currentWatermark; - - public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) { - this.inputWatermark = inputWatermark; - holds = new PerKeyHolds(); - currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); - } - - public synchronized void updateHold(Object key, Instant newHold) { - if (newHold == null) { - holds.removeHold(key); - } else { - holds.updateHold(key, newHold); - } - } - - @Override - public Instant get() { - return currentWatermark.get(); - } - - /** - * {@inheritDoc}. - * - * <p>When refresh is called, the value of the {@link AppliedPTransformOutputWatermark} becomes - * equal to the maximum value of: - * <ul> - * <li>the previous output watermark</li> - * <li>the minimum of - * <ul> - * <li>the current input watermark</li> - * <li>the current watermark holds</li> - * </ul> - * </li> - * </ul> - */ - @Override - public synchronized WatermarkUpdate refresh() { - Instant oldWatermark = currentWatermark.get(); - Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold()); - newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark); - currentWatermark.set(newWatermark); - return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark); - } - - @Override - public synchronized String toString() { - return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class) - .add("holds", holds) - .add("currentWatermark", currentWatermark) - .toString(); - } - } - - /** - * The input {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an - * {@link AppliedPTransform}. 
- * - * <p>At any point, the hold value of an {@link SynchronizedProcessingTimeInputWatermark} is equal - * to the minimum across all pending bundles at the {@link AppliedPTransform} and all upstream - * {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the input - * synchronized processing time at any step is equal to the maximum of: - * <ul> - * <li>The most recently returned synchronized processing input time - * <li>The minimum of - * <ul> - * <li>The current processing time - * <li>The current synchronized processing time input hold - * </ul> - * </ul> - */ - private static class SynchronizedProcessingTimeInputWatermark implements Watermark { - private final Collection<? extends Watermark> inputWms; - private final Collection<CommittedBundle<?>> pendingBundles; - private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers; - private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers; - - private final PriorityQueue<TimerData> pendingTimers; - - private AtomicReference<Instant> earliestHold; - - public SynchronizedProcessingTimeInputWatermark(Collection<? extends Watermark> inputWms) { - this.inputWms = inputWms; - this.pendingBundles = new HashSet<>(); - this.processingTimers = new HashMap<>(); - this.synchronizedProcessingTimers = new HashMap<>(); - this.pendingTimers = new PriorityQueue<>(); - Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (Watermark wm : inputWms) { - initialHold = INSTANT_ORDERING.min(initialHold, wm.get()); - } - earliestHold = new AtomicReference<>(initialHold); - } - - @Override - public Instant get() { - return earliestHold.get(); - } - - /** - * {@inheritDoc}. 
- * - * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeInputWatermark} - * becomes equal to the minimum value of - * <ul> - * <li>the timestamps of all currently pending bundles</li> - * <li>all input {@link PCollection} synchronized processing time watermarks</li> - * </ul> - * - * <p>Note that this value is not monotonic, but the returned value for the synchronized - * processing time must be. - */ - @Override - public synchronized WatermarkUpdate refresh() { - Instant oldHold = earliestHold.get(); - Instant minTime = THE_END_OF_TIME.get(); - for (Watermark input : inputWms) { - minTime = INSTANT_ORDERING.min(minTime, input.get()); - } - for (CommittedBundle<?> bundle : pendingBundles) { - // TODO: Track elements in the bundle by the processing time they were output instead of - // entire bundles. Requried to support arbitrarily splitting and merging bundles between - // steps - minTime = INSTANT_ORDERING.min(minTime, bundle.getSynchronizedProcessingOutputWatermark()); - } - earliestHold.set(minTime); - return WatermarkUpdate.fromTimestamps(oldHold, minTime); - } - - public synchronized void addPending(CommittedBundle<?> bundle) { - pendingBundles.add(bundle); - } - - public synchronized void removePending(CommittedBundle<?> bundle) { - pendingBundles.remove(bundle); - } - - /** - * Return the earliest timestamp of the earliest timer that has not been completed. This is - * either the earliest timestamp across timers that have not been completed, or the earliest - * timestamp across timers that have been delivered but have not been completed. 
- */ - public synchronized Instant getEarliestTimerTimestamp() { - Instant earliest = THE_END_OF_TIME.get(); - for (NavigableSet<TimerData> timers : processingTimers.values()) { - if (!timers.isEmpty()) { - earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest); - } - } - for (NavigableSet<TimerData> timers : synchronizedProcessingTimers.values()) { - if (!timers.isEmpty()) { - earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest); - } - } - if (!pendingTimers.isEmpty()) { - earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), earliest); - } - return earliest; - } - - private synchronized void updateTimers(TimerUpdate update) { - Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key); - for (TimerData addedTimer : update.setTimers) { - NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain()); - if (timerQueue != null) { - timerQueue.add(addedTimer); - } - } - - for (TimerData completedTimer : update.completedTimers) { - pendingTimers.remove(completedTimer); - } - for (TimerData deletedTimer : update.deletedTimers) { - NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain()); - if (timerQueue != null) { - timerQueue.remove(deletedTimer); - } - } - } - - private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredDomainTimers( - TimeDomain domain, Instant firingTime) { - Map<StructuralKey<?>, List<TimerData>> firedTimers; - switch (domain) { - case PROCESSING_TIME: - firedTimers = extractFiredTimers(firingTime, processingTimers); - break; - case SYNCHRONIZED_PROCESSING_TIME: - firedTimers = - extractFiredTimers( - INSTANT_ORDERING.min(firingTime, earliestHold.get()), - synchronizedProcessingTimers); - break; - default: - throw new IllegalArgumentException( - "Called getFiredTimers on a Synchronized Processing Time watermark" - + " and gave a non-processing time domain " - + domain); - } - for (Map.Entry<StructuralKey<?>, ? 
extends Collection<TimerData>> firedTimer : - firedTimers.entrySet()) { - pendingTimers.addAll(firedTimer.getValue()); - } - return firedTimers; - } - - private Map<TimeDomain, NavigableSet<TimerData>> timerMap(StructuralKey<?> key) { - NavigableSet<TimerData> processingQueue = processingTimers.get(key); - if (processingQueue == null) { - processingQueue = new TreeSet<>(); - processingTimers.put(key, processingQueue); - } - NavigableSet<TimerData> synchronizedProcessingQueue = - synchronizedProcessingTimers.get(key); - if (synchronizedProcessingQueue == null) { - synchronizedProcessingQueue = new TreeSet<>(); - synchronizedProcessingTimers.put(key, synchronizedProcessingQueue); - } - EnumMap<TimeDomain, NavigableSet<TimerData>> result = new EnumMap<>(TimeDomain.class); - result.put(TimeDomain.PROCESSING_TIME, processingQueue); - result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue); - return result; - } - - @Override - public synchronized String toString() { - return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class) - .add("earliestHold", earliestHold) - .toString(); - } - } - - /** - * The output {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an - * {@link AppliedPTransform}. - * - * <p>At any point, the hold value of an {@link SynchronizedProcessingTimeOutputWatermark} is - * equal to the minimum across all incomplete timers at the {@link AppliedPTransform} and all - * upstream {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. 
The value of the output - * synchronized processing time at any step is equal to the maximum of: - * <ul> - * <li>The most recently returned synchronized processing output time - * <li>The minimum of - * <ul> - * <li>The current processing time - * <li>The current synchronized processing time output hold - * </ul> - * </ul> - */ - private static class SynchronizedProcessingTimeOutputWatermark implements Watermark { - private final SynchronizedProcessingTimeInputWatermark inputWm; - private AtomicReference<Instant> latestRefresh; - - public SynchronizedProcessingTimeOutputWatermark( - SynchronizedProcessingTimeInputWatermark inputWm) { - this.inputWm = inputWm; - this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); - } - - @Override - public Instant get() { - return latestRefresh.get(); - } - - /** - * {@inheritDoc}. - * - * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeOutputWatermark} - * becomes equal to the minimum value of: - * <ul> - * <li>the current input watermark. - * <li>all {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} timers that are based on the input - * watermark. - * <li>all {@link TimeDomain#PROCESSING_TIME} timers that are based on the input watermark. - * </ul> - * - * <p>Note that this value is not monotonic, but the returned value for the synchronized - * processing time must be. - */ - @Override - public synchronized WatermarkUpdate refresh() { - // Hold the output synchronized processing time to the input watermark, which takes into - // account buffered bundles, and the earliest pending timer, which determines what to hold - // downstream timers to. 
- Instant oldRefresh = latestRefresh.get(); - Instant newTimestamp = - INSTANT_ORDERING.min(inputWm.get(), inputWm.getEarliestTimerTimestamp()); - latestRefresh.set(newTimestamp); - return WatermarkUpdate.fromTimestamps(oldRefresh, newTimestamp); - } - - @Override - public synchronized String toString() { - return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class) - .add("latestRefresh", latestRefresh) - .toString(); - } - } - - /** - * The {@code Watermark} that is after the latest time it is possible to represent in the global - * window. This is a distinguished value representing a complete {@link PTransform}. - */ - private static final Watermark THE_END_OF_TIME = new Watermark() { - @Override - public WatermarkUpdate refresh() { - // THE_END_OF_TIME is a distinguished value that cannot be advanced. - return WatermarkUpdate.NO_CHANGE; - } - - @Override - public Instant get() { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - }; - - private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural(); - - /** - * A function that takes a WindowedValue and returns the exploded representation of that - * {@link WindowedValue}. - */ - private static final Function<WindowedValue<?>, ? extends Iterable<? extends WindowedValue<?>>> - EXPLODE_WINDOWS_FN = - new Function<WindowedValue<?>, Iterable<? extends WindowedValue<?>>>() { - @Override - public Iterable<? extends WindowedValue<?>> apply(WindowedValue<?> input) { - return input.explodeWindows(); - } - }; - - /** - * For each (Object, PriorityQueue) pair in the provided map, remove each Timer that is before the - * latestTime argument and put in in the result with the same key, then remove all of the keys - * which have no more pending timers. - * - * The result collection retains ordering of timers (from earliest to latest). 
- */ - private static Map<StructuralKey<?>, List<TimerData>> extractFiredTimers( - Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers) { - Map<StructuralKey<?>, List<TimerData>> result = new HashMap<>(); - Set<StructuralKey<?>> emptyKeys = new HashSet<>(); - for (Map.Entry<StructuralKey<?>, NavigableSet<TimerData>> pendingTimers : - objectTimers.entrySet()) { - NavigableSet<TimerData> timers = pendingTimers.getValue(); - if (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) { - ArrayList<TimerData> keyFiredTimers = new ArrayList<>(); - result.put(pendingTimers.getKey(), keyFiredTimers); - while (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) { - keyFiredTimers.add(timers.first()); - timers.remove(timers.first()); - } - } - if (timers.isEmpty()) { - emptyKeys.add(pendingTimers.getKey()); - } - } - objectTimers.keySet().removeAll(emptyKeys); - return result; - } - - //////////////////////////////////////////////////////////////////////////////////////////////// - - /** - * The {@link Clock} providing the current time in the {@link TimeDomain#PROCESSING_TIME} domain. - */ - private final Clock clock; - - /** - * A map from each {@link PCollection} to all {@link AppliedPTransform PTransform applications} - * that consume that {@link PCollection}. - */ - private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers; - - /** - * The input and output watermark of each {@link AppliedPTransform}. - */ - private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> transformToWatermarks; - - /** - * A queue of pending updates to the state of this {@link InMemoryWatermarkManager}. - */ - private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates; - - /** - * A queue of pending {@link AppliedPTransform AppliedPTransforms} that have potentially - * stale data. 
- */ - private final ConcurrentLinkedQueue<AppliedPTransform<?, ?, ?>> pendingRefreshes; - - /** - * Creates a new {@link InMemoryWatermarkManager}. All watermarks within the newly created - * {@link InMemoryWatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the - * minimum watermark, with no watermark holds or pending elements. - * - * @param rootTransforms the root-level transforms of the {@link Pipeline} - * @param consumers a mapping between each {@link PCollection} in the {@link Pipeline} to the - * transforms that consume it as a part of their input - */ - public static InMemoryWatermarkManager create( - Clock clock, - Collection<AppliedPTransform<?, ?, ?>> rootTransforms, - Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) { - return new InMemoryWatermarkManager(clock, rootTransforms, consumers); - } - - private InMemoryWatermarkManager( - Clock clock, - Collection<AppliedPTransform<?, ?, ?>> rootTransforms, - Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) { - this.clock = clock; - this.consumers = consumers; - this.pendingUpdates = new ConcurrentLinkedQueue<>(); - this.pendingRefreshes = new ConcurrentLinkedQueue<>(); - - transformToWatermarks = new HashMap<>(); - - for (AppliedPTransform<?, ?, ?> rootTransform : rootTransforms) { - getTransformWatermark(rootTransform); - } - for (Collection<AppliedPTransform<?, ?, ?>> intermediateTransforms : consumers.values()) { - for (AppliedPTransform<?, ?, ?> transform : intermediateTransforms) { - getTransformWatermark(transform); - } - } - } - - private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) { - TransformWatermarks wms = transformToWatermarks.get(transform); - if (wms == null) { - List<Watermark> inputCollectionWatermarks = getInputWatermarks(transform); - AppliedPTransformInputWatermark inputWatermark = - new AppliedPTransformInputWatermark(inputCollectionWatermarks); - AppliedPTransformOutputWatermark outputWatermark = - new 
AppliedPTransformOutputWatermark(inputWatermark); - - SynchronizedProcessingTimeInputWatermark inputProcessingWatermark = - new SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(transform)); - SynchronizedProcessingTimeOutputWatermark outputProcessingWatermark = - new SynchronizedProcessingTimeOutputWatermark(inputProcessingWatermark); - - wms = - new TransformWatermarks( - inputWatermark, outputWatermark, inputProcessingWatermark, outputProcessingWatermark); - transformToWatermarks.put(transform, wms); - } - return wms; - } - - private Collection<Watermark> getInputProcessingWatermarks( - AppliedPTransform<?, ?, ?> transform) { - ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder(); - Collection<? extends PValue> inputs = transform.getInput().expand(); - if (inputs.isEmpty()) { - inputWmsBuilder.add(THE_END_OF_TIME); - } - for (PValue pvalue : inputs) { - Watermark producerOutputWatermark = - getTransformWatermark(pvalue.getProducingTransformInternal()) - .synchronizedProcessingOutputWatermark; - inputWmsBuilder.add(producerOutputWatermark); - } - return inputWmsBuilder.build(); - } - - private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) { - ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder(); - Collection<? extends PValue> inputs = transform.getInput().expand(); - if (inputs.isEmpty()) { - inputWatermarksBuilder.add(THE_END_OF_TIME); - } - for (PValue pvalue : inputs) { - Watermark producerOutputWatermark = - getTransformWatermark(pvalue.getProducingTransformInternal()).outputWatermark; - inputWatermarksBuilder.add(producerOutputWatermark); - } - List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build(); - return inputCollectionWatermarks; - } - - //////////////////////////////////////////////////////////////////////////////////////////////// - - /** - * Gets the input and output watermarks for an {@link AppliedPTransform}. 
If the - * {@link AppliedPTransform PTransform} has not processed any elements, return a watermark of - * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. - * - * @return a snapshot of the input watermark and output watermark for the provided transform - */ - public TransformWatermarks getWatermarks(AppliedPTransform<?, ?, ?> transform) { - return transformToWatermarks.get(transform); - } - - /** - * Updates the watermarks of a transform with one or more inputs. - * - * <p>Each transform has two monotonically increasing watermarks: the input watermark, which can, - * at any time, be updated to equal: - * <pre> - * MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks)) - * </pre> - * and the output watermark, which can, at any time, be updated to equal: - * <pre> - * MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds)) - * </pre>. - * - * @param completed the input that has completed - * @param timerUpdate the timers that were added, removed, and completed as part of producing - * this update - * @param result the result that was produced by processing the input - * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there - * is no hold - */ - public void updateWatermarks( - @Nullable CommittedBundle<?> completed, - TimerUpdate timerUpdate, - CommittedResult result, - Instant earliestHold) { - pendingUpdates.offer(PendingWatermarkUpdate.create(completed, - timerUpdate, - result, - earliestHold)); - } - - /** - * Applies all pending updates to this {@link InMemoryWatermarkManager}, causing the pending state - * of all {@link TransformWatermarks} to be advanced as far as possible. 
- */ - private void applyPendingUpdates() { - Set<AppliedPTransform<?, ?, ?>> updatedTransforms = new HashSet<>(); - PendingWatermarkUpdate pending = pendingUpdates.poll(); - while (pending != null) { - applyPendingUpdate(pending); - updatedTransforms.add(pending.getTransform()); - pending = pendingUpdates.poll(); - } - pendingRefreshes.addAll(updatedTransforms); - } - - private void applyPendingUpdate(PendingWatermarkUpdate pending) { - CommittedResult result = pending.getResult(); - AppliedPTransform transform = result.getTransform(); - CommittedBundle<?> inputBundle = pending.getInputBundle(); - - updatePending(inputBundle, pending.getTimerUpdate(), result); - - TransformWatermarks transformWms = transformToWatermarks.get(transform); - transformWms.setEventTimeHold(inputBundle == null ? null : inputBundle.getKey(), - pending.getEarliestHold()); - } - - /** - * First adds all produced elements to the queue of pending elements for each consumer, then adds - * all pending timers to the collection of pending timers, then removes all completed and deleted - * timers from the collection of pending timers, then removes all completed elements from the - * pending queue of the transform. - * - * <p>It is required that all newly pending elements are added to the queue of pending elements - * for each consumer prior to the completed elements being removed, as doing otherwise could cause - * a Watermark to appear in a state in which the upstream (completed) element does not hold the - * watermark but the element it produced is not yet pending. This can cause the watermark to - * erroneously advance. - */ - private void updatePending( - CommittedBundle<?> input, - TimerUpdate timerUpdate, - CommittedResult result) { - // Newly pending elements must be added before completed elements are removed, as the two - // do not share a Mutex within this call and thus can be interleaved with external calls to - // refresh. 
- for (CommittedBundle<?> bundle : result.getOutputs()) { - for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) { - TransformWatermarks watermarks = transformToWatermarks.get(consumer); - watermarks.addPending(bundle); - } - } - - TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform()); - if (input != null) { - // Add the unprocessed inputs - completedTransform.addPending(result.getUnprocessedInputs()); - } - completedTransform.updateTimers(timerUpdate); - if (input != null) { - completedTransform.removePending(input); - } - } - - /** - * Refresh the watermarks contained within this {@link InMemoryWatermarkManager}, causing all - * watermarks to be advanced as far as possible. - */ - synchronized void refreshAll() { - applyPendingUpdates(); - while (!pendingRefreshes.isEmpty()) { - refreshWatermarks(pendingRefreshes.poll()); - } - } - - private void refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) { - TransformWatermarks myWatermarks = transformToWatermarks.get(toRefresh); - WatermarkUpdate updateResult = myWatermarks.refresh(); - Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>(); - if (updateResult.isAdvanced()) { - for (PValue outputPValue : toRefresh.getOutput().expand()) { - additionalRefreshes.addAll(consumers.get(outputPValue)); - } - } - pendingRefreshes.addAll(additionalRefreshes); - } - - /** - * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the - * pending timers will be removed from this {@link InMemoryWatermarkManager}. 
- */ - public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() { - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> allTimers = new HashMap<>(); - for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry : - transformToWatermarks.entrySet()) { - Map<StructuralKey<?>, FiredTimers> keyFiredTimers = - watermarksEntry.getValue().extractFiredTimers(); - if (!keyFiredTimers.isEmpty()) { - allTimers.put(watermarksEntry.getKey(), keyFiredTimers); - } - } - return allTimers; - } - - /** - * A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global, - * and as such the watermark manager must track holds and the release of holds on a per-key basis. - * - * <p>The {@link #compareTo(KeyedHold)} method of {@link KeyedHold} is not consistent with equals, - * as the key is arbitrarily ordered via identity, rather than object equality. - */ - private static final class KeyedHold implements Comparable<KeyedHold> { - private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast(); - - private final Object key; - private final Instant timestamp; - - /** - * Create a new KeyedHold with the specified key and timestamp. 
- */ - public static KeyedHold of(Object key, Instant timestamp) { - return new KeyedHold(key, MoreObjects.firstNonNull(timestamp, THE_END_OF_TIME.get())); - } - - private KeyedHold(Object key, Instant timestamp) { - this.key = key; - this.timestamp = timestamp; - } - - @Override - public int compareTo(KeyedHold that) { - return ComparisonChain.start() - .compare(this.timestamp, that.timestamp) - .compare(this.key, that.key, KEY_ORDERING) - .result(); - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, key); - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof KeyedHold)) { - return false; - } - KeyedHold that = (KeyedHold) other; - return Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.key, that.key); - } - - /** - * Get the value of this {@link KeyedHold}. - */ - public Instant getTimestamp() { - return timestamp; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(KeyedHold.class) - .add("key", key) - .add("hold", timestamp) - .toString(); - } - } - - private static class PerKeyHolds { - private final Map<Object, KeyedHold> keyedHolds; - private final PriorityQueue<KeyedHold> allHolds; - - private PerKeyHolds() { - this.keyedHolds = new HashMap<>(); - this.allHolds = new PriorityQueue<>(); - } - - /** - * Gets the minimum hold across all keys in this {@link PerKeyHolds}, or THE_END_OF_TIME if - * there are no holds within this {@link PerKeyHolds}. - */ - public Instant getMinHold() { - return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.peek().getTimestamp(); - } - - /** - * Updates the hold of the provided key to the provided value, removing any other holds for - * the same key. 
- */ - public void updateHold(@Nullable Object key, Instant newHold) { - /* Release any earlier hold for this key first, so the queue never carries two holds for one key. */ - removeHold(key); - KeyedHold newKeyedHold = KeyedHold.of(key, newHold); - keyedHolds.put(key, newKeyedHold); - allHolds.offer(newKeyedHold); - } - - /** - * Removes the hold of the provided key, if there is one. - * - * <p>The hold is removed from both the per-key map and the queue of all holds, so that a - * released key does not leave a stale entry behind in the map. - * - * @param key the key whose hold should be released; may be {@code null} - */ - public void removeHold(Object key) { - KeyedHold oldHold = keyedHolds.remove(key); - if (oldHold != null) { - allHolds.remove(oldHold); - } - } - } - - /** - * A reference to the input and output watermarks of an {@link AppliedPTransform}. - */ - public class TransformWatermarks { - private final AppliedPTransformInputWatermark inputWatermark; - private final AppliedPTransformOutputWatermark outputWatermark; - - private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark; - private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark; - - private Instant latestSynchronizedInputWm; - private Instant latestSynchronizedOutputWm; - - private TransformWatermarks( - AppliedPTransformInputWatermark inputWatermark, - AppliedPTransformOutputWatermark outputWatermark, - SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark, - SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) { - this.inputWatermark = inputWatermark; - this.outputWatermark = outputWatermark; - - this.synchronizedProcessingInputWatermark = inputSynchProcessingWatermark; - this.synchronizedProcessingOutputWatermark = outputSynchProcessingWatermark; - this.latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE; - this.latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - /** - * Returns the input watermark of the {@link AppliedPTransform}. - */ - public Instant getInputWatermark() { - return Preconditions.checkNotNull(inputWatermark.get()); - } - - /** - * Returns the output watermark of the {@link AppliedPTransform}.
- */ - public Instant getOutputWatermark() { - return outputWatermark.get(); - } - - /** - * Returns the synchronized processing input time of the {@link AppliedPTransform}. - * - * <p>The returned value is guaranteed to be monotonically increasing, and outside of the - * presence of holds, will increase as the system time progresses. - */ - public synchronized Instant getSynchronizedProcessingInputTime() { - latestSynchronizedInputWm = INSTANT_ORDERING.max( - latestSynchronizedInputWm, - INSTANT_ORDERING.min(clock.now(), synchronizedProcessingInputWatermark.get())); - return latestSynchronizedInputWm; - } - - /** - * Returns the synchronized processing output time of the {@link AppliedPTransform}. - * - * <p>The returned value is guaranteed to be monotonically increasing, and outside of the - * presence of holds, will increase as the system time progresses. - */ - public synchronized Instant getSynchronizedProcessingOutputTime() { - latestSynchronizedOutputWm = INSTANT_ORDERING.max( - latestSynchronizedOutputWm, - INSTANT_ORDERING.min(clock.now(), synchronizedProcessingOutputWatermark.get())); - return latestSynchronizedOutputWm; - } - - private WatermarkUpdate refresh() { - inputWatermark.refresh(); - synchronizedProcessingInputWatermark.refresh(); - WatermarkUpdate eventOutputUpdate = outputWatermark.refresh(); - WatermarkUpdate syncOutputUpdate = synchronizedProcessingOutputWatermark.refresh(); - return eventOutputUpdate.union(syncOutputUpdate); - } - - private void setEventTimeHold(Object key, Instant newHold) { - outputWatermark.updateHold(key, newHold); - } - - private void removePending(CommittedBundle<?> bundle) { - inputWatermark.removePendingElements(elementsFromBundle(bundle)); - synchronizedProcessingInputWatermark.removePending(bundle); - } - - private void addPending(CommittedBundle<?> bundle) { - inputWatermark.addPendingElements(elementsFromBundle(bundle)); - synchronizedProcessingInputWatermark.addPending(bundle); - } - - private Iterable<? 
extends WindowedValue<?>> elementsFromBundle(CommittedBundle<?> bundle) { - return FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN); - } - - /** - * Extracts the fired timers of this transform from every time domain and groups them by key. - * - * <p>Event-time timers come from the input watermark; processing-time and synchronized - * processing-time timers come from the synchronized processing input watermark. - * - * @return a map from each key that has fired timers to the {@link FiredTimers} for that key - */ - private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() { - Map<StructuralKey<?>, List<TimerData>> eventTimeTimers = - inputWatermark.extractFiredEventTimeTimers(); - Map<StructuralKey<?>, List<TimerData>> processingTimers; - Map<StructuralKey<?>, List<TimerData>> synchronizedTimers; - if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - /* The input watermark is at its maximum: extract timers in both processing-time domains up to the maximum timestamp. Each domain has to be requested by its own TimeDomain. */ - processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( - TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE); - synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( - TimeDomain.SYNCHRONIZED_PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE); - } else { - processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( - TimeDomain.PROCESSING_TIME, clock.now()); - synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( - TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime()); - } - Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>(); - groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers); - - /* Wrap each key's per-domain timer lists in a FiredTimers. */ - Map<StructuralKey<?>, FiredTimers> keyFiredTimers = new HashMap<>(); - for (Map.Entry<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> firedTimers : - groupedTimers.entrySet()) { - keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue())); - } - return keyFiredTimers; - } - - @SafeVarargs - private final void groupFiredTimers( - Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedToMutate, - Map<StructuralKey<?>, List<TimerData>>...
timersToGroup) { - for (Map<StructuralKey<?>, List<TimerData>> subGroup : timersToGroup) { - for (Map.Entry<StructuralKey<?>, List<TimerData>> newTimers : subGroup.entrySet()) { - Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey()); - if (grouped == null) { - grouped = new HashMap<>(); - groupedToMutate.put(newTimers.getKey(), grouped); - } - grouped.put(newTimers.getValue().get(0).getDomain(), newTimers.getValue()); - } - } - } - - private void updateTimers(TimerUpdate update) { - inputWatermark.updateTimers(update); - synchronizedProcessingInputWatermark.updateTimers(update); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(TransformWatermarks.class) - .add("inputWatermark", inputWatermark) - .add("outputWatermark", outputWatermark) - .add("inputProcessingTime", synchronizedProcessingInputWatermark) - .add("outputProcessingTime", synchronizedProcessingOutputWatermark) - .toString(); - } - } - - /** - * A collection of newly set, deleted, and completed timers. - * - * <p>setTimers and deletedTimers are collections of {@link TimerData} that have been added to the - * {@link TimerInternals} of an executed step. completedTimers are timers that were delivered as - * the input to the executed step. - */ - public static class TimerUpdate { - private final StructuralKey<?> key; - private final Iterable<? extends TimerData> completedTimers; - - private final Iterable<? extends TimerData> setTimers; - private final Iterable<? extends TimerData> deletedTimers; - - /** - * Returns a TimerUpdate for a null key with no timers. - */ - public static TimerUpdate empty() { - return new TimerUpdate( - null, - Collections.<TimerData>emptyList(), - Collections.<TimerData>emptyList(), - Collections.<TimerData>emptyList()); - } - - /** - * Creates a new {@link TimerUpdate} builder with the provided completed timers that needs the - * set and deleted timers to be added to it. 
- */ - public static TimerUpdateBuilder builder(StructuralKey<?> key) { - return new TimerUpdateBuilder(key); - } - - /** - * A {@link TimerUpdate} builder that needs to be provided with set timers and deleted timers. - */ - public static final class TimerUpdateBuilder { - private final StructuralKey<?> key; - private final Collection<TimerData> completedTimers; - private final Collection<TimerData> setTimers; - private final Collection<TimerData> deletedTimers; - - private TimerUpdateBuilder(StructuralKey<?> key) { - this.key = key; - this.completedTimers = new HashSet<>(); - this.setTimers = new HashSet<>(); - this.deletedTimers = new HashSet<>(); - } - - /** - * Adds all of the provided timers to the collection of completed timers, and returns this - * {@link TimerUpdateBuilder}. - */ - public TimerUpdateBuilder withCompletedTimers(Iterable<TimerData> completedTimers) { - Iterables.addAll(this.completedTimers, completedTimers); - return this; - } - - /** - * Adds the provided timer to the collection of set timers, removing it from deleted timers if - * it has previously been deleted. Returns this {@link TimerUpdateBuilder}. - */ - public TimerUpdateBuilder setTimer(TimerData setTimer) { - deletedTimers.remove(setTimer); - setTimers.add(setTimer); - return this; - } - - /** - * Adds the provided timer to the collection of deleted timers, removing it from set timers if - * it has previously been set. Returns this {@link TimerUpdateBuilder}. - */ - public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) { - deletedTimers.add(deletedTimer); - setTimers.remove(deletedTimer); - return this; - } - - /** - * Returns a new {@link TimerUpdate} with the most recently set completedTimers, setTimers, - * and deletedTimers. 
- */ - public TimerUpdate build() { - return new TimerUpdate( - key, - ImmutableSet.copyOf(completedTimers), - ImmutableSet.copyOf(setTimers), - ImmutableSet.copyOf(deletedTimers)); - } - } - - private TimerUpdate( - StructuralKey<?> key, - Iterable<? extends TimerData> completedTimers, - Iterable<? extends TimerData> setTimers, - Iterable<? extends TimerData> deletedTimers) { - this.key = key; - this.completedTimers = completedTimers; - this.setTimers = setTimers; - this.deletedTimers = deletedTimers; - } - - @VisibleForTesting - StructuralKey<?> getKey() { - return key; - } - - @VisibleForTesting - Iterable<? extends TimerData> getCompletedTimers() { - return completedTimers; - } - - @VisibleForTesting - Iterable<? extends TimerData> getSetTimers() { - return setTimers; - } - - @VisibleForTesting - Iterable<? extends TimerData> getDeletedTimers() { - return deletedTimers; - } - - /** - * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers. - */ - public TimerUpdate withCompletedTimers(Iterable<TimerData> completedTimers) { - return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers); - } - - @Override - public int hashCode() { - return Objects.hash(key, completedTimers, setTimers, deletedTimers); - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof TimerUpdate)) { - return false; - } - TimerUpdate that = (TimerUpdate) other; - return Objects.equals(this.key, that.key) - && Objects.equals(this.completedTimers, that.completedTimers) - && Objects.equals(this.setTimers, that.setTimers) - && Objects.equals(this.deletedTimers, that.deletedTimers); - } - } - - /** - * A pair of {@link TimerData} and key which can be delivered to the appropriate - * {@link AppliedPTransform}. 
A timer fires at the transform that set it with a specific key when - * the time domain in which it lives progresses past a specified time, as determined by the - * {@link InMemoryWatermarkManager}. - */ - public static class FiredTimers { - private final Map<TimeDomain, ? extends Collection<TimerData>> timers; - - private FiredTimers(Map<TimeDomain, ? extends Collection<TimerData>> timers) { - this.timers = timers; - } - - /** - * Gets all of the timers that have fired within the provided {@link TimeDomain}. If no timers - * fired within the provided domain, return an empty collection. - * - * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of increasing timestamp. - */ - public Collection<TimerData> getTimers(TimeDomain domain) { - Collection<TimerData> domainTimers = timers.get(domain); - if (domainTimers == null) { - return Collections.emptyList(); - } - return domainTimers; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(FiredTimers.class).add("timers", timers).toString(); - } - } - - private static class WindowedValueByTimestampComparator extends Ordering<WindowedValue<?>> { - @Override - public int compare(WindowedValue<?> o1, WindowedValue<?> o2) { - return ComparisonChain.start() - .compare(o1.getTimestamp(), o2.getTimestamp()) - .result(); - } - } - - public Set<AppliedPTransform<?, ?, ?>> getCompletedTransforms() { - Set<AppliedPTransform<?, ?, ?>> result = new HashSet<>(); - for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> wms : - transformToWatermarks.entrySet()) { - if (wms.getValue().getOutputWatermark().equals(THE_END_OF_TIME.get())) { - result.add(wms.getKey()); - } - } - return result; - } - - @AutoValue - abstract static class PendingWatermarkUpdate { - @Nullable - public abstract CommittedBundle<?> getInputBundle(); - public abstract TimerUpdate getTimerUpdate(); - public abstract CommittedResult getResult(); - public abstract Instant getEarliestHold(); - - /** - * Gets the 
{@link AppliedPTransform} that generated this result. - */ - public AppliedPTransform<?, ?, ?> getTransform() { - return getResult().getTransform(); - } - - public static PendingWatermarkUpdate create( - CommittedBundle<?> inputBundle, - TimerUpdate timerUpdate, - CommittedResult result, Instant earliestHold) { - return new AutoValue_InMemoryWatermarkManager_PendingWatermarkUpdate(inputBundle, - timerUpdate, - result, - earliestHold); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java deleted file mode 100644 index 0c7449c..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.direct; - -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; - -import org.joda.time.Instant; - -/** - * A factory that produces bundles that perform no additional validation. - */ -class InProcessBundleFactory implements BundleFactory { - public static InProcessBundleFactory create() { - return new InProcessBundleFactory(); - } - - private InProcessBundleFactory() {} - - @Override - public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) { - return InProcessBundle.create(output, StructuralKey.of(null, VoidCoder.of())); - } - - @Override - public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) { - return InProcessBundle.create(output, input.getKey()); - } - - @Override - public <K, T> UncommittedBundle<T> createKeyedBundle( - CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) { - return InProcessBundle.create(output, key); - } - - /** - * A {@link UncommittedBundle} that buffers elements in memory. - */ - private static final class InProcessBundle<T> implements UncommittedBundle<T> { - private final PCollection<T> pcollection; - private final StructuralKey<?> key; - private boolean committed = false; - private ImmutableList.Builder<WindowedValue<T>> elements; - - /** - * Create a new {@link InProcessBundle} for the specified {@link PCollection}. 
- */ - public static <T> InProcessBundle<T> create(PCollection<T> pcollection, StructuralKey<?> key) { - return new InProcessBundle<>(pcollection, key); - } - - private InProcessBundle(PCollection<T> pcollection, StructuralKey<?> key) { - this.pcollection = pcollection; - this.key = key; - this.elements = ImmutableList.builder(); - } - - @Override - public PCollection<T> getPCollection() { - return pcollection; - } - - @Override - public InProcessBundle<T> add(WindowedValue<T> element) { - checkState( - !committed, - "Can't add element %s to committed bundle in PCollection %s", - element, - pcollection); - elements.add(element); - return this; - } - - @Override - public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) { - checkState(!committed, "Can't commit already committed bundle %s", this); - committed = true; - final Iterable<WindowedValue<T>> committedElements = elements.build(); - return new CommittedInProcessBundle<>( - pcollection, key, committedElements, synchronizedCompletionTime); - } - } - - private static class CommittedInProcessBundle<T> implements CommittedBundle<T> { - public CommittedInProcessBundle( - PCollection<T> pcollection, - StructuralKey<?> key, - Iterable<WindowedValue<T>> committedElements, - Instant synchronizedCompletionTime) { - this.pcollection = pcollection; - this.key = key; - this.committedElements = committedElements; - this.synchronizedCompletionTime = synchronizedCompletionTime; - } - - private final PCollection<T> pcollection; - /** The structural value key of the Bundle, as specified by the coder that created it. 
*/ - private final StructuralKey<?> key; - private final Iterable<WindowedValue<T>> committedElements; - private final Instant synchronizedCompletionTime; - - @Override - public StructuralKey<?> getKey() { - return key; - } - - @Override - public Iterable<WindowedValue<T>> getElements() { - return committedElements; - } - - @Override - public PCollection<T> getPCollection() { - return pcollection; - } - - @Override - public Instant getSynchronizedProcessingOutputWatermark() { - return synchronizedCompletionTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .omitNullValues() - .add("pcollection", pcollection) - .add("key", key) - .add("elements", committedElements) - .toString(); - } - - @Override - public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements) { - return new CommittedInProcessBundle<>( - pcollection, key, ImmutableList.copyOf(elements), synchronizedCompletionTime); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java deleted file mode 100644 index bd07040..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.sdk.util.DoFnRunners.OutputManager; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; - -import java.util.Map; - -/** - * An {@link OutputManager} that adds each output element to the {@link UncommittedBundle} - * registered for the element's {@link TupleTag}. These are the bundles that become - * {@link CommittedBundle Bundles} of the {@link DirectRunner} once they are committed. - */ -public class InProcessBundleOutputManager implements OutputManager { - /** The bundle that receives the elements output to each tag. */ - private final Map<TupleTag<?>, UncommittedBundle<?>> bundles; - - /** - * Creates an {@link InProcessBundleOutputManager} that outputs to the provided bundles. - * - * @param outputBundles the bundle to add elements to, for each output tag - */ - public static InProcessBundleOutputManager create( - Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) { - return new InProcessBundleOutputManager(outputBundles); - } - - /** - * Creates an {@link InProcessBundleOutputManager} backed by the provided map. The map is stored - * as-is, without copying. - */ - public InProcessBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) { - this.bundles = bundles; - } - - /** - * Adds the provided element to the bundle registered for the provided tag. - * - * <p>If no bundle is registered for the tag, the map lookup returns {@code null} and this - * method throws a {@link NullPointerException}. - */ - @SuppressWarnings("unchecked") - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - /* The map values are wildcard-typed, so the bundle is used as a raw type in order to add a WindowedValue<T>. */ - @SuppressWarnings("rawtypes") - UncommittedBundle bundle = bundles.get(tag); - bundle.add(output); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java deleted file mode 100644 index 220ff83..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java +++ /dev/null @@ -1,429 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.direct; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers; -import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; -import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.util.ExecutionContext; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.MoreExecutors; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import javax.annotation.Nullable; - -/** - * The evaluation context for a specific pipeline being executed by the - * 
{@link DirectRunner}. Contains state shared within the execution across all - * transforms. - * - * <p>{@link InProcessEvaluationContext} contains shared state for an execution of the - * {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This - * consists of views into underlying state and watermark implementations, access to read and write - * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and - * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when - * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and - * known to be empty). - * - * <p>{@link InProcessEvaluationContext} also handles results by committing finalizing bundles based - * on the current global state and updating the global state appropriately. This includes updating - * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that - * can be executed. - */ -class InProcessEvaluationContext { - /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */ - private final Map<AppliedPTransform<?, ?, ?>, String> stepNames; - - /** The options that were used to create this {@link Pipeline}. */ - private final DirectOptions options; - - private final BundleFactory bundleFactory; - /** The current processing time and event time watermarks and timers. */ - private final InMemoryWatermarkManager watermarkManager; - - /** Executes callbacks based on the progression of the watermark. */ - private final WatermarkCallbackExecutor callbackExecutor; - - /** The stateInternals of the world, by applied PTransform and key. 
*/ - private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>> - applicationStateInternals; - - private final InProcessSideInputContainer sideInputContainer; - - private final CounterSet mergedCounters; - - public static InProcessEvaluationContext create( - DirectOptions options, - BundleFactory bundleFactory, - Collection<AppliedPTransform<?, ?, ?>> rootTransforms, - Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, - Map<AppliedPTransform<?, ?, ?>, String> stepNames, - Collection<PCollectionView<?>> views) { - return new InProcessEvaluationContext( - options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views); - } - - private InProcessEvaluationContext( - DirectOptions options, - BundleFactory bundleFactory, - Collection<AppliedPTransform<?, ?, ?>> rootTransforms, - Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, - Map<AppliedPTransform<?, ?, ?>, String> stepNames, - Collection<PCollectionView<?>> views) { - this.options = checkNotNull(options); - this.bundleFactory = checkNotNull(bundleFactory); - checkNotNull(rootTransforms); - checkNotNull(valueToConsumers); - checkNotNull(stepNames); - checkNotNull(views); - this.stepNames = stepNames; - - this.watermarkManager = - InMemoryWatermarkManager.create( - NanosOffsetClock.create(), rootTransforms, valueToConsumers); - this.sideInputContainer = InProcessSideInputContainer.create(this, views); - - this.applicationStateInternals = new ConcurrentHashMap<>(); - this.mergedCounters = new CounterSet(); - - this.callbackExecutor = - WatermarkCallbackExecutor.create(MoreExecutors.directExecutor()); - } - - /** - * Handle the provided {@link InProcessTransformResult}, produced after evaluating the provided - * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}). 
- * - * <p>The result is the output of running the transform contained in the - * {@link InProcessTransformResult} on the contents of the provided bundle. - * - * @param completedBundle the bundle that was processed to produce the result. Potentially - * {@code null} if the transform that produced the result is a root - * transform - * @param completedTimers the timers that were delivered to produce the {@code completedBundle}, - * or an empty iterable if no timers were delivered - * @param result the result of evaluating the input bundle - * @return the committed bundles contained within the handled {@code result} - */ - public CommittedResult handleResult( - @Nullable CommittedBundle<?> completedBundle, - Iterable<TimerData> completedTimers, - InProcessTransformResult result) { - Iterable<? extends CommittedBundle<?>> committedBundles = - commitBundles(result.getOutputBundles()); - // Update watermarks and timers - CommittedResult committedResult = CommittedResult.create(result, - completedBundle == null - ? null - : completedBundle.withElements((Iterable) result.getUnprocessedElements()), - committedBundles); - watermarkManager.updateWatermarks( - completedBundle, - result.getTimerUpdate().withCompletedTimers(completedTimers), - committedResult, - result.getWatermarkHold()); - // Update counters - if (result.getCounters() != null) { - mergedCounters.merge(result.getCounters()); - } - // Update state internals - CopyOnAccessInMemoryStateInternals<?> theirState = result.getState(); - if (theirState != null) { - CopyOnAccessInMemoryStateInternals<?> committedState = theirState.commit(); - StepAndKey stepAndKey = - StepAndKey.of( - result.getTransform(), completedBundle == null ? null : completedBundle.getKey()); - if (!committedState.isEmpty()) { - applicationStateInternals.put(stepAndKey, committedState); - } else { - applicationStateInternals.remove(stepAndKey); - } - } - return committedResult; - } - - private Iterable<? 
extends CommittedBundle<?>> commitBundles( - Iterable<? extends UncommittedBundle<?>> bundles) { - ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder(); - for (UncommittedBundle<?> inProgress : bundles) { - AppliedPTransform<?, ?, ?> producing = - inProgress.getPCollection().getProducingTransformInternal(); - TransformWatermarks watermarks = watermarkManager.getWatermarks(producing); - CommittedBundle<?> committed = - inProgress.commit(watermarks.getSynchronizedProcessingOutputTime()); - // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so - // filter them out - if (!Iterables.isEmpty(committed.getElements())) { - completed.add(committed); - } - } - return completed.build(); - } - - private void fireAllAvailableCallbacks() { - for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) { - fireAvailableCallbacks(transform); - } - } - - private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) { - TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform); - callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark()); - } - - /** - * Create a {@link UncommittedBundle} for use by a source. - */ - public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) { - return bundleFactory.createRootBundle(output); - } - - /** - * Create a {@link UncommittedBundle} whose elements belong to the specified {@link - * PCollection}. - */ - public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) { - return bundleFactory.createBundle(input, output); - } - - /** - * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by - * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. 
- */ - public <K, T> UncommittedBundle<T> createKeyedBundle( - CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) { - return bundleFactory.createKeyedBundle(input, key, output); - } - - /** - * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided - * {@link PCollectionView}. - */ - public <ElemT, ViewT> PCollectionViewWriter<ElemT, ViewT> createPCollectionViewWriter( - PCollection<Iterable<ElemT>> input, final PCollectionView<ViewT> output) { - return new PCollectionViewWriter<ElemT, ViewT>() { - @Override - public void add(Iterable<WindowedValue<ElemT>> values) { - sideInputContainer.write(output, values); - } - }; - } - - /** - * Schedule a callback to be executed after output would be produced for the given window - * if there had been input. - * - * <p>Output would be produced when the watermark for a {@link PValue} passes the point at - * which the trigger for the specified window (with the specified windowing strategy) must have - * fired from the perspective of that {@link PValue}, as specified by the value of - * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the - * {@link WindowingStrategy}. When the callback has fired, either values will have been produced - * for a key in that window, the window is empty, or all elements in the window are late. The - * callback will be executed regardless of whether values have been produced. 
- */ - public void scheduleAfterOutputWouldBeProduced( - PValue value, - BoundedWindow window, - WindowingStrategy<?, ?> windowingStrategy, - Runnable runnable) { - AppliedPTransform<?, ?, ?> producing = getProducing(value); - callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable); - - fireAvailableCallbacks(lookupProducing(value)); - } - - private AppliedPTransform<?, ?, ?> getProducing(PValue value) { - if (value.getProducingTransformInternal() != null) { - return value.getProducingTransformInternal(); - } - return lookupProducing(value); - } - - private AppliedPTransform<?, ?, ?> lookupProducing(PValue value) { - for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) { - if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) { - return transform; - } - } - return null; - } - - /** - * Get the options used by this {@link Pipeline}. - */ - public DirectOptions getPipelineOptions() { - return options; - } - - /** - * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key. - */ - public InProcessExecutionContext getExecutionContext( - AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) { - StepAndKey stepAndKey = StepAndKey.of(application, key); - return new InProcessExecutionContext( - options.getClock(), - key, - (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey), - watermarkManager.getWatermarks(application)); - } - - /** - * Get all of the steps used in this {@link Pipeline}. - */ - public Collection<AppliedPTransform<?, ?, ?>> getSteps() { - return stepNames.keySet(); - } - - /** - * Get the Step Name for the provided application. - */ - public String getStepName(AppliedPTransform<?, ?, ?> application) { - return stepNames.get(application); - } - - /** - * Returns a {@link ReadyCheckingSideInputReader} capable of reading the provided - * {@link PCollectionView PCollectionViews}. 
- * - * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to - * read - * @return a {@link SideInputReader} that can read all of the provided {@link PCollectionView - * PCollectionViews} - */ - public ReadyCheckingSideInputReader createSideInputReader( - final List<PCollectionView<?>> sideInputs) { - return sideInputContainer.createReaderForViews(sideInputs); - } - - - /** - * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent - * of all other {@link CounterSet CounterSets} created by this call. - * - * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in - * all created {@link CounterSet CounterSets} when the transforms that call this method - * complete. - */ - public CounterSet createCounterSet() { - return new CounterSet(); - } - - /** - * Returns all of the counters that have been merged into this context via calls to - * {@link CounterSet#merge(CounterSet)}. - */ - public CounterSet getCounters() { - return mergedCounters; - } - - @VisibleForTesting - void forceRefresh() { - watermarkManager.refreshAll(); - fireAllAvailableCallbacks(); - } - - /** - * Extracts all timers that have been fired and have not already been extracted. - * - * <p>This is a destructive operation. Timers will only appear in the result of this method once - * for each time they are set. - */ - public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() { - forceRefresh(); - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired = - watermarkManager.extractFiredTimers(); - return fired; - } - - /** - * Returns true if the step will not produce additional output. - * - * <p>If the provided transform produces only {@link IsBounded#BOUNDED} - * {@link PCollection PCollections}, returns true if the watermark is at - * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}. 
- * - * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED} - * {@link PCollection PCollections}, returns the value of - * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()}. - */ - public boolean isDone(AppliedPTransform<?, ?, ?> transform) { - // if the PTransform's watermark isn't at the max value, it isn't done - if (watermarkManager - .getWatermarks(transform) - .getOutputWatermark() - .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - return false; - } - // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down, - // the PTransform may produce additional output. It is not done. - for (PValue output : transform.getOutput().expand()) { - if (output instanceof PCollection) { - IsBounded bounded = ((PCollection<?>) output).isBounded(); - if (bounded.equals(IsBounded.UNBOUNDED) - && !options.isShutdownUnboundedProducersWithMaxWatermark()) { - return false; - } - } - } - // The PTransform's watermark was at positive infinity and all of its outputs are known to be - // done. It is done. - return true; - } - - /** - * Returns true if all steps are done. 
- */ - public boolean isDone() { - for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) { - if (!isDone(transform)) { - return false; - } - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java deleted file mode 100644 index d2558ce..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.direct; - -import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; -import org.apache.beam.sdk.util.BaseExecutionContext; -import org.apache.beam.sdk.util.ExecutionContext; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; - -/** - * Execution Context for the {@link DirectRunner}. - * - * This implementation is not thread safe. A new {@link InProcessExecutionContext} must be created - * for each thread that requires it. - */ -class InProcessExecutionContext - extends BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> { - private final Clock clock; - private final StructuralKey<?> key; - private final CopyOnAccessInMemoryStateInternals<Object> existingState; - private final TransformWatermarks watermarks; - - public InProcessExecutionContext(Clock clock, StructuralKey<?> key, - CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) { - this.clock = clock; - this.key = key; - this.existingState = existingState; - this.watermarks = watermarks; - } - - @Override - protected InProcessStepContext createStepContext(String stepName, String transformName) { - return new InProcessStepContext(this, stepName, transformName); - } - - /** - * Step Context for the {@link DirectRunner}. 
- */ - public class InProcessStepContext - extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext { - private CopyOnAccessInMemoryStateInternals<Object> stateInternals; - private InProcessTimerInternals timerInternals; - - public InProcessStepContext( - ExecutionContext executionContext, String stepName, String transformName) { - super(executionContext, stepName, transformName); - } - - @Override - public CopyOnAccessInMemoryStateInternals<Object> stateInternals() { - if (stateInternals == null) { - stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState); - } - return stateInternals; - } - - @Override - public InProcessTimerInternals timerInternals() { - if (timerInternals == null) { - timerInternals = - InProcessTimerInternals.create(clock, watermarks, TimerUpdate.builder(key)); - } - return timerInternals; - } - - /** - * Commits the state of this step, and returns the committed state. If the step has not - * accessed any state, return null. - */ - public CopyOnAccessInMemoryStateInternals<?> commitState() { - if (stateInternals != null) { - return stateInternals.commit(); - } - return null; - } - - /** - * Gets the timer update of the {@link TimerInternals} of this {@link InProcessStepContext}, - * which is empty if the {@link TimerInternals} were never accessed. - */ - public TimerUpdate getTimerUpdate() { - if (timerInternals == null) { - return TimerUpdate.empty(); - } - return timerInternals.getTimerUpdate(); - } - } -}