taegeonum commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r234427574
##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/MaterializedViewReader.java
##########
@@ -16,48 +16,55 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.beam.transform;
+package org.apache.nemo.compiler.frontend.beam;
 
-import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.Pair;
 
 import javax.annotation.Nullable;
-import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
- * A sideinput reader that reads/writes side input values to context.
  */
-public final class BroadcastVariableSideInputReader implements SideInputReader {
+public final class MaterializedViewReader implements ReadyCheckingSideInputReader {
+  final List<PCollectionView<?>> sideInputs;
+  final Map<Pair<PCollectionView<?>, BoundedWindow>, Object> materializedViews;
 
-  // Nemo context for storing/getting side inputs
-  private final Transform.Context context;
-
-  // The list of side inputs that we're handling
-  private final Collection<PCollectionView<?>> sideInputs;
-
-  BroadcastVariableSideInputReader(final Transform.Context context,
-                                   final Collection<PCollectionView<?>> sideInputs) {
-    this.context = context;
+  public MaterializedViewReader(final List<PCollectionView<?>> sideInputs) {
     this.sideInputs = sideInputs;
+    this.materializedViews = new HashMap<>();
+  }
+
+  @Override
+  public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) {
+    return materializedViews.containsKey(Pair.of(view, window));
   }
 
   @Nullable
   @Override
   public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
-    // TODO #216: implement side input and windowing
-    return ((WindowedValue<T>) context.getBroadcastVariable(view)).getValue();
+    return (T) materializedViews.get(Pair.of(view, window));
   }
 
   @Override
-  public <T> boolean contains(final PCollectionView<T> view) {
+  public <T> boolean contains(PCollectionView<T> view) {
     return sideInputs.contains(view);
   }
 
   @Override
   public boolean isEmpty() {
     return sideInputs.isEmpty();
   }
+
+  public <T> void addView(final PCollectionView<T> view,

Review comment:
 addMaterializedViewData

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services