taegeonum closed pull request #159: [NEMO-216,251,259] Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
index a6269a085..6a6ca4d43 100644
--- a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
@@ -20,7 +20,6 @@
 
 import org.apache.nemo.common.exception.CompileTimeOptimizationException;
 import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import org.apache.nemo.common.ir.vertex.*;
@@ -258,14 +257,6 @@ private void sinkCheck() {
   * Helper method to check that all execution properties are correct and makes sense.
    */
   private void executionPropertyCheck() {
-    // SideInput is not compatible with Push
-    vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
-        .filter(e -> e.getPropertyValue(BroadcastVariableIdProperty.class).isPresent())
-        .filter(e -> DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
-        .forEach(e -> {
-          throw new CompileTimeOptimizationException("DAG execution property check: "
-              + "Broadcast edge is not compatible with push" + e.getId());
-        }));
     // DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection)
     vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
         .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index e676e6e8c..1055f0bb9 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -49,7 +49,7 @@ public boolean equals(final Object o) {
 
   @Override
   public String toString() {
-    return String.valueOf(timestamp);
+    return String.valueOf("Watermark(" + timestamp + ")");
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
new file mode 100644
index 000000000..9a18b0c9f
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam;
+
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.compiler.frontend.beam.transform.CreateViewTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+/**
+ * Accumulates and provides side inputs in memory.
+ */
+public final class InMemorySideInputReader implements ReadyCheckingSideInputReader {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemorySideInputReader.class.getName());
+
+  private long curWatermark = Long.MIN_VALUE;
+
+  private final Collection<PCollectionView<?>> sideInputsToRead;
+  private final Map<Pair<PCollectionView<?>, BoundedWindow>, Object> inMemorySideInputs;
+
+  public InMemorySideInputReader(final Collection<PCollectionView<?>> sideInputsToRead) {
+    this.sideInputsToRead = sideInputsToRead;
+    this.inMemorySideInputs = new HashMap<>();
+  }
+
+  @Override
+  public boolean isReady(final PCollectionView view, final BoundedWindow window) {
+    return window.maxTimestamp().getMillis() < curWatermark
+      || inMemorySideInputs.containsKey(Pair.of(view, window));
+  }
+
+  @Nullable
+  @Override
+  public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
+    // This gets called after isReady()
+    final T sideInputData = (T) inMemorySideInputs.get(Pair.of(view, window));
+    return sideInputData == null
+      // The upstream gave us an empty sideInput
+      ? ((ViewFn<Object, T>) view.getViewFn()).apply(new CreateViewTransform.MultiView<T>(Collections.emptyList()))
+      // The upstream gave us a concrete sideInput
+      : sideInputData;
+  }
+
+  @Override
+  public <T> boolean contains(final PCollectionView<T> view) {
+    return sideInputsToRead.contains(view);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return sideInputsToRead.isEmpty();
+  }
+
+  /**
+   * Stores the side input in memory to be used with main inputs.
+   * @param view of the side input.
+   * @param sideInputElement to add.
+   */
+  public void addSideInputElement(final PCollectionView<?> view,
+                                  final WindowedValue<SideInputElement<?>> sideInputElement) {
+    for (final BoundedWindow bw : sideInputElement.getWindows()) {
+      inMemorySideInputs.put(Pair.of(view, bw), sideInputElement.getValue().getSideInputValue());
+    }
+  }
+
+  /**
+   * Say a DoFn of this reader has 3 main inputs and 4 side inputs.
+   * {@link org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager} guarantees that the watermark here
+   * is the minimum of all 7 input streams.
+   * @param newWatermark to set.
+   */
+  public void setCurrentWatermarkOfAllMainAndSideInputs(final long newWatermark) {
+    if (curWatermark > newWatermark) {
+      // Cannot go backwards in time.
+      throw new IllegalStateException(curWatermark + " > " + newWatermark);
+    }
+
+    this.curWatermark = newWatermark;
+    // TODO #282: Handle late data
+    inMemorySideInputs.entrySet().removeIf(entry -> {
+      return entry.getKey().right().maxTimestamp().getMillis() <= this.curWatermark; // Discard old sideinputs.
+    });
+  }
+}
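
A minimal usage sketch of the new InMemorySideInputReader, assuming a view, a window, and a windowed SideInputElement obtained elsewhere; the helper method below is illustrative and not part of the PR:

  // Illustrative caller only. Assumes java.util.Collections, org.apache.beam.sdk.values.PCollectionView,
  // org.apache.beam.sdk.transforms.windowing.BoundedWindow and org.apache.beam.sdk.util.WindowedValue.
  static <T> void sideInputReaderSketch(final PCollectionView<T> view,
                                        final BoundedWindow window,
                                        final WindowedValue<SideInputElement<?>> element) {
    final InMemorySideInputReader reader =
      new InMemorySideInputReader(Collections.singletonList(view));

    // A side-input element arriving from the SideInputTransform edge is stored per window.
    reader.addSideInputElement(view, element);

    // isReady() holds until a value is stored for the window or the merged watermark passes it.
    if (reader.isReady(view, window)) {
      final T materialized = reader.get(view, window); // falls back to an empty view if nothing was stored
      // ... hand 'materialized' to the DoFn runner's side-input lookup.
    }

    // Advancing the merged (main + side input) watermark unblocks isReady() and evicts stale windows.
    reader.setCurrentWatermarkOfAllMainAndSideInputs(window.maxTimestamp().getMillis() + 1);
  }
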
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
index 722f42146..d54a7cdf0 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -35,6 +35,7 @@
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
 import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
+import org.apache.nemo.compiler.frontend.beam.coder.SideInputCoder;
 import org.apache.nemo.compiler.frontend.beam.transform.*;
 
 import java.util.*;
@@ -91,6 +92,45 @@ void addVertex(final IRVertex vertex) {
     builder.addVertex(vertex, loopVertexStack);
   }
 
+  /**
+   * Say the dstIRVertex consumes three views: view0, view1, and view2.
+   *
+   * We translate that as the following:
+   * view0 -> SideInputTransform(index=0) ->
+   * view1 -> SideInputTransform(index=1) -> dstIRVertex(with a map from indices to PCollectionViews)
+   * view2 -> SideInputTransform(index=2) ->
+   *
+   * @param dstVertex vertex.
+   * @param sideInputs of the vertex.
+   */
+  void addSideInputEdges(final IRVertex dstVertex, final Map<Integer, PCollectionView<?>> sideInputs) {
+    for (final Map.Entry<Integer, PCollectionView<?>> entry : sideInputs.entrySet()) {
+      final int index = entry.getKey();
+      final PCollectionView view = entry.getValue();
+
+      final IRVertex srcVertex = pValueToProducerVertex.get(view);
+      final IRVertex sideInputTransformVertex = new OperatorVertex(new SideInputTransform(index));
+      addVertex(sideInputTransformVertex);
+      final Coder viewCoder = getCoderForView(view, this);
+      final Coder windowCoder = view.getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
+
+      // First edge: view to transform
+      final IREdge firstEdge =
+        new IREdge(CommunicationPatternProperty.Value.OneToOne, srcVertex, sideInputTransformVertex);
+      addEdge(firstEdge, viewCoder, windowCoder);
+
+      // Second edge: transform to the dstIRVertex
+      final IREdge secondEdge =
+        new IREdge(CommunicationPatternProperty.Value.OneToOne, sideInputTransformVertex, dstVertex);
+      final WindowedValue.FullWindowedValueCoder sideInputElementCoder =
+        WindowedValue.getFullCoder(SideInputCoder.of(viewCoder), windowCoder);
+
+      secondEdge.setProperty(EncoderProperty.of(new BeamEncoderFactory(sideInputElementCoder)));
+      secondEdge.setProperty(DecoderProperty.of(new BeamDecoderFactory(sideInputElementCoder)));
+      builder.connectVertices(secondEdge);
+    }
+  }
+
   /**
    * Add IR edge to the builder.
    *
@@ -98,62 +138,38 @@ void addVertex(final IRVertex vertex) {
    * @param input the {@link PValue} {@code dst} consumes
    */
   void addEdgeTo(final IRVertex dst, final PValue input) {
-    final Coder coder;
     if (input instanceof PCollection) {
-      coder = ((PCollection) input).getCoder();
-    } else if (input instanceof PCollectionView) {
-      coder = getCoderForView((PCollectionView) input, this);
-    } else {
-      throw new RuntimeException(String.format("While adding an edge to %s, 
coder for PValue %s cannot "
-        + "be determined", dst, input));
-    }
-    addEdgeTo(dst, input, coder);
-  }
+      final Coder elementCoder = ((PCollection) input).getCoder();
+      final Coder windowCoder = ((PCollection) 
input).getWindowingStrategy().getWindowFn().windowCoder();
+      final IRVertex src = pValueToProducerVertex.get(input);
+      if (src == null) {
+        throw new IllegalStateException(String.format("Cannot find a vertex 
that emits pValue %s", input));
+      }
 
-  void addEdgeTo(final IRVertex dst, final PValue input, final Coder 
elementCoder) {
-    final IRVertex src = pValueToProducerVertex.get(input);
-    if (src == null) {
-      throw new IllegalStateException(String.format("Cannot find a vertex that 
emits pValue %s", input));
-    }
+      final CommunicationPatternProperty.Value communicationPattern = 
getCommPattern(src, dst);
+      final IREdge edge = new IREdge(communicationPattern, src, dst);
 
-    final Coder windowCoder;
-    final CommunicationPatternProperty.Value communicationPattern = 
getCommPattern(src, dst);
-    final IREdge edge = new IREdge(communicationPattern, src, dst);
+      if (pValueToTag.containsKey(input)) {
+        
edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
+      }
 
-    if (pValueToTag.containsKey(input)) {
-      
edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
-    }
-    if (input instanceof PCollectionView) {
-      edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView) 
input));
-    }
-    if (input instanceof PCollection) {
-      windowCoder = ((PCollection) 
input).getWindowingStrategy().getWindowFn().windowCoder();
-    } else if (input instanceof PCollectionView) {
-      windowCoder = ((PCollectionView) input).getPCollection()
-        .getWindowingStrategy().getWindowFn().windowCoder();
+      addEdge(edge, elementCoder, windowCoder);
     } else {
-      throw new RuntimeException(String.format("While adding an edge from %s, 
to %s, coder for PValue %s cannot "
-        + "be determined", src, dst, input));
+      throw new IllegalStateException(input.toString());
     }
-
-    addEdgeTo(edge, elementCoder, windowCoder);
   }
 
-  void addEdgeTo(final IREdge edge,
-                 final Coder elementCoder,
-                 final Coder windowCoder) {
+  void addEdge(final IREdge edge, final Coder elementCoder, final Coder 
windowCoder) {
     edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
-
     if (elementCoder instanceof KvCoder) {
       Coder keyCoder = ((KvCoder) elementCoder).getKeyCoder();
       edge.setProperty(KeyEncoderProperty.of(new 
BeamEncoderFactory(keyCoder)));
       edge.setProperty(KeyDecoderProperty.of(new 
BeamDecoderFactory(keyCoder)));
     }
 
-    edge.setProperty(EncoderProperty.of(
-      new BeamEncoderFactory<>(WindowedValue.getFullCoder(elementCoder, 
windowCoder))));
-    edge.setProperty(DecoderProperty.of(
-      new BeamDecoderFactory<>(WindowedValue.getFullCoder(elementCoder, 
windowCoder))));
+    final WindowedValue.FullWindowedValueCoder coder = 
WindowedValue.getFullCoder(elementCoder, windowCoder);
+    edge.setProperty(EncoderProperty.of(new BeamEncoderFactory<>(coder)));
+    edge.setProperty(DecoderProperty.of(new BeamDecoderFactory<>(coder)));
 
     builder.connectVertices(edge);
   }
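
The renamed addEdge() above always wraps the element coder in a full WindowedValue coder and, when the element coder is a KvCoder, additionally registers key encoder/decoder properties. A sketch of the resulting coder stack, using standard Beam coders as an assumed example (not code from the PR):

  // Illustrative only; assumes org.apache.beam.sdk.coders.{KvCoder, StringUtf8Coder, VarLongCoder},
  // org.apache.beam.sdk.values.KV and org.apache.beam.sdk.transforms.windowing.GlobalWindow.
  final KvCoder<String, Long> elementCoder = KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of());
  final Coder<GlobalWindow> windowCoder = GlobalWindow.Coder.INSTANCE;

  final WindowedValue.FullWindowedValueCoder<KV<String, Long>> edgeCoder =
    WindowedValue.getFullCoder(elementCoder, windowCoder);

  // EncoderProperty/DecoderProperty carry 'edgeCoder';
  // KeyEncoderProperty/KeyDecoderProperty carry elementCoder.getKeyCoder(), i.e. StringUtf8Coder.
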
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 7a22ba8a9..a6ae6ceb8 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -25,6 +25,7 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.nemo.common.ir.edge.IREdge;
 import 
org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
@@ -48,7 +49,9 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.*;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * A collection of translators for the Beam PTransforms.
@@ -164,7 +167,7 @@ void translatePrimitive(final PipelineTranslationContext 
context,
   private static void unboundedReadTranslator(final PipelineTranslationContext 
ctx,
                                               final TransformHierarchy.Node 
beamNode,
                                               final Read.Unbounded<?> 
transform) {
-    final IRVertex vertex = new 
BeamUnboundedSourceVertex<>(transform.getSource());
+    final IRVertex vertex = new 
BeamUnboundedSourceVertex<>(transform.getSource(), DisplayData.from(transform));
     ctx.addVertex(vertex);
     beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, 
input));
     beamNode.getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(beamNode, vertex, output));
@@ -174,7 +177,7 @@ private static void unboundedReadTranslator(final 
PipelineTranslationContext ctx
   private static void boundedReadTranslator(final PipelineTranslationContext 
ctx,
                                             final TransformHierarchy.Node 
beamNode,
                                             final Read.Bounded<?> transform) {
-    final IRVertex vertex = new 
BeamBoundedSourceVertex<>(transform.getSource());
+    final IRVertex vertex = new 
BeamBoundedSourceVertex<>(transform.getSource(), DisplayData.from(transform));
     ctx.addVertex(vertex);
     beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, 
input));
     beamNode.getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(beamNode, vertex, output));
@@ -184,14 +187,15 @@ private static void boundedReadTranslator(final 
PipelineTranslationContext ctx,
   private static void parDoSingleOutputTranslator(final 
PipelineTranslationContext ctx,
                                                   final 
TransformHierarchy.Node beamNode,
                                                   final ParDo.SingleOutput<?, 
?> transform) {
-    final DoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode);
+    final Map<Integer, PCollectionView<?>> sideInputMap = 
getSideInputMap(transform.getSideInputs());
+    final AbstractDoFnTransform doFnTransform = createDoFnTransform(ctx, 
beamNode, sideInputMap);
     final IRVertex vertex = new OperatorVertex(doFnTransform);
 
     ctx.addVertex(vertex);
     beamNode.getInputs().values().stream()
       .filter(input -> 
!transform.getAdditionalInputs().values().contains(input))
       .forEach(input -> ctx.addEdgeTo(vertex, input));
-    transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
+    ctx.addSideInputEdges(vertex, sideInputMap);
     beamNode.getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(beamNode, vertex, output));
   }
 
@@ -199,13 +203,14 @@ private static void parDoSingleOutputTranslator(final 
PipelineTranslationContext
   private static void parDoMultiOutputTranslator(final 
PipelineTranslationContext ctx,
                                                  final TransformHierarchy.Node 
beamNode,
                                                  final ParDo.MultiOutput<?, ?> 
transform) {
-    final DoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode);
+    final Map<Integer, PCollectionView<?>> sideInputMap = 
getSideInputMap(transform.getSideInputs());
+    final AbstractDoFnTransform doFnTransform = createDoFnTransform(ctx, 
beamNode, sideInputMap);
     final IRVertex vertex = new OperatorVertex(doFnTransform);
     ctx.addVertex(vertex);
     beamNode.getInputs().values().stream()
       .filter(input -> 
!transform.getAdditionalInputs().values().contains(input))
       .forEach(input -> ctx.addEdgeTo(vertex, input));
-    transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
+    ctx.addSideInputEdges(vertex, sideInputMap);
     beamNode.getOutputs().entrySet().stream()
       .filter(pValueWithTupleTag -> 
pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
       .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(beamNode, 
vertex, pValueWithTupleTag.getValue()));
@@ -237,7 +242,8 @@ private static void windowTranslator(final 
PipelineTranslationContext ctx,
     } else {
       throw new UnsupportedOperationException(String.format("%s is not 
supported", transform));
     }
-    final IRVertex vertex = new OperatorVertex(new 
WindowFnTransform(windowFn));
+    final IRVertex vertex = new OperatorVertex(
+      new WindowFnTransform(windowFn, 
DisplayData.from(beamNode.getTransform())));
     ctx.addVertex(vertex);
     beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, 
input));
     beamNode.getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(beamNode, vertex, output));
@@ -317,7 +323,7 @@ private static void flattenTranslator(final 
PipelineTranslationContext ctx,
     final IRVertex finalCombine = new OperatorVertex(new 
CombineFnFinalTransform<>(combineFn));
     ctx.addVertex(finalCombine);
     final IREdge edge = new IREdge(CommunicationPatternProperty.Value.Shuffle, 
partialCombine, finalCombine);
-    ctx.addEdgeTo(
+    ctx.addEdge(
       edge,
       KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
       input.getWindowingStrategy().getWindowFn().windowCoder());
@@ -348,27 +354,45 @@ private static void flattenTranslator(final 
PipelineTranslationContext ctx,
   
////////////////////////////////////////////////////////////////////////////////////////////////////////
   /////////////////////// HELPER METHODS
 
-  private static DoFnTransform createDoFnTransform(final PipelineTranslationContext ctx,
-                                                   final TransformHierarchy.Node beamNode) {
+  private static Map<Integer, PCollectionView<?>> getSideInputMap(final List<PCollectionView<?>> viewList) {
+    return IntStream.range(0, viewList.size()).boxed().collect(Collectors.toMap(Function.identity(), viewList::get));
+  }
+
+  private static AbstractDoFnTransform createDoFnTransform(final PipelineTranslationContext ctx,
+                                                           final TransformHierarchy.Node beamNode,
+                                                           final Map<Integer, PCollectionView<?>> sideInputMap) {
     try {
       final AppliedPTransform pTransform = 
beamNode.toAppliedPTransform(ctx.getPipeline());
       final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
       final TupleTag mainOutputTag = 
ParDoTranslation.getMainOutputTag(pTransform);
-      final List<PCollectionView<?>> sideInputs = 
ParDoTranslation.getSideInputs(pTransform);
       final TupleTagList additionalOutputTags = 
ParDoTranslation.getAdditionalOutputTags(pTransform);
 
       final PCollection<?> mainInput = (PCollection<?>)
         
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
 
-      return new DoFnTransform(
-        doFn,
-        mainInput.getCoder(),
-        getOutputCoders(pTransform),
-        mainOutputTag,
-        additionalOutputTags.getAll(),
-        mainInput.getWindowingStrategy(),
-        sideInputs,
-        ctx.getPipelineOptions());
+      if (sideInputMap.isEmpty()) {
+        return new DoFnTransform(
+          doFn,
+          mainInput.getCoder(),
+          getOutputCoders(pTransform),
+          mainOutputTag,
+          additionalOutputTags.getAll(),
+          mainInput.getWindowingStrategy(),
+          ctx.getPipelineOptions(),
+          DisplayData.from(beamNode.getTransform()));
+      } else {
+        return new PushBackDoFnTransform(
+          doFn,
+          mainInput.getCoder(),
+          getOutputCoders(pTransform),
+          mainOutputTag,
+          additionalOutputTags.getAll(),
+          mainInput.getWindowingStrategy(),
+          sideInputMap,
+          ctx.getPipelineOptions(),
+          DisplayData.from(beamNode.getTransform()));
+
+      }
     } catch (final IOException e) {
       throw new RuntimeException(e);
     }
@@ -404,11 +428,10 @@ private static Transform createGBKTransform(
       return new GroupByKeyAndWindowDoFnTransform(
         getOutputCoders(pTransform),
         mainOutputTag,
-        Collections.emptyList(),  /*  GBK does not have additional outputs */
         mainInput.getWindowingStrategy(),
-        Collections.emptyList(), /*  GBK does not have additional side inputs 
*/
         ctx.getPipelineOptions(),
-        SystemReduceFn.buffering(mainInput.getCoder()));
+        SystemReduceFn.buffering(mainInput.getCoder()),
+        DisplayData.from(beamNode.getTransform()));
     }
   }
 
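For context, a ParDo with at least one side input now takes the PushBackDoFnTransform path, and each view is assigned an index by getSideInputMap(). An assumed Beam pipeline fragment that would exercise this path; 'lines' is a PCollection<String> and 'wordCounts' a PCollection<KV<String, Long>> defined elsewhere, neither is part of the PR:

  // 'dict' becomes index 0 in the side-input map; addSideInputEdges() inserts a
  // SideInputTransform(0) vertex between the view producer and this ParDo's vertex.
  final PCollectionView<Map<String, Long>> dict = wordCounts.apply(View.asMap());

  final PCollection<String> annotated = lines.apply(
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(final ProcessContext c) {
        final Map<String, Long> d = c.sideInput(dict); // read the side input
        c.output(c.element() + " / " + d.size());
      }
    }).withSideInputs(dict));
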
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java
similarity index 54%
rename from common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java
rename to compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java
index d7e8aa461..22f8d72a4 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java
@@ -16,31 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.common.ir.edge.executionproperty;
-
-import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
-
-import java.io.Serializable;
+package org.apache.nemo.compiler.frontend.beam;
 
 /**
- * Edges with this property fetch a broadcast variable.
+ * {@link org.apache.nemo.compiler.frontend.beam.transform.DoFnTransform} treats elements of this type as side inputs.
+ * TODO #289: Prevent using SideInputElement in UDFs
+ * @param <T> type of the side input value.
  */
-public final class BroadcastVariableIdProperty extends EdgeExecutionProperty<Serializable> {
+public final class SideInputElement<T> {
+  private final int sideInputIndex;
+  private final T sideInputValue;
+
+  public SideInputElement(final int sideInputIndex, final T sideInputValue) {
+    this.sideInputIndex = sideInputIndex;
+    this.sideInputValue = sideInputValue;
+  }
 
-  /**
-   * Constructor.
-   * @param value id.
-   */
-  private BroadcastVariableIdProperty(final Serializable value) {
-    super(value);
+  public int getSideInputIndex() {
+    return sideInputIndex;
   }
 
-  /**
-   * Static method exposing constructor.
-   * @param value id.
-   * @return the newly created execution property.
-   */
-  public static BroadcastVariableIdProperty of(final Serializable value) {
-    return new BroadcastVariableIdProperty(value);
+  public T getSideInputValue() {
+    return sideInputValue;
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
new file mode 100644
index 000000000..59a1792cd
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.coder;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+
+import java.io.*;
+
+/**
+ * EncoderFactory for side inputs.
+ * @param <T> type of the side input value.
+ */
+public final class SideInputCoder<T> extends AtomicCoder<SideInputElement<T>> {
+  private final Coder<T> valueCoder;
+
+  /**
+   * Private constructor.
+   */
+  private SideInputCoder(final Coder<T> valueCoder) {
+    this.valueCoder = valueCoder;
+  }
+
+  /**
+   * @return a new coder
+   */
+  public static SideInputCoder of(final Coder valueCoder) {
+    return new SideInputCoder<>(valueCoder);
+  }
+
+  @Override
+  public void encode(final SideInputElement<T> sideInputElement, final OutputStream outStream) throws IOException {
+    final DataOutputStream dataOutputStream = new DataOutputStream(outStream);
+    dataOutputStream.writeInt(sideInputElement.getSideInputIndex());
+    valueCoder.encode(sideInputElement.getSideInputValue(), dataOutputStream);
+  }
+
+  @Override
+  public SideInputElement<T> decode(final InputStream inStream) throws IOException {
+    final DataInputStream dataInputStream = new DataInputStream(inStream);
+    final int index = dataInputStream.readInt();
+    final T value = valueCoder.decode(inStream);
+    return new SideInputElement<>(index, value);
+  }
+}
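
The new SideInputCoder prefixes the encoded value with its side-input index so the consuming transform can route the element to the right view. A self-contained round-trip sketch, assuming Beam's StringUtf8Coder for the value type (the class below is illustrative, not part of the PR):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import org.apache.beam.sdk.coders.StringUtf8Coder;
  import org.apache.nemo.compiler.frontend.beam.SideInputElement;
  import org.apache.nemo.compiler.frontend.beam.coder.SideInputCoder;

  public class SideInputCoderRoundTrip {
    public static void main(final String[] args) throws IOException {
      final SideInputCoder<String> coder = SideInputCoder.of(StringUtf8Coder.of());

      final ByteArrayOutputStream out = new ByteArrayOutputStream();
      // Index 1 stands for the second side input of the consuming DoFn.
      coder.encode(new SideInputElement<>(1, "side-input-value"), out); // writes the index, then the value

      final SideInputElement<String> decoded =
        coder.decode(new ByteArrayInputStream(out.toByteArray()));
      System.out.println(decoded.getSideInputIndex() + " / " + decoded.getSideInputValue());
    }
  }
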
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index bc672b7b9..30947fd3e 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.compiler.frontend.beam.source;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.Readable;
 
@@ -42,17 +43,18 @@
 public final class BeamBoundedSourceVertex<O> extends 
SourceVertex<WindowedValue<O>> {
   private static final Logger LOG = 
LoggerFactory.getLogger(BeamBoundedSourceVertex.class.getName());
   private BoundedSource<O> source;
-  private final String sourceDescription;
+  private final DisplayData displayData;
 
   /**
    * Constructor of BeamBoundedSourceVertex.
    *
    * @param source BoundedSource to read from.
+   * @param displayData data to display.
    */
-  public BeamBoundedSourceVertex(final BoundedSource<O> source) {
+  public BeamBoundedSourceVertex(final BoundedSource<O> source, final 
DisplayData displayData) {
     super();
     this.source = source;
-    this.sourceDescription = source.toString();
+    this.displayData = displayData;
   }
 
   /**
@@ -63,7 +65,7 @@ public BeamBoundedSourceVertex(final BoundedSource<O> source) 
{
   public BeamBoundedSourceVertex(final BeamBoundedSourceVertex that) {
     super(that);
     this.source = that.source;
-    this.sourceDescription = that.source.toString();
+    this.displayData = that.displayData;
   }
 
   @Override
@@ -94,7 +96,7 @@ public void clearInternalStates() {
   @Override
   public ObjectNode getPropertiesAsJsonNode() {
     final ObjectNode node = getIRVertexPropertiesAsJsonNode();
-    node.put("source", sourceDescription);
+    node.put("source", displayData.toString());
     return node;
   }
 
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index ad40d1b1f..5942925b0 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -20,6 +20,7 @@
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.Readable;
@@ -44,22 +45,23 @@
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BeamUnboundedSourceVertex.class.getName());
   private UnboundedSource<O, M> source;
-  private final String sourceDescription;
+  private final DisplayData displayData;
 
   /**
    * The default constructor for beam unbounded source.
    * @param source unbounded source.
    */
-  public BeamUnboundedSourceVertex(final UnboundedSource<O, M> source) {
+  public BeamUnboundedSourceVertex(final UnboundedSource<O, M> source,
+                                   final DisplayData displayData) {
     super();
     this.source = source;
-    this.sourceDescription = source.toString();
+    this.displayData = displayData;
   }
 
   private BeamUnboundedSourceVertex(final BeamUnboundedSourceVertex<O, M> 
that) {
     super(that);
     this.source = that.source;
-    this.sourceDescription = that.source.toString();
+    this.displayData = that.displayData;
   }
 
   @Override
@@ -88,7 +90,7 @@ public void clearInternalStates() {
   @Override
   public ObjectNode getPropertiesAsJsonNode() {
     final ObjectNode node = getIRVertexPropertiesAsJsonNode();
-    node.put("source", sourceDescription);
+    node.put("source", displayData.toString());
     return node;
   }
 
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index dd5ca35be..72113d654 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -23,6 +23,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -31,11 +32,12 @@
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -52,7 +54,7 @@
 
   private final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> additionalOutputTags;
-  private final Collection<PCollectionView<?>> sideInputs;
+  private final Map<Integer, PCollectionView<?>> sideInputs;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final DoFn<InterT, OutputT> doFn;
   private final SerializablePipelineOptions serializedOptions;
@@ -61,9 +63,13 @@
 
   private transient OutputCollector<WindowedValue<OutputT>> outputCollector;
   private transient DoFnRunner<InterT, OutputT> doFnRunner;
-  private transient SideInputReader sideInputReader;
+
+  // null when there is no side input.
+  private transient PushbackSideInputDoFnRunner<InterT, OutputT> pushBackRunner;
+
   private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
   private transient DoFnRunners.OutputManager outputManager;
+  private transient InMemorySideInputReader sideInputReader;
 
   // Variables for bundle.
   // We consider count and time millis for start/finish bundle.
@@ -74,6 +80,7 @@
   private long prevBundleStartTime;
   private long currBundleCount = 0;
   private boolean bundleFinished = true;
+  private final DisplayData displayData;
 
   /**
    * AbstractDoFnTransform constructor.
@@ -85,6 +92,7 @@
    * @param windowingStrategy windowing strategy
    * @param sideInputs side inputs
    * @param options pipeline options
+   * @param displayData display data.
    */
   public AbstractDoFnTransform(final DoFn<InterT, OutputT> doFn,
                                final Coder<InputT> inputCoder,
@@ -92,8 +100,9 @@ public AbstractDoFnTransform(final DoFn<InterT, OutputT> 
doFn,
                                final TupleTag<OutputT> mainOutputTag,
                                final List<TupleTag<?>> additionalOutputTags,
                                final WindowingStrategy<?, ?> windowingStrategy,
-                               final Collection<PCollectionView<?>> sideInputs,
-                               final PipelineOptions options) {
+                               final Map<Integer, PCollectionView<?>> 
sideInputs,
+                               final PipelineOptions options,
+                               final DisplayData displayData) {
     this.doFn = doFn;
     this.inputCoder = inputCoder;
     this.outputCoders = outputCoders;
@@ -102,28 +111,37 @@ public AbstractDoFnTransform(final DoFn<InterT, OutputT> 
doFn,
     this.sideInputs = sideInputs;
     this.serializedOptions = new SerializablePipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
+    this.displayData = displayData;
   }
 
-  protected final DoFnRunners.OutputManager getOutputManager() {
-    return outputManager;
+  final Map<Integer, PCollectionView<?>> getSideInputs() {
+    return sideInputs;
   }
 
-  protected final WindowingStrategy getWindowingStrategy() {
-    return windowingStrategy;
+  final DoFnRunners.OutputManager getOutputManager() {
+    return outputManager;
   }
 
-  protected final SideInputReader getSideInputReader() {
-    return sideInputReader;
+  final WindowingStrategy getWindowingStrategy() {
+    return windowingStrategy;
   }
 
-  protected final TupleTag<OutputT> getMainOutputTag() {
+  final TupleTag<OutputT> getMainOutputTag() {
     return mainOutputTag;
   }
 
-  protected final DoFnRunner<InterT, OutputT> getDoFnRunner() {
+  final DoFnRunner<InterT, OutputT> getDoFnRunner() {
     return doFnRunner;
   }
 
+  final PushbackSideInputDoFnRunner<InterT, OutputT> getPushBackRunner() {
+    return pushBackRunner;
+  }
+
+  final InMemorySideInputReader getSideInputReader() {
+    return sideInputReader;
+  }
+
   public final DoFn getDoFn() {
     return doFn;
   }
@@ -131,26 +149,51 @@ public final DoFn getDoFn() {
   /**
    * Checks whether the bundle is finished or not.
    * Starts the bundle if it is done.
+   *
+   * TODO #263: Partial Combining for Beam Streaming
+   * We may want to use separate methods for doFnRunner/pushBackRunner
+   * (same applies to the other bundle-related methods)
    */
-  protected final void checkAndInvokeBundle() {
+  final void checkAndInvokeBundle() {
     if (bundleFinished) {
       bundleFinished = false;
-      doFnRunner.startBundle();
+      if (pushBackRunner == null) {
+        doFnRunner.startBundle();
+      } else {
+        pushBackRunner.startBundle();
+      }
       prevBundleStartTime = System.currentTimeMillis();
       currBundleCount = 0;
     }
     currBundleCount += 1;
   }
 
-
   /**
    * Checks whether it is time to finish the bundle and finish it.
    */
-  protected final void checkAndFinishBundle() {
+  final void checkAndFinishBundle() {
     if (!bundleFinished) {
       if (currBundleCount >= bundleSize || System.currentTimeMillis() - 
prevBundleStartTime >= bundleMillis) {
         bundleFinished = true;
+        if (pushBackRunner == null) {
+          doFnRunner.finishBundle();
+        } else {
+          pushBackRunner.finishBundle();
+        }
+      }
+    }
+  }
+
+  /**
+   * Finish bundle without checking for conditions.
+   */
+  final void forceFinishBundle() {
+    if (!bundleFinished) {
+      bundleFinished = true;
+      if (pushBackRunner == null) {
         doFnRunner.finishBundle();
+      } else {
+        pushBackRunner.finishBundle();
       }
     }
   }
@@ -168,11 +211,7 @@ public final void prepare(final Context context, final 
OutputCollector<WindowedV
     outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);
 
     // create side input reader
-    if (!sideInputs.isEmpty()) {
-      sideInputReader = new BroadcastVariableSideInputReader(context, 
sideInputs);
-    } else {
-      sideInputReader = NullSideInputReader.of(sideInputs);
-    }
+    sideInputReader = new InMemorySideInputReader(new ArrayList<>(sideInputs.values()));
 
     // this transform does not support state and timer.
     final StepContext stepContext = new StepContext() {
@@ -205,6 +244,10 @@ public TimerInternals timerInternals() {
       inputCoder,
       outputCoders,
       windowingStrategy);
+
+    pushBackRunner = sideInputs.isEmpty()
+      ? null
+      : SimplePushbackSideInputDoFnRunner.<InterT, OutputT>create(doFnRunner, sideInputs.values(), sideInputReader);
   }
 
   public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
@@ -214,12 +257,15 @@ public TimerInternals timerInternals() {
   @Override
   public final void close() {
     beforeClose();
-    if (!bundleFinished) {
-      doFnRunner.finishBundle();
-    }
+    forceFinishBundle();
     doFnInvoker.invokeTeardown();
   }
 
+  @Override
+  public final String toString() {
+    return this.getClass().getSimpleName() + " / " + displayData.toString().replace(":", " / ");
+  }
+
   /**
    * An abstract function that wraps the original doFn.
    * @param originalDoFn the original doFn.
@@ -234,9 +280,6 @@ public final void close() {
    */
   abstract OutputCollector wrapOutputCollector(final OutputCollector oc);
 
-  @Override
-  public abstract void onData(final WindowedValue<InputT> data);
-
   /**
    * An abstract function that is called before close.
    */
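
The pushBackRunner created in prepare() above is Beam's SimplePushbackSideInputDoFnRunner. A rough sketch of that runner's contract (not the PR's exact code): elements whose side inputs are not yet ready are returned by processElementInReadyWindows() and must be buffered and retried later.

  // Sketch only; 'element' is an incoming main-input value and 'pushedBackBuffer' a hypothetical buffer.
  final List<WindowedValue<InputT>> pushedBackBuffer = new ArrayList<>();

  pushBackRunner.startBundle();
  for (final WindowedValue<InputT> notReady
      : pushBackRunner.processElementInReadyWindows(element)) {
    pushedBackBuffer.add(notReady); // retry once the side input arrives or the watermark advances
  }
  pushBackRunner.finishBundle();
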
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
deleted file mode 100644
index 64460f977..000000000
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-
-import javax.annotation.Nullable;
-import java.util.Collection;
-
-/**
- * A sideinput reader that reads/writes side input values to context.
- */
-public final class BroadcastVariableSideInputReader implements SideInputReader 
{
-
-  // Nemo context for storing/getting side inputs
-  private final Transform.Context context;
-
-  // The list of side inputs that we're handling
-  private final Collection<PCollectionView<?>> sideInputs;
-
-  BroadcastVariableSideInputReader(final Transform.Context context,
-                                   final Collection<PCollectionView<?>> 
sideInputs) {
-    this.context = context;
-    this.sideInputs = sideInputs;
-  }
-
-  @Nullable
-  @Override
-  public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
-    // TODO #216: implement side input and windowing
-    return ((WindowedValue<T>) context.getBroadcastVariable(view)).getValue();
-  }
-
-  @Override
-  public <T> boolean contains(final PCollectionView<T> view) {
-    return sideInputs.contains(view);
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return sideInputs.isEmpty();
-  }
-}
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index 05e5af610..03313c44e 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -37,14 +37,12 @@
  * @param <I> input type
  * @param <O> materialized output type
  */
-public final class CreateViewTransform<I, O> implements
-  Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
-  private OutputCollector<WindowedValue<O>> outputCollector;
+public final class CreateViewTransform<I, O> implements 
Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
   private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
   private final Map<BoundedWindow, List<I>> windowListMap;
 
-  // TODO #259: we can remove this variable by implementing 
ReadyCheckingSideInputReader
-  private boolean isEmitted = false;
+  private OutputCollector<WindowedValue<O>> outputCollector;
+
   private long currentOutputWatermark;
 
   /**
@@ -75,7 +73,6 @@ public void onData(final WindowedValue<KV<?, I>> element) {
 
   @Override
   public void onWatermark(final Watermark inputWatermark) {
-
     // If no data, just forwards the watermark
     if (windowListMap.size() == 0 && currentOutputWatermark < 
inputWatermark.getTimestamp()) {
       currentOutputWatermark = inputWatermark.getTimestamp();
@@ -90,11 +87,10 @@ public void onWatermark(final Watermark inputWatermark) {
       final Map.Entry<BoundedWindow, List<I>> entry = iterator.next();
       if (entry.getKey().maxTimestamp().getMillis() <= 
inputWatermark.getTimestamp()) {
         // emit the windowed data if the watermark timestamp > the window max 
boundary
-        final O view = viewFn.apply(new MultiView<>(entry.getValue()));
+        final O output = viewFn.apply(new MultiView<>(entry.getValue()));
         outputCollector.emit(WindowedValue.of(
-          view, entry.getKey().maxTimestamp(), entry.getKey(), 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
+          output, entry.getKey().maxTimestamp(), entry.getKey(), 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
         iterator.remove();
-        isEmitted = true;
 
         minOutputTimestampOfEmittedWindows =
           Math.min(minOutputTimestampOfEmittedWindows, 
entry.getKey().maxTimestamp().getMillis());
@@ -112,20 +108,12 @@ public void onWatermark(final Watermark inputWatermark) {
   @Override
   public void close() {
     onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
-
-    if (!isEmitted) {
-      // TODO #259: This is an ad-hoc code to resolve the view that has no data
-      // Currently, broadCastWorker reads the view data, but it throws 
exception if no data is available for a view.
-      // We should use watermark value to track whether the materialized data 
in a view is available or not.
-      final O view = viewFn.apply(new MultiView<>(Collections.emptyList()));
-      outputCollector.emit(WindowedValue.valueInGlobalWindow(view));
-    }
   }
 
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("CreateViewTransform:" + viewFn);
+    sb.append("CreateViewTransform  " + viewFn.getClass().getName());
     return sb.toString();
   }
 
@@ -133,13 +121,13 @@ public String toString() {
    * Represents {@code PrimitiveViewT} supplied to the {@link ViewFn}.
    * @param <T> primitive view type
    */
-  public final class MultiView<T> implements 
Materializations.MultimapView<Void, T>, Serializable {
+  public static final class MultiView<T> implements 
Materializations.MultimapView<Void, T>, Serializable {
     private final Iterable<T> iterable;
 
     /**
      * Constructor.
      */
-    MultiView(final Iterable<T> iterable) {
+    public MultiView(final Iterable<T> iterable) {
       // Create a placeholder for side input data. CreateViewTransform#onData 
stores data to this list.
       this.iterable = iterable;
     }
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 9f7a4e0f7..699a0dd7f 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -21,8 +21,8 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
@@ -30,12 +30,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 /**
- * DoFn transform implementation.
+ * DoFn transform implementation when there is no side input.
  *
  * @param <InputT> input type.
  * @param <OutputT> output type.
@@ -45,9 +45,6 @@
 
   /**
    * DoFnTransform Constructor.
-   *
-   * @param doFn    doFn.
-   * @param options Pipeline options.
    */
   public DoFnTransform(final DoFn<InputT, OutputT> doFn,
                        final Coder<InputT> inputCoder,
@@ -55,10 +52,10 @@ public DoFnTransform(final DoFn<InputT, OutputT> doFn,
                        final TupleTag<OutputT> mainOutputTag,
                        final List<TupleTag<?>> additionalOutputTags,
                        final WindowingStrategy<?, ?> windowingStrategy,
-                       final Collection<PCollectionView<?>> sideInputs,
-                       final PipelineOptions options) {
+                       final PipelineOptions options,
+                       final DisplayData displayData) {
     super(doFn, inputCoder, outputCoders, mainOutputTag,
-      additionalOutputTags, windowingStrategy, sideInputs, options);
+      additionalOutputTags, windowingStrategy, Collections.emptyMap(), 
options, displayData);
   }
 
   @Override
@@ -68,6 +65,7 @@ protected DoFn wrapDoFn(final DoFn initDoFn) {
 
   @Override
   public void onData(final WindowedValue<InputT> data) {
+    // Do not need any push-back logic.
     checkAndInvokeBundle();
     getDoFnRunner().processElement(data);
     checkAndFinishBundle();
@@ -76,26 +74,16 @@ public void onData(final WindowedValue<InputT> data) {
   @Override
   public void onWatermark(final Watermark watermark) {
     checkAndInvokeBundle();
-    // TODO #216: We should consider push-back data that waits for side input
-    // TODO #216: If there are push-back data, input watermark >= output 
watermark
     getOutputCollector().emitWatermark(watermark);
     checkAndFinishBundle();
   }
 
   @Override
   protected void beforeClose() {
-    // nothing
   }
 
   @Override
   OutputCollector wrapOutputCollector(final OutputCollector oc) {
     return oc;
   }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("DoTransform:" + getDoFn());
-    return sb.toString();
-  }
 }
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 84b6835b9..06dde5861 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -22,9 +22,9 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -57,19 +57,19 @@
    */
   public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> 
outputCoders,
                                           final TupleTag<KV<K, 
Iterable<InputT>>> mainOutputTag,
-                                          final List<TupleTag<?>> 
additionalOutputTags,
                                           final WindowingStrategy<?, ?> 
windowingStrategy,
-                                          final Collection<PCollectionView<?>> 
sideInputs,
                                           final PipelineOptions options,
-                                          final SystemReduceFn reduceFn) {
+                                          final SystemReduceFn reduceFn,
+                                          final DisplayData displayData) {
     super(null, /* doFn */
       null, /* inputCoder */
       outputCoders,
       mainOutputTag,
-      additionalOutputTags,
+      Collections.emptyList(),  /*  GBK does not have additional outputs */
       windowingStrategy,
-      sideInputs,
-      options);
+      Collections.emptyMap(), /*  GBK does not have additional side inputs */
+      options,
+      displayData);
     this.keyToValues = new HashMap<>();
     this.reduceFn = reduceFn;
     this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
@@ -93,7 +93,7 @@ protected DoFn wrapDoFn(final DoFn doFn) {
         getWindowingStrategy(),
         inMemoryStateInternalsFactory,
         inMemoryTimerInternalsFactory,
-        getSideInputReader(),
+        null, // GBK has no sideinput.
         reduceFn,
         getOutputManager(),
         getMainOutputTag());
@@ -163,23 +163,19 @@ private void processElementsAndTriggerTimers(final 
Watermark inputWatermark,
    * @param inputWatermark input watermark
    */
   private void emitOutputWatermark(final Watermark inputWatermark) {
-
-    if (keyAndWatermarkHoldMap.isEmpty()) {
-      return;
-    }
-
     // Find min watermark hold
-    final Watermark minWatermarkHold = 
Collections.min(keyAndWatermarkHoldMap.values());
+    final Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+      ? new Watermark(Long.MAX_VALUE) // set this to MAX, in order to just use 
the input watermark.
+      : Collections.min(keyAndWatermarkHoldMap.values());
+    final Watermark outputWatermarkCandidate = new Watermark(
+      Math.max(prevOutputWatermark.getTimestamp(),
+        Math.min(minWatermarkHold.getTimestamp(), 
inputWatermark.getTimestamp())));
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Watermark hold: {}, "
         + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, 
inputWatermark, prevOutputWatermark);
     }
 
-    final Watermark outputWatermarkCandidate = new Watermark(
-      Math.max(prevOutputWatermark.getTimestamp(),
-        Math.min(minWatermarkHold.getTimestamp(), 
inputWatermark.getTimestamp())));
-
     if (outputWatermarkCandidate.getTimestamp() > 
prevOutputWatermark.getTimestamp()) {
       // progress!
       prevOutputWatermark = outputWatermarkCandidate;
@@ -211,8 +207,6 @@ protected void beforeClose() {
     // Finish any pending windows by advancing the input watermark to infinity.
     processElementsAndTriggerTimers(new 
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
       BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
-    // Emit watermark to downstream operators
-    emitOutputWatermark(new 
Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
   }
 
   /**
@@ -258,13 +252,6 @@ private void triggerTimers(final K key,
     }
   }
 
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("GroupByKeyAndWindowDoFnTransform:");
-    return sb.toString();
-  }
-
   /**
    * Get timer data.
    */
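
For readers tracing the watermark change above: emitOutputWatermark now clamps the 
candidate between the previous output watermark and the smaller of the minimum 
watermark hold and the input watermark, treating an empty hold map as a hold at 
Long.MAX_VALUE. Below is a minimal, self-contained sketch of that rule; the class 
and variable names are illustrative and not part of the Nemo codebase.

// Illustrative sketch of the output-watermark rule above; not Nemo code.
public final class OutputWatermarkRuleSketch {
  /**
   * The candidate never regresses below the previous output watermark and never
   * advances past the earliest watermark hold or the input watermark.
   */
  static long candidate(final long prevOutput, final long minHold, final long input) {
    return Math.max(prevOutput, Math.min(minHold, input));
  }

  public static void main(final String[] args) {
    // No holds: minHold is treated as Long.MAX_VALUE, so the input watermark wins.
    System.out.println(candidate(5L, Long.MAX_VALUE, 10L)); // 10
    // A hold at 7 caps the candidate below the input watermark of 10.
    System.out.println(candidate(5L, 7L, 10L));             // 7
    // The output watermark never goes backwards, even with an older hold.
    System.out.println(candidate(5L, 3L, 10L));             // 5
  }
}
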
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 0f4cf5b64..71c68eab3 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -69,12 +69,4 @@ public void close() {
       }
     }
   }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("GroupByKeyTransform:");
-    sb.append(super.toString());
-    return sb.toString();
-  }
 }
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
new file mode 100644
index 000000000..d8f0d8ff4
--- /dev/null
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DoFn transform implementation with push backs for side inputs.
+ *
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class PushBackDoFnTransform<InputT, OutputT> extends 
AbstractDoFnTransform<InputT, InputT, OutputT> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PushBackDoFnTransform.class.getName());
+
+  private List<WindowedValue<InputT>> curPushedBacks;
+  private long curPushedBackWatermark; // Long.MAX_VALUE when no pushed-back 
element exists.
+  private long curInputWatermark;
+  private long curOutputWatermark;
+
+  /**
+   * PushBackDoFnTransform Constructor.
+   */
+  public PushBackDoFnTransform(final DoFn<InputT, OutputT> doFn,
+                               final Coder<InputT> inputCoder,
+                               final Map<TupleTag<?>, Coder<?>> outputCoders,
+                               final TupleTag<OutputT> mainOutputTag,
+                               final List<TupleTag<?>> additionalOutputTags,
+                               final WindowingStrategy<?, ?> windowingStrategy,
+                               final Map<Integer, PCollectionView<?>> 
sideInputs,
+                               final PipelineOptions options,
+                               final DisplayData displayData) {
+    super(doFn, inputCoder, outputCoders, mainOutputTag,
+      additionalOutputTags, windowingStrategy, sideInputs, options, 
displayData);
+    this.curPushedBacks = new ArrayList<>();
+    this.curPushedBackWatermark = Long.MAX_VALUE;
+    this.curInputWatermark = Long.MIN_VALUE;
+    this.curOutputWatermark = Long.MIN_VALUE;
+  }
+
+  @Override
+  protected DoFn wrapDoFn(final DoFn initDoFn) {
+    return initDoFn;
+  }
+
+  @Override
+  public void onData(final WindowedValue data) {
+    // Need to distinguish side/main inputs and push-back main inputs.
+    if (data.getValue() instanceof SideInputElement) {
+      // This element is a Side Input
+      // TODO #287: Consider Explicit Multi-Input IR Transform
+      final WindowedValue<SideInputElement> sideInputElement = 
(WindowedValue<SideInputElement>) data;
+      final PCollectionView view = 
getSideInputs().get(sideInputElement.getValue().getSideInputIndex());
+      getSideInputReader().addSideInputElement(view, data);
+
+      handlePushBacks();
+
+      // See if we can emit a new watermark, as we may have processed some 
pushed-back elements
+      onWatermark(new Watermark(curInputWatermark));
+    } else {
+      // This element is the Main Input
+      checkAndInvokeBundle();
+      final Iterable<WindowedValue<InputT>> pushedBack =
+        getPushBackRunner().processElementInReadyWindows(data);
+      for (final WindowedValue wv : pushedBack) {
+        curPushedBackWatermark = Math.min(curPushedBackWatermark, 
wv.getTimestamp().getMillis());
+        curPushedBacks.add(wv);
+      }
+      checkAndFinishBundle();
+    }
+  }
+
+  private void handlePushBacks() {
+    // Force-finish, before (possibly) processing pushed-back data.
+    //
+    // Main reason:
+    // {@link org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner}
+    // caches for each bundle the side inputs that are not ready.
+    // We need to re-start the bundle to advertise the (possibly) newly 
available side input.
+    forceFinishBundle(); // forced
+
+    // With the new side input added, we may be able to process some 
pushed-back elements.
+    final List<WindowedValue<InputT>> pushedBackAgain = new ArrayList<>();
+    long pushedBackAgainWatermark = Long.MAX_VALUE;
+    for (final WindowedValue<InputT> curPushedBack : curPushedBacks) {
+      checkAndInvokeBundle();
+      final Iterable<WindowedValue<InputT>> pushedBack =
+        getPushBackRunner().processElementInReadyWindows(curPushedBack);
+      checkAndFinishBundle();
+      for (final WindowedValue<InputT> wv : pushedBack) {
+        pushedBackAgainWatermark = Math.min(pushedBackAgainWatermark, 
wv.getTimestamp().getMillis());
+        pushedBackAgain.add(wv);
+      }
+    }
+    curPushedBacks = pushedBackAgain;
+    curPushedBackWatermark = pushedBackAgainWatermark;
+  }
+
+  @Override
+  public void onWatermark(final Watermark watermark) {
+    // TODO #298: Consider Processing DoFn PushBacks on Watermark
+    checkAndInvokeBundle();
+    curInputWatermark = watermark.getTimestamp();
+    
getSideInputReader().setCurrentWatermarkOfAllMainAndSideInputs(curInputWatermark);
+
+    final long outputWatermarkCandidate = Math.min(curInputWatermark, 
curPushedBackWatermark);
+    if (outputWatermarkCandidate > curOutputWatermark) {
+      // Watermark advances!
+      getOutputCollector().emitWatermark(new 
Watermark(outputWatermarkCandidate));
+      curOutputWatermark = outputWatermarkCandidate;
+    }
+    checkAndFinishBundle();
+  }
+
+  @Override
+  protected void beforeClose() {
+    // This makes all still-unavailable side inputs available as empty side inputs.
+    onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+    // All push-backs should be processed here.
+    handlePushBacks();
+  }
+
+  @Override
+  OutputCollector wrapOutputCollector(final OutputCollector oc) {
+    return oc;
+  }
+}
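
A minimal sketch of the pushed-back-watermark bookkeeping performed by onData and 
onWatermark above, using plain longs and a list in place of Nemo's Watermark and 
WindowedValue; all names below are illustrative and not part of the codebase.

import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Illustrative sketch, not Nemo code: shows how pushed-back elements hold back
// the output watermark until their side inputs become available.
public final class PushBackWatermarkSketch {
  private final List<Long> pushedBackTimestamps = new ArrayList<>();
  private long inputWatermark = Long.MIN_VALUE;
  private long outputWatermark = Long.MIN_VALUE;

  /** Records a main-input element whose side input is not ready yet. */
  void pushBack(final long elementTimestamp) {
    pushedBackTimestamps.add(elementTimestamp);
  }

  /** Call once previously pushed-back elements have been processed and emitted. */
  void clearPushedBacks() {
    pushedBackTimestamps.clear();
  }

  /** Returns the watermark to emit downstream, or empty if it does not advance. */
  OptionalLong onWatermark(final long newInputWatermark) {
    inputWatermark = newInputWatermark;
    // Pushed-back elements hold the output watermark at their earliest timestamp.
    final long pushedBackWatermark = pushedBackTimestamps.stream()
      .mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
    final long candidate = Math.min(inputWatermark, pushedBackWatermark);
    if (candidate > outputWatermark) {
      outputWatermark = candidate; // the output watermark only moves forward
      return OptionalLong.of(outputWatermark);
    }
    return OptionalLong.empty();
  }
}
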
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java
new file mode 100644
index 000000000..b1536e677
--- /dev/null
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+
+/**
+ * Side input transform implementation.
+ * TODO #297: Consider Removing SideInputTransform
+ * @param <T> input/output type.
+ */
+public final class SideInputTransform<T> implements 
Transform<WindowedValue<T>, WindowedValue<SideInputElement<T>>> {
+  private OutputCollector<WindowedValue<SideInputElement<T>>> outputCollector;
+  private final int index;
+
+  /**
+   * Constructor.
+   */
+  public SideInputTransform(final int index) {
+    this.index = index;
+  }
+
+  @Override
+  public void prepare(final Context context, final 
OutputCollector<WindowedValue<SideInputElement<T>>> oc) {
+    this.outputCollector = oc;
+  }
+
+  @Override
+  public void onData(final WindowedValue<T> element) {
+    outputCollector.emit(element.withValue(new SideInputElement<>(index, 
element.getValue())));
+  }
+
+  @Override
+  public void onWatermark(final Watermark watermark) {
+    outputCollector.emitWatermark(watermark);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("SideInputTransform:");
+    sb.append("(index-");
+    sb.append(String.valueOf(index));
+    sb.append(")");
+    sb.append(super.toString());
+    return sb.toString();
+  }
+}
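
A minimal sketch of how the index attached by SideInputTransform is resolved back to 
a view on the consumer side, mirroring the Map<Integer, PCollectionView<?>> passed to 
PushBackDoFnTransform; the stub classes below are stand-ins for the real Beam/Nemo 
types and exist only for illustration.

import java.util.HashMap;
import java.util.Map;

// Illustrative stand-ins; the real types are Beam's PCollectionView and Nemo's SideInputElement.
final class ViewStub {
  final String name;
  ViewStub(final String name) {
    this.name = name;
  }
}

final class TaggedElement<T> {
  final int index;
  final T value;
  TaggedElement(final int index, final T value) {
    this.index = index;
    this.value = value;
  }
}

public final class SideInputIndexSketch {
  public static void main(final String[] args) {
    // The compiler assigns each side-input edge an index (0, 1, ...) and the
    // consumer keeps the same index -> view mapping.
    final Map<Integer, ViewStub> sideInputs = new HashMap<>();
    sideInputs.put(0, new ViewStub("view1"));
    sideInputs.put(1, new ViewStub("view2"));

    // A SideInputTransform constructed with index 1 tags its elements like this...
    final TaggedElement<String> tagged = new TaggedElement<>(1, "broadcast-data");

    // ...and the downstream transform resolves the view by that index.
    final ViewStub view = sideInputs.get(tagged.index);
    System.out.println(view.name + " <- " + tagged.value); // view2 <- broadcast-data
  }
}
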
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
index a4346182e..f8f5c9fb6 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.compiler.frontend.beam.transform;
 
 import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -40,14 +41,16 @@
 public final class WindowFnTransform<T, W extends BoundedWindow>
   implements Transform<WindowedValue<T>, WindowedValue<T>> {
   private final WindowFn windowFn;
+  private final DisplayData displayData;
   private OutputCollector<WindowedValue<T>> outputCollector;
 
   /**
    * Default Constructor.
    * @param windowFn windowFn for the Transform.
    */
-  public WindowFnTransform(final WindowFn windowFn) {
+  public WindowFnTransform(final WindowFn windowFn, final DisplayData 
displayData) {
     this.windowFn = windowFn;
+    this.displayData = displayData;
   }
 
   @Override
@@ -101,7 +104,7 @@ public void close() {
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("WindowFnTransform:" + windowFn);
+    sb.append("WindowFnTransform / " + displayData.toString().replaceAll(":", 
" / "));
     return sb.toString();
   }
 }
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
index be3f0083c..278c83c8c 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
@@ -41,7 +41,7 @@ public void testALSDAG() throws Exception {
     final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileALSDAG();
 
     assertEquals(producedDAG.getTopologicalSort(), 
producedDAG.getTopologicalSort());
-    assertEquals(38, producedDAG.getVertices().size());
+    assertEquals(44, producedDAG.getVertices().size());
 
 //    producedDAG.getTopologicalSort().forEach(v -> 
System.out.println(v.getId()));
     final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
index 632c30aff..c6f100d4e 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
@@ -41,7 +41,7 @@ public void testMLRDAG() throws Exception {
     final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileMLRDAG();
 
     assertEquals(producedDAG.getTopologicalSort(), 
producedDAG.getTopologicalSort());
-    assertEquals(36, producedDAG.getVertices().size());
+    assertEquals(39, producedDAG.getVertices().size());
 
     final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX).size());
@@ -51,6 +51,6 @@ public void testMLRDAG() throws Exception {
     final IRVertex vertexY = producedDAG.getTopologicalSort().get(13);
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY).size());
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY.getId()).size());
-    assertEquals(2, producedDAG.getOutgoingEdgesOf(vertexY).size());
+    assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexY).size());
   }
 }
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
index 762e327fd..702ca9d6e 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
@@ -21,6 +21,7 @@
 import org.apache.beam.sdk.transforms.Materialization;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -54,6 +55,7 @@
   public void test() {
 
     final FixedWindows fixedwindows = 
FixedWindows.of(Duration.standardSeconds(1));
+
     final CreateViewTransform<String, Integer> viewTransform =
       new CreateViewTransform(new SumViewFn());
 
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index bad758485..fa1169c3f 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -20,14 +20,14 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -36,17 +36,17 @@
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
 import org.apache.reef.io.Tuple;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.*;
 
-import static java.util.Collections.emptyList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public final class DoFnTransformTest {
 
@@ -78,8 +78,8 @@ public void testSingleOutput() {
         outputTag,
         Collections.emptyList(),
         WindowingStrategy.globalDefault(),
-        emptyList(), /* side inputs */
-        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        DisplayData.none());
 
     final Transform.Context context = mock(Transform.Context.class);
     final OutputCollector<WindowedValue<String>> oc = new 
TestOutputCollector<>();
@@ -112,8 +112,8 @@ public void testCountBundle() {
         outputTag,
         Collections.emptyList(),
         WindowingStrategy.globalDefault(),
-        emptyList(), /* side inputs */
-        pipelineOptions);
+        pipelineOptions,
+        DisplayData.none());
 
     final Transform.Context context = mock(Transform.Context.class);
     final OutputCollector<WindowedValue<String>> oc = new 
TestOutputCollector<>();
@@ -156,8 +156,8 @@ public void testTimeBundle() {
         outputTag,
         Collections.emptyList(),
         WindowingStrategy.globalDefault(),
-        emptyList(), /* side inputs */
-        pipelineOptions);
+        pipelineOptions,
+        DisplayData.none());
 
     final Transform.Context context = mock(Transform.Context.class);
     final OutputCollector<WindowedValue<String>> oc = new 
TestOutputCollector<>();
@@ -208,8 +208,8 @@ public void testMultiOutputOutput() {
         mainOutput,
         tags,
         WindowingStrategy.globalDefault(),
-        emptyList(), /* side inputs */
-        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        DisplayData.none());
 
     // mock context
     final Transform.Context context = mock(Transform.Context.class);
@@ -244,48 +244,70 @@ public void testMultiOutputOutput() {
     doFnTransform.close();
   }
 
-  // TODO #216: implement side input and windowing
   @Test
   public void testSideInputs() {
     // mock context
     final Transform.Context context = mock(Transform.Context.class);
-    when(context.getBroadcastVariable(view1)).thenReturn(
-      WindowedValue.valueInGlobalWindow(ImmutableList.of("1")));
-    when(context.getBroadcastVariable(view2)).thenReturn(
-      WindowedValue.valueInGlobalWindow(ImmutableList.of("2")));
-
     TupleTag<Tuple<String, Iterable<String>>> outputTag = new 
TupleTag<>("main-output");
 
-    WindowedValue<String> first = WindowedValue.valueInGlobalWindow("first");
-    WindowedValue<String> second = WindowedValue.valueInGlobalWindow("second");
+    WindowedValue<String> firstElement = 
WindowedValue.valueInGlobalWindow("first");
+    WindowedValue<String> secondElement = 
WindowedValue.valueInGlobalWindow("second");
 
-    final Map<String, PCollectionView<Iterable<String>>> eventAndViewMap =
-      ImmutableMap.of(first.getValue(), view1, second.getValue(), view2);
+    SideInputElement firstSideinput = new SideInputElement<>(0, 
ImmutableList.of("1"));
+    SideInputElement secondSideinput = new SideInputElement<>(1, 
ImmutableList.of("2"));
 
-    final DoFnTransform<String, Tuple<String, Iterable<String>>> doFnTransform 
=
-      new DoFnTransform<>(
-        new SimpleSideInputDoFn<>(eventAndViewMap),
+    final Map<Integer, PCollectionView<?>> sideInputMap = new HashMap<>();
+    sideInputMap.put(firstSideinput.getSideInputIndex(), view1);
+    sideInputMap.put(secondSideinput.getSideInputIndex(), view2);
+    final PushBackDoFnTransform<String, String> doFnTransform =
+      new PushBackDoFnTransform(
+        new SimpleSideInputDoFn<String>(view1, view2),
         NULL_INPUT_CODER,
         NULL_OUTPUT_CODERS,
         outputTag,
         Collections.emptyList(),
         WindowingStrategy.globalDefault(),
-        ImmutableList.of(view1, view2), /* side inputs */
-        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+        sideInputMap, /* side inputs */
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        DisplayData.none());
 
-    final OutputCollector<WindowedValue<Tuple<String, Iterable<String>>>> oc = 
new TestOutputCollector<>();
+    final TestOutputCollector<String> oc = new TestOutputCollector<>();
     doFnTransform.prepare(context, oc);
 
-    doFnTransform.onData(first);
-    doFnTransform.onData(second);
-
-    assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("first", 
ImmutableList.of("1"))),
-      ((TestOutputCollector<Tuple<String,Iterable<String>>>) 
oc).getOutput().get(0));
-
-    assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("second", 
ImmutableList.of("2"))),
-      ((TestOutputCollector<Tuple<String,Iterable<String>>>) 
oc).getOutput().get(1));
-
+    // Main input first, Side inputs later
+    doFnTransform.onData(firstElement);
+
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow(firstSideinput));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow(secondSideinput));
+    assertEquals(
+      WindowedValue.valueInGlobalWindow(
+        concat(firstElement.getValue(), firstSideinput.getSideInputValue(), 
secondSideinput.getSideInputValue())),
+      oc.getOutput().get(0));
+
+    // Side inputs first, Main input later
+    doFnTransform.onData(secondElement);
+    assertEquals(
+      WindowedValue.valueInGlobalWindow(
+        concat(secondElement.getValue(), firstSideinput.getSideInputValue(), 
secondSideinput.getSideInputValue())),
+      oc.getOutput().get(1));
+
+    // There should be only 2 final outputs
+    assertEquals(2, oc.getOutput().size());
+
+    // The side inputs should be "READY"
+    assertTrue(doFnTransform.getSideInputReader().isReady(view1, 
GlobalWindow.INSTANCE));
+    assertTrue(doFnTransform.getSideInputReader().isReady(view2, 
GlobalWindow.INSTANCE));
+
+    // This watermark should remove the side inputs. (Now should be "NOT 
READY")
+    doFnTransform.onWatermark(new 
Watermark(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+    Iterable materializedSideInput1 = 
doFnTransform.getSideInputReader().get(view1, GlobalWindow.INSTANCE);
+    Iterable materializedSideInput2 = 
doFnTransform.getSideInputReader().get(view2, GlobalWindow.INSTANCE);
+    assertFalse(materializedSideInput1.iterator().hasNext());
+    assertFalse(materializedSideInput2.iterator().hasNext());
+
+    // Closing the transform should not emit any additional outputs
     doFnTransform.close();
+    assertEquals(2, oc.getOutput().size());
   }
 
 
@@ -334,21 +356,30 @@ public void processElement(final ProcessContext c) throws 
Exception {
    * Side input do fn.
    * @param <T> type
    */
-  private static class SimpleSideInputDoFn<T, V> extends DoFn<T, Tuple<T, V>> {
-    private final Map<T, PCollectionView<V>> idAndViewMap;
-
-    public SimpleSideInputDoFn(final Map<T, PCollectionView<V>> idAndViewMap) {
-      this.idAndViewMap = idAndViewMap;
+  private static class SimpleSideInputDoFn<T> extends DoFn<T, String> {
+    private final PCollectionView<?> view1;
+    private final PCollectionView<?> view2;
+
+    public SimpleSideInputDoFn(final PCollectionView<?> view1,
+                               final PCollectionView<?> view2) {
+      this.view1 = view1;
+      this.view2 = view2;
     }
 
     @ProcessElement
     public void processElement(final ProcessContext c) throws Exception {
-      final PCollectionView<V> view = idAndViewMap.get(c.element());
-      final V sideInput = c.sideInput(view);
-      c.output(new Tuple<>(c.element(), sideInput));
+      final T element = c.element();
+      final Object view1Value = c.sideInput(view1);
+      final Object view2Value = c.sideInput(view2);
+
+      c.output(concat(element, view1Value, view2Value));
     }
   }
 
+  private static String concat(final Object obj1, final Object obj2, final 
Object obj3) {
+    return obj1.toString() + " / " + obj2 + " / " + obj3;
+  }
+
 
   /**
    * Multi output do fn.
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index f9a44ec96..474c79c0d 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -21,6 +21,7 @@
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.*;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -89,11 +90,10 @@ public void test() {
       new GroupByKeyAndWindowDoFnTransform(
         NULL_OUTPUT_CODERS,
         outputTag,
-        Collections.emptyList(), /* additional outputs */
         WindowingStrategy.of(slidingWindows),
-        emptyList(), /* side inputs */
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
-        SystemReduceFn.buffering(NULL_INPUT_CODER));
+        SystemReduceFn.buffering(NULL_INPUT_CODER),
+        DisplayData.none());
 
     final Instant ts1 = new Instant(1);
     final Instant ts2 = new Instant(100);
@@ -167,10 +167,17 @@ public void test() {
     doFnTransform.onData(WindowedValue.of(
       KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), 
PaneInfo.NO_FIRING));
 
-    // do not emit anything
+
     doFnTransform.onWatermark(watermark2);
-    assertEquals(0, oc.outputs.size());
-    assertEquals(0, oc.watermarks.size());
+
+    assertEquals(0, oc.outputs.size()); // do not emit anything
+    assertEquals(1, oc.watermarks.size());
+
+    // check output watermark
+    assertEquals(1400,
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.watermarks.clear();
 
     doFnTransform.onData(WindowedValue.of(
       KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), 
PaneInfo.NO_FIRING));
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index bdc2a859d..e2f2c0ab2 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -63,7 +63,7 @@ public void setUp() throws Exception {
     final LoopVertex alsLoop = alsLoopOpt.get();
 
     final IRVertex vertex7 = groupedDAG.getTopologicalSort().get(3);
-    final IRVertex vertex15 = alsLoop.getDAG().getTopologicalSort().get(4);
+    final IRVertex vertex15 = alsLoop.getDAG().getTopologicalSort().get(5);
 
     final Set<IREdge> oldDAGIncomingEdges = 
alsLoop.getDagIncomingEdges().get(vertex15);
     final List<IREdge> newDAGIncomingEdge = 
groupedDAG.getIncomingEdgesOf(vertex7);
diff --git 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index 84da0ddde..76bdc03ae 100644
--- 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -21,8 +21,6 @@
 import com.github.fommil.netlib.BLAS;
 import com.github.fommil.netlib.LAPACK;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
-import org.apache.nemo.compiler.frontend.beam.coder.FloatArrayCoder;
-import org.apache.nemo.compiler.frontend.beam.coder.IntArrayCoder;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderProviders;
diff --git 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
index e43101973..ab1760fb5 100644
--- 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
+++ 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
@@ -18,8 +18,6 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.nemo.compiler.frontend.beam.coder.FloatArrayCoder;
-import org.apache.nemo.compiler.frontend.beam.coder.IntArrayCoder;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java
similarity index 97%
rename from 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
rename to 
examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java
index dff48eed8..104e99449 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
+++ 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.beam.coder;
+package org.apache.nemo.examples.beam;
 
 import org.apache.beam.sdk.coders.AtomicCoder;
 
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
 b/examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java
similarity index 97%
rename from 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
rename to 
examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java
index ac9205b4c..f7201378c 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
+++ 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.beam.coder;
+package org.apache.nemo.examples.beam;
 
 import org.apache.beam.sdk.coders.AtomicCoder;
 
diff --git 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
new file mode 100644
index 000000000..30ee405cd
--- /dev/null
+++ 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.List;
+
+/**
+ * A Windowed Broadcast application.
+ */
+public final class WindowedBroadcast {
+  /**
+   * Private Constructor.
+   */
+  private WindowedBroadcast() {
+  }
+
+  private static PCollection<Long> getSource(final Pipeline p) {
+    return p.apply(GenerateSequence
+      .from(1)
+      .withRate(2, Duration.standardSeconds(1))
+      .withTimestampFn(num -> new Instant(num * 500))); // 0.5 second between 
subsequent elements
+  }
+  /**
+   * Main function for the windowed broadcast Beam program.
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String outputFilePath = args[0];
+
+    final Window<Long> windowFn = Window
+      .<Long>into(SlidingWindows.of(Duration.standardSeconds(2))
+      .every(Duration.standardSeconds(1)));
+
+    final PipelineOptions options = 
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("WindowedBroadcast");
+
+    final Pipeline p = Pipeline.create(options);
+
+    final PCollection<Long> windowedElements = getSource(p).apply(windowFn);
+    final PCollectionView<List<Long>> windowedView = 
windowedElements.apply(View.asList());
+
+    windowedElements.apply(ParDo.of(new DoFn<Long, String>() {
+        @ProcessElement
+        public void processElement(final ProcessContext c) {
+          final Long anElementInTheWindow = c.element();
+          final List<Long> allElementsInTheWindow = c.sideInput(windowedView);
+          System.out.println(anElementInTheWindow + " / " + 
allElementsInTheWindow);
+          if (!allElementsInTheWindow.contains(anElementInTheWindow)) {
+            throw new RuntimeException(anElementInTheWindow + " not in " + 
allElementsInTheWindow.toString());
+          } else {
+            c.output(anElementInTheWindow + " is in " + 
allElementsInTheWindow);
+          }
+        }
+      }).withSideInputs(windowedView)
+    ).apply(new WriteOneFilePerWindow(outputFilePath, 1));
+
+    p.run();
+  }
+}
diff --git 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index d7f8c85c8..0f13dc465 100644
--- 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ 
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -78,7 +78,7 @@ public void processElement(@Element final String elem,
       return p.apply(GenerateSequence
         .from(1)
         .withRate(2, Duration.standardSeconds(1))
-        .withTimestampFn(num -> new Instant(num * 500)))
+        .withTimestampFn(num -> new Instant(num * 500))) // 0.5 second between 
subsequent elements
         .apply(MapElements.via(new SimpleFunction<Long, KV<String, Long>>() {
           @Override
           public KV<String, Long> apply(final Long val) {
diff --git 
a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java
 
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java
new file mode 100644
index 000000000..5e2fba38f
--- /dev/null
+++ 
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.examples.beam.policy.StreamingPolicyParallelismFive;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test Windowed broadcast program with JobLauncher.
+ * TODO #291: ITCase for Empty PCollectionViews
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class WindowedBroadcastITCase {
+
+  private static final int TIMEOUT = 120000;
+  private static ArgBuilder builder;
+  private static final String fileBasePath = System.getProperty("user.dir") + 
"/../resources/";
+
+  private static final String outputFileName = 
"test_output_windowed_broadcast";
+  private static final String expectedOutputFileName = 
"expected_output_windowed_broadcast";
+  private static final String expectedSlidingWindowOutputFileName = 
"expected_output_sliding_windowed_broadcast";
+  private static final String executorResourceFileName = fileBasePath + 
"beam_test_executor_resources.json";
+  private static final String outputFilePath =  fileBasePath + outputFileName;
+
+  // TODO #271: We currently disable this test because we cannot force close 
Nemo
+  // @Test (timeout = TIMEOUT)
+  public void testUnboundedSlidingWindow() throws Exception {
+    builder = new ArgBuilder()
+      
.addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
+      .addUserMain(WindowedBroadcast.class.getCanonicalName())
+      .addUserArgs(outputFilePath);
+
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(WindowedBroadcastITCase.class.getSimpleName())
+      
.addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
+      .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, 
expectedSlidingWindowOutputFileName);
+    } finally {
+    }
+  }
+}
diff --git 
a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
 
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index c0134aa33..7ef0e0281 100644
--- 
a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ 
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -33,6 +33,7 @@
 
 /**
  * Test Windowed word count program with JobLauncher.
+ * TODO #299: WindowedWordCountITCase Hangs (Heisenbug)
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
@@ -58,7 +59,7 @@ public void testBatchFixedWindow() throws Exception {
 
     JobLauncher.main(builder
         .addResourceJson(executorResourceFileName)
-        .addJobId(WindowedWordCountITCase.class.getSimpleName())
+        .addJobId(WindowedWordCountITCase.class.getSimpleName() + 
"testBatchFixedWindow")
         
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
 
@@ -78,7 +79,7 @@ public void testBatchSlidingWindow() throws Exception {
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
-      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addJobId(WindowedWordCountITCase.class.getSimpleName() + 
"testBatchSlidingWindow")
       .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
       .build());
 
@@ -98,7 +99,7 @@ public void testStreamingSchedulerAndPipeFixedWindow() throws 
Exception {
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
-      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addJobId(WindowedWordCountITCase.class.getSimpleName() + 
"testStreamingSchedulerAndPipeFixedWindow")
       
.addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
       .build());
 
@@ -119,7 +120,7 @@ public void testStreamingSchedulerAndPipeSlidingWindow() 
throws Exception {
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
-      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addJobId(WindowedWordCountITCase.class.getSimpleName() + 
"testStreamingSchedulerAndPipeSlidingWindow")
       
.addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
       .build());
 
@@ -132,7 +133,7 @@ public void testStreamingSchedulerAndPipeSlidingWindow() 
throws Exception {
 
 
   // TODO #271: We currently disable this test because we cannot force close 
Nemo
-  //@Test (timeout = TIMEOUT)
+  // @Test (timeout = TIMEOUT)
   public void testUnboundedSlidingWindow() throws Exception {
     builder = new ArgBuilder()
       
.addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index f7aa184a8..f972ce067 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -120,7 +120,7 @@ private synchronized void onTaskReceived(final Task task) {
    * @param task to launch.
    */
   private void launchTask(final Task task) {
-    LOG.info("Launch task: {}", task);
+    LOG.info("Launch task: {}", task.getTaskId());
     try {
       final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
           SerializationUtils.deserialize(task.getSerializedIRDag());
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
index 17a62c5cd..42806b7de 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
@@ -27,7 +27,6 @@
 import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
 import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import org.apache.nemo.runtime.executor.datatransfer.InputReader;
 import net.jcip.annotations.ThreadSafe;
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.reef.tang.annotations.Parameter;
@@ -36,9 +35,7 @@
 
 import javax.inject.Inject;
 import java.io.Serializable;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -50,7 +47,6 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(BroadcastManagerWorker.class.getName());
   private static BroadcastManagerWorker staticReference;
 
-  private final ConcurrentHashMap<Serializable, InputReader> idToReader;
   private final LoadingCache<Serializable, Object> idToVariableCache;
 
   /**
@@ -63,65 +59,33 @@
    */
   @Inject
   private BroadcastManagerWorker(@Parameter(JobConf.ExecutorId.class) final 
String executorId,
-                                final PersistentConnectionToMasterMap 
toMaster) {
+                                 final PersistentConnectionToMasterMap 
toMaster) {
     staticReference = this;
-    this.idToReader = new ConcurrentHashMap<>();
     this.idToVariableCache = CacheBuilder.newBuilder()
       .maximumSize(100)
       .expireAfterWrite(10, TimeUnit.MINUTES)
       .build(
         new CacheLoader<Serializable, Object>() {
           public Object load(final Serializable id) throws Exception {
-            LOG.info("Start to load broadcast {}", id.toString());
-            if (idToReader.containsKey(id)) {
-              // Get from reader
-              final InputReader inputReader = idToReader.get(id);
-              final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> 
iterators = inputReader.read();
-              if (iterators.size() != 1) {
-                throw new IllegalStateException(id.toString());
-              }
-              final DataUtil.IteratorWithNumBytes iterator = 
iterators.get(0).get();
-              if (!iterator.hasNext()) {
-                throw new IllegalStateException(id.toString() + " (no element) 
" + iterator.toString());
-              }
-              final Object result = iterator.next();
-              if (iterator.hasNext()) {
-                throw new IllegalStateException(id.toString() + " (more than 
single element) " + iterator.toString());
-              }
-              return result;
-            } else {
-              // Get from master
-              final CompletableFuture<ControlMessage.Message> 
responseFromMasterFuture = toMaster
-                
.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).request(
-                  ControlMessage.Message.newBuilder()
-                    .setId(RuntimeIdManager.generateMessageId())
-                    
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
-                    
.setType(ControlMessage.MessageType.RequestBroadcastVariable)
-                    .setRequestbroadcastVariableMsg(
-                      
ControlMessage.RequestBroadcastVariableMessage.newBuilder()
-                        .setExecutorId(executorId)
-                        
.setBroadcastId(ByteString.copyFrom(SerializationUtils.serialize(id)))
-                        .build())
-                    .build());
-              return SerializationUtils.deserialize(
-                
responseFromMasterFuture.get().getBroadcastVariableMsg().getVariable().toByteArray());
-            }
+            // Get from master
+            final CompletableFuture<ControlMessage.Message> 
responseFromMasterFuture = toMaster
+              
.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).request(
+                ControlMessage.Message.newBuilder()
+                  .setId(RuntimeIdManager.generateMessageId())
+                  
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+                  .setType(ControlMessage.MessageType.RequestBroadcastVariable)
+                  .setRequestbroadcastVariableMsg(
+                    ControlMessage.RequestBroadcastVariableMessage.newBuilder()
+                      .setExecutorId(executorId)
+                      
.setBroadcastId(ByteString.copyFrom(SerializationUtils.serialize(id)))
+                      .build())
+                  .build());
+            return SerializationUtils.deserialize(
+              
responseFromMasterFuture.get().getBroadcastVariableMsg().getVariable().toByteArray());
           }
         });
   }
 
-  /**
-   * When the broadcast variable can be read by an input reader.
-   * (i.e., the variable is expressed as an IREdge, and reside in a executor 
as a block)
-   *
-   * @param id of the broadcast variable.
-   * @param inputReader the {@link InputReader} to register.
-   */
-  public void registerInputReader(final Serializable id,
-                                  final InputReader inputReader) {
-    this.idToReader.put(id, inputReader);
-  }
-
   /**
    * Get the variable with the id.
    * @param id of the variable.
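
With the InputReader path removed above, every broadcast variable is now fetched from 
the master on a cache miss and memoized. A minimal sketch of that Guava LoadingCache 
pattern follows; the fetch function stands in for the RequestBroadcastVariable round 
trip and the class is illustrative only.

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Illustrative sketch, not Nemo code.
public final class BroadcastCacheSketch {
  private final LoadingCache<Serializable, Object> cache;

  // 'fetchFromMaster' stands in for the master RPC that returns the serialized variable.
  BroadcastCacheSketch(final Function<Serializable, Object> fetchFromMaster) {
    this.cache = CacheBuilder.newBuilder()
      .maximumSize(100)                       // bound the number of cached variables
      .expireAfterWrite(10, TimeUnit.MINUTES) // eventually refetch stale variables
      .build(new CacheLoader<Serializable, Object>() {
        @Override
        public Object load(final Serializable id) {
          return fetchFromMaster.apply(id);   // only invoked on a cache miss
        }
      });
  }

  Object get(final Serializable id) throws Exception {
    return cache.get(id); // returns the cached value, loading it if absent
  }
}
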
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
index ed6073ef6..43d11cfc4 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -33,6 +33,7 @@
    * Constructor.
    */
   public DedicatedKeyPerElementPartitioner() {
+    // TODO #288: DedicatedKeyPerElementPartitioner should not always return 0
     key = 0;
   }
 
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
index 613eccc06..0402fd404 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
@@ -62,7 +62,6 @@ private int findNextMinWatermarkIndex() {
 
   @Override
   public void trackAndEmitWatermarks(final int edgeIndex, final Watermark 
watermark) {
-
     if (LOG.isDebugEnabled()) {
       LOG.debug("Track watermark {} emitted from edge {}:, {}", 
watermark.getTimestamp(), edgeIndex,
         watermarks.toString());
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 12d993261..74c3ffbe0 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -105,7 +105,6 @@ public void emit(final O output) {
 
   @Override
   public void emitWatermark(final Watermark watermark) {
-
     if (LOG.isDebugEnabled()) {
       LOG.debug("{} emits watermark {}", irVertex.getId(), watermark);
     }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index dd7039416..03d7470f0 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -78,7 +78,6 @@
 
   private void writeData(final Object element, final List<ByteOutputContext> 
pipeList) {
     pipeList.forEach(pipe -> {
-
       try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = 
pipe.newOutputStream()) {
         // Serialize (Do not compress)
         final DirectByteArrayOutputStream bytesOutputStream = new 
DirectByteArrayOutputStream();
@@ -149,10 +148,14 @@ private void doInitialize() {
   }
 
   private List<ByteOutputContext> getPipeToWrite(final Object element) {
-    return runtimeEdge.getPropertyValue(CommunicationPatternProperty.class)
-      .get()
-      .equals(CommunicationPatternProperty.Value.OneToOne)
-      ? Collections.singletonList(pipes.get(0))
-      : Collections.singletonList(pipes.get((int) 
partitioner.partition(element)));
+    final CommunicationPatternProperty.Value comm =
+      (CommunicationPatternProperty.Value) 
runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get();
+    if (comm.equals(CommunicationPatternProperty.Value.OneToOne)) {
+      return Collections.singletonList(pipes.get(0));
+    } else if (comm.equals(CommunicationPatternProperty.Value.BroadCast)) {
+      return pipes;
+    } else {
+      return Collections.singletonList(pipes.get((int) 
partitioner.partition(element)));
+    }
   }
 }
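
A minimal sketch of the routing rule getPipeToWrite now implements: a single pipe for 
OneToOne, every pipe for BroadCast, and a partitioner-selected pipe otherwise. The 
enum and integer pipe indices below stand in for Nemo's CommunicationPatternProperty 
and ByteOutputContext and are illustrative only.

import java.util.Collections;
import java.util.List;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative sketch, not Nemo code.
public final class PipeRoutingSketch {
  enum Pattern { ONE_TO_ONE, BROADCAST, SHUFFLE }

  /** Selects which pipe indices an element is written to. */
  static List<Integer> route(final Pattern pattern,
                             final int numPipes,
                             final Object element,
                             final ToIntFunction<Object> partitioner) {
    switch (pattern) {
      case ONE_TO_ONE:
        return Collections.singletonList(0);              // single downstream task
      case BROADCAST:
        return IntStream.range(0, numPipes).boxed()
          .collect(Collectors.toList());                  // every downstream task
      default:
        // Illustrative guard: the real code uses the partitioner's result directly.
        return Collections.singletonList(partitioner.applyAsInt(element) % numPipes);
    }
  }

  public static void main(final String[] args) {
    System.out.println(route(Pattern.BROADCAST, 3, "x", Object::hashCode)); // [0, 1, 2]
    System.out.println(route(Pattern.SHUFFLE, 3, "x", o -> 7));             // [1]
  }
}
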
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
index e8135f9f0..e3562ed39 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
@@ -20,12 +20,15 @@
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * This is a special implementation for single input data stream for 
optimization.
  */
 public final class SingleInputWatermarkManager implements 
InputWatermarkManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SingleInputWatermarkManager.class.getName());
 
   private final OutputCollector watermarkCollector;
 
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
index 2ca3df86c..215a6a85f 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
@@ -31,6 +31,10 @@
   private final IRVertex dataSource;
   private final OutputCollector outputCollector;
 
+  /**
+   * @param dataSource to fetch from.
+   * @param outputCollector for the data fetched.
+   */
   DataFetcher(final IRVertex dataSource,
               final OutputCollector outputCollector) {
     this.dataSource = dataSource;
@@ -48,4 +52,8 @@
   OutputCollector getOutputCollector() {
     return outputCollector;
   }
+
+  IRVertex getDataSource() {
+    return dataSource;
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
index 001060c7a..7ce1ed91f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -63,6 +63,7 @@
   // A watermark manager
   private InputWatermarkManager inputWatermarkManager;
 
+
   MultiThreadParentTaskDataFetcher(final IRVertex dataSource,
                                    final InputReader readerForParentTask,
                                    final OutputCollector outputCollector) {
@@ -113,8 +114,6 @@ private void fetchDataLazily() {
           // Consume this iterator to the end.
           while (iterator.hasNext()) { // blocked on the iterator.
             final Object element = iterator.next();
-
-
             if (element instanceof WatermarkWithIndex) {
               // watermark element
               // the input watermark manager is accessed by multiple threads
@@ -177,17 +176,14 @@ public void close() throws Exception {
    * It receives the watermark from InputWatermarkManager.
    */
   private final class WatermarkCollector implements OutputCollector {
-
     @Override
     public void emit(final Object output) {
       throw new IllegalStateException("Should not be called");
     }
-
     @Override
     public void emitWatermark(final Watermark watermark) {
       elementQueue.offer(watermark);
     }
-
     @Override
     public void emit(final String dstVertexId, final Object output) {
       throw new IllegalStateException("Should not be called");
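
In the fetcher above, data elements and WatermarkWithIndex punctuation arrive through the same iterator, and the inner WatermarkCollector offers watermarks forwarded by the InputWatermarkManager onto the same elementQueue that the fetcher drains. A minimal sketch of that shared-queue pattern, with hypothetical names rather than the actual Nemo classes:

import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: producers enqueue both data and watermark punctuation on one queue,
// and the consumer tells them apart by type when polling.
final class SharedEventQueueSketch {
  static final class WatermarkEvent {
    final long timestamp;
    WatermarkEvent(final long timestamp) { this.timestamp = timestamp; }
  }

  private final ConcurrentLinkedQueue<Object> elementQueue = new ConcurrentLinkedQueue<>();

  void offerData(final Object element) {
    elementQueue.offer(element);
  }

  void offerWatermark(final long timestamp) {
    elementQueue.offer(new WatermarkEvent(timestamp));
  }

  // Consumer side: dispatch by type, as the fetcher's consumer does.
  void drainOnce() {
    final Object event = elementQueue.poll();
    if (event == null) {
      return; // nothing fetched yet
    }
    if (event instanceof WatermarkEvent) {
      System.out.println("advance event time to " + ((WatermarkEvent) event).timestamp);
    } else {
      System.out.println("process data element " + event);
    }
  }
}
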
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 80bbea241..d3c223f66 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -50,7 +50,8 @@
   private long serBytes = 0;
   private long encodedBytes = 0;
 
-  ParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask,
+  ParentTaskDataFetcher(final IRVertex dataSource,
+                        final InputReader readerForParentTask,
                         final OutputCollector outputCollector) {
     super(dataSource, outputCollector);
     this.readersForParentTask = readerForParentTask;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 518bff36f..b08b12a25 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -25,12 +25,11 @@
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
 import org.apache.nemo.common.ir.vertex.*;
 import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
 import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -71,7 +70,7 @@
   private boolean isExecuted;
   private final String taskId;
   private final TaskStateManager taskStateManager;
-  private final List<DataFetcher> nonBroadcastDataFetchers;
+  private final List<DataFetcher> dataFetchers;
   private final BroadcastManagerWorker broadcastManagerWorker;
   private final List<VertexHarness> sortedHarnesses;
 
@@ -121,7 +120,7 @@ public TaskExecutor(final Task task,
 
     // Prepare data structures
     final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, intermediateDataIOFactory);
-    this.nonBroadcastDataFetchers = pair.left();
+    this.dataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
   }
 
@@ -188,7 +187,6 @@ public TaskExecutor(final Task task,
     // in {@link this#getInternalMainOutputs and this#internalMainOutputs}
     final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap = new HashMap<>();
     reverseTopologicallySorted.forEach(childVertex -> {
-
       if (childVertex instanceof OperatorVertex) {
         final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
         if (edges.size() == 1) {
@@ -201,11 +199,10 @@ public TaskExecutor(final Task task,
               new OperatorWatermarkCollector((OperatorVertex) childVertex)));
         }
       }
-
     });
 
     // Create a harness for each vertex
-    final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
+    final List<DataFetcher> dataFetcherList = new ArrayList<>();
     final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
 
     reverseTopologicallySorted.forEach(irVertex -> {
@@ -250,38 +247,17 @@ public TaskExecutor(final Task task,
       // Source read
       if (irVertex instanceof SourceVertex) {
         // Source vertex read
-        nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(
-          (SourceVertex) irVertex, sourceReader.get(), outputCollector));
+        dataFetcherList.add(new SourceVertexDataFetcher(
+          (SourceVertex) irVertex,
+          sourceReader.get(),
+          outputCollector));
       }
 
-      // Parent-task read (broadcasts)
-      final List<StageEdge> inEdgesForThisVertex = task.getTaskIncomingEdges()
-        .stream()
-        .filter(inEdge -> inEdge.getDstIRVertex().getId().equals(irVertex.getId()))
-        .collect(Collectors.toList());
-      final List<StageEdge> broadcastInEdges = inEdgesForThisVertex
-        .stream()
-        .filter(stageEdge -> stageEdge.getPropertyValue(BroadcastVariableIdProperty.class).isPresent())
-        .collect(Collectors.toList());
-      final List<InputReader> broadcastReaders =
-        getParentTaskReaders(taskIndex, broadcastInEdges, intermediateDataIOFactory);
-      if (broadcastInEdges.size() != broadcastReaders.size()) {
-        throw new IllegalStateException(broadcastInEdges.toString() + ", " + broadcastReaders.toString());
-      }
-      for (int i = 0; i < broadcastInEdges.size(); i++) {
-        final StageEdge inEdge = broadcastInEdges.get(i);
-        broadcastManagerWorker.registerInputReader(
-          inEdge.getPropertyValue(BroadcastVariableIdProperty.class)
-            .orElseThrow(() -> new IllegalStateException(inEdge.toString())),
-          broadcastReaders.get(i));
-      }
-
-      // Parent-task read (non-broadcasts)
-      final List<StageEdge> nonBroadcastInEdges = new ArrayList<>(inEdgesForThisVertex);
-      nonBroadcastInEdges.removeAll(broadcastInEdges);
-
-      nonBroadcastInEdges
+      // Parent-task read
+      // TODO #285: Cache broadcasted data
+      task.getTaskIncomingEdges()
         .stream()
+        .filter(inEdge -> inEdge.getDstIRVertex().getId().equals(irVertex.getId())) // edge to this vertex
         .map(incomingEdge ->
           Pair.of(incomingEdge, intermediateDataIOFactory
             .createReader(taskIndex, incomingEdge.getSrcIRVertex(), incomingEdge)))
@@ -291,14 +267,21 @@ public TaskExecutor(final Task task,
             final int edgeIndex = edgeIndexMap.get(edge);
             final InputWatermarkManager watermarkManager = operatorWatermarkManagerMap.get(irVertex);
             final InputReader parentTaskReader = pair.right();
+            final OutputCollector dataFetcherOutputCollector =
+              new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager);
+
             if (parentTaskReader instanceof PipeInputReader) {
-              nonBroadcastDataFetcherList.add(
-                new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-                  new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+              dataFetcherList.add(
+                new MultiThreadParentTaskDataFetcher(
+                  parentTaskReader.getSrcIrVertex(),
+                  parentTaskReader,
+                  dataFetcherOutputCollector));
             } else {
-              nonBroadcastDataFetcherList.add(
-                new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-                  new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+              dataFetcherList.add(
+                new ParentTaskDataFetcher(
+                  parentTaskReader.getSrcIrVertex(),
+                  parentTaskReader,
+                  dataFetcherOutputCollector));
             }
           }
         });
@@ -309,7 +292,7 @@ public TaskExecutor(final Task task,
       .map(vertex -> vertexIdToHarness.get(vertex.getId()))
       .collect(Collectors.toList());
 
-    return Pair.of(nonBroadcastDataFetcherList, sortedHarnessList);
+    return Pair.of(dataFetcherList, sortedHarnessList);
   }
 
   /**
@@ -319,7 +302,8 @@ private void processElement(final OutputCollector outputCollector, final Object
     outputCollector.emit(dataElement);
   }
 
-  private void processWatermark(final OutputCollector outputCollector, final Watermark watermark) {
+  private void processWatermark(final OutputCollector outputCollector,
+                                final Watermark watermark) {
     outputCollector.emitWatermark(watermark);
   }
 
@@ -338,7 +322,7 @@ public void execute() {
 
   /**
    * The task is executed in the following two phases.
-   * - Phase 1: Consume task-external input data (non-broadcasts)
+   * - Phase 1: Consume task-external input data
    * - Phase 2: Finalize task-internal states and data elements
    */
   private void doExecute() {
@@ -349,8 +333,8 @@ private void doExecute() {
     LOG.info("{} started", taskId);
     taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty());
 
-    // Phase 1: Consume task-external input data. (non-broadcasts)
-    if (!handleDataFetchers(nonBroadcastDataFetchers)) {
+    // Phase 1: Consume task-external input data.
+    if (!handleDataFetchers(dataFetchers)) {
       return;
     }
 
@@ -383,14 +367,14 @@ private void finalizeVertex(final VertexHarness vertexHarness) {
   }
 
   /**
-   * Process an element generated from the dataFetcher.
-   * If the element is an instance of Finishmark, we remove the dataFetcher from the current list.
-   * @param element element
+   * Process an event generated from the dataFetcher.
+   * If the event is an instance of Finishmark, we remove the dataFetcher from the current list.
+   * @param event event
    * @param dataFetcher current data fetcher
    */
-  private void handleElement(final Object element,
-                             final DataFetcher dataFetcher) {
-    if (element instanceof Finishmark) {
+  private void onEventFromDataFetcher(final Object event,
+                                      final DataFetcher dataFetcher) {
+    if (event instanceof Finishmark) {
       // We've consumed all the data from this data fetcher.
       if (dataFetcher instanceof SourceVertexDataFetcher) {
         boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
@@ -401,12 +385,12 @@ private void handleElement(final Object element,
         serializedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getSerializedBytes();
         encodedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getEncodedBytes();
       }
-    } else if (element instanceof Watermark) {
+    } else if (event instanceof Watermark) {
       // Watermark
-      processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
+      processWatermark(dataFetcher.getOutputCollector(), (Watermark) event);
     } else {
       // Process data element
-      processElement(dataFetcher.getOutputCollector(), element);
+      processElement(dataFetcher.getOutputCollector(), event);
     }
   }
 
@@ -457,7 +441,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
         final DataFetcher dataFetcher = availableIterator.next();
         try {
           final Object element = dataFetcher.fetchDataElement();
-          handleElement(element, dataFetcher);
+          onEventFromDataFetcher(element, dataFetcher);
           if (element instanceof Finishmark) {
             availableIterator.remove();
           }
@@ -485,7 +469,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
         final DataFetcher dataFetcher = pendingIterator.next();
         try {
           final Object element = dataFetcher.fetchDataElement();
-          handleElement(element, dataFetcher);
+          onEventFromDataFetcher(element, dataFetcher);
 
           // We processed data. This means the data fetcher is now available.
           // Add current data fetcher to available
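
The renamed onEventFromDataFetcher above is a three-way dispatch: a Finishmark retires the fetcher (after collecting its read metrics), a Watermark is forwarded through processWatermark, and anything else is emitted as an ordinary data element. A condensed, hypothetical sketch of that dispatch loop follows; the simplified names are not the real TaskExecutor API.

import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the per-event dispatch and fetcher-retirement loop.
final class EventDispatchSketch {
  interface Collector {
    void emit(Object data);
    void emitWatermark(long timestamp);
  }
  static final class Finishmark { }
  static final class Watermark {
    final long timestamp;
    Watermark(final long timestamp) { this.timestamp = timestamp; }
  }
  static final class Fetcher {
    final Collector outputCollector;
    Fetcher(final Collector outputCollector) { this.outputCollector = outputCollector; }
    Object fetchDataElement() { return new Finishmark(); } // placeholder fetch
  }

  // Returns true when the fetcher has emitted its Finishmark and can be removed.
  static boolean onEvent(final Object event, final Fetcher fetcher) {
    if (event instanceof Finishmark) {
      return true;                                         // all data consumed
    } else if (event instanceof Watermark) {
      fetcher.outputCollector.emitWatermark(((Watermark) event).timestamp);
    } else {
      fetcher.outputCollector.emit(event);                 // ordinary data element
    }
    return false;
  }

  // Usage: one pass over the active fetchers, dropping the finished ones.
  static void handleRound(final List<Fetcher> fetchers) {
    final Iterator<Fetcher> it = fetchers.iterator();
    while (it.hasNext()) {
      final Fetcher fetcher = it.next();
      if (onEvent(fetcher.fetchDataElement(), fetcher)) {
        it.remove();
      }
    }
  }
}
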
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 6cbf69401..6b94ca754 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -24,6 +24,7 @@
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.BlockInputReader;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
+import org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
@@ -121,9 +122,9 @@ public void testErrorWhenReadingData() throws Exception {
 
   private ParentTaskDataFetcher createFetcher(final InputReader readerForParentTask) {
     return new ParentTaskDataFetcher(
-        mock(IRVertex.class),
-        readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
-        mock(OutputCollector.class));
+      mock(IRVertex.class),
+      readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
+      mock(OutputCollector.class));
   }
 
   private InputReader generateInputReader(final CompletableFuture completableFuture) {
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 6ae716ae2..919143b3c 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -26,7 +26,6 @@
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.edge.IREdge;
 import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
@@ -66,7 +65,6 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -422,16 +420,16 @@ public void testTwoOperatorsWithBroadcastVariable() {
         .buildWithoutSourceSinkCheck();
 
     final StageEdge taskOutEdge = mockStageEdgeFrom(operatorIRVertex2);
+    final StageEdge taskInEdge = mockStageEdgeTo(operatorIRVertex1);
 
-    final StageEdge broadcastInEdge = mockBroadcastVariableStageEdgeTo(
-      new OperatorVertex(singleListTransform), operatorIRVertex2, broadcastId, elements);
+    when(broadcastManagerWorker.get(broadcastId)).thenReturn(new ArrayList<>(elements));
 
     final Task task = new Task(
         "testSourceVertexDataFetching",
         generateTaskId(),
         TASK_EXECUTION_PROPERTY_MAP,
         new byte[0],
-        Arrays.asList(mockStageEdgeTo(operatorIRVertex1), broadcastInEdge),
+        Collections.singletonList(taskInEdge),
         Collections.singletonList(taskOutEdge),
         Collections.emptyMap());
 
@@ -541,23 +539,6 @@ private StageEdge mockStageEdgeTo(final IRVertex irVertex) {
       mock(Stage.class));
   }
 
-  private StageEdge mockBroadcastVariableStageEdgeTo(final IRVertex srcVertex,
-                                                     final IRVertex dstVertex,
-                                                     final Serializable broadcastVariableId,
-                                                     final Object broadcastVariable) {
-    when(broadcastManagerWorker.get(broadcastVariableId)).thenReturn(broadcastVariable);
-
-    final ExecutionPropertyMap executionPropertyMap =
-      ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne);
-    executionPropertyMap.put(BroadcastVariableIdProperty.of(broadcastVariableId));
-    return new StageEdge("runtime outgoing edge id",
-      executionPropertyMap,
-      srcVertex,
-      dstVertex,
-      mock(Stage.class),
-      mock(Stage.class));
-  }
-
   /**
    * Represents the answer return an inter-stage {@link InputReader},
    * which will have multiple iterable according to the source parallelism.
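
The test change above drops the dedicated broadcast StageEdge and instead stubs the broadcast lookup on the worker directly, so the task under test only needs its ordinary in/out edges. A minimal Mockito-style sketch of that stubbing pattern follows; the BroadcastLookup interface and the names used are illustrative, not the actual test code.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a broadcast-variable lookup such as BroadcastManagerWorker#get.
interface BroadcastLookup {
  Object get(Serializable broadcastId);
}

final class BroadcastStubbingSketch {
  public static void main(final String[] args) {
    final Serializable broadcastId = "side-input-id";
    final List<Integer> elements = Arrays.asList(1, 2, 3);

    // Stub the lookup so any transform asking for the broadcast variable receives the
    // list directly, without wiring a broadcast StageEdge into the Task under test.
    final BroadcastLookup worker = mock(BroadcastLookup.class);
    when(worker.get(broadcastId)).thenReturn(elements);

    System.out.println(worker.get(broadcastId)); // prints [1, 2, 3]
  }
}
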


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
