taegeonum commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r234427574
 
 

 ##########
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/MaterializedViewReader.java
 ##########
 @@ -16,48 +16,55 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.beam.transform;
+package org.apache.nemo.compiler.frontend.beam;
 
-import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.Pair;
 
 import javax.annotation.Nullable;
-import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
- * A sideinput reader that reads/writes side input values to context.
  */
-public final class BroadcastVariableSideInputReader implements SideInputReader 
{
+public final class MaterializedViewReader implements 
ReadyCheckingSideInputReader {
+  final List<PCollectionView<?>> sideInputs;
+  final Map<Pair<PCollectionView<?>, BoundedWindow>, Object> materializedViews;
 
-  // Nemo context for storing/getting side inputs
-  private final Transform.Context context;
-
-  // The list of side inputs that we're handling
-  private final Collection<PCollectionView<?>> sideInputs;
-
-  BroadcastVariableSideInputReader(final Transform.Context context,
-                                   final Collection<PCollectionView<?>> 
sideInputs) {
-    this.context = context;
+  public MaterializedViewReader(final List<PCollectionView<?>> sideInputs) {
     this.sideInputs = sideInputs;
+    this.materializedViews = new HashMap<>();
+  }
+
+  @Override
+  public boolean isReady(final PCollectionView<?> view, final BoundedWindow 
window) {
+    return materializedViews.containsKey(Pair.of(view, window));
   }
 
   @Nullable
   @Override
   public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
-    // TODO #216: implement side input and windowing
-    return ((WindowedValue<T>) context.getBroadcastVariable(view)).getValue();
+    return (T) materializedViews.get(Pair.of(view, window));
   }
 
   @Override
-  public <T> boolean contains(final PCollectionView<T> view) {
+  public <T> boolean contains(PCollectionView<T> view) {
     return sideInputs.contains(view);
   }
 
   @Override
   public boolean isEmpty() {
     return sideInputs.isEmpty();
   }
+
+  public <T> void addView(final PCollectionView<T> view,
 
 Review comment:
   addMaterializedViewData

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to