This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.30.0 by this push:
     new 0176b9d  [BEAM-12273] Support non-multimap materialization in Twister2 runner
     new fbd09de  Merge pull request #14725 from kennknowles/twister2-cherrypick
0176b9d is described below

commit 0176b9d558ebff61ac189f100a8b3435a212d19a
Author: Kenneth Knowles <k...@google.com>
AuthorDate: Mon May 3 14:10:23 2021 -0700

    [BEAM-12273] Support non-multimap materialization in Twister2 runner
---
 .../batch/PCollectionViewTranslatorBatch.java      | 52 +++++++++++---
 .../translators/functions/ByteToElemFunction.java  | 79 ++++++++++++++++++++
 .../translators/functions/ElemToBytesFunction.java | 84 ++++++++++++++++++++++
 .../twister2/utils/Twister2SideInputReader.java    | 76 +++++++++++++++-----
 4 files changed, 262 insertions(+), 29 deletions(-)

diff --git 
a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
 
b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
index a8c1771..9bc32fc 100644
--- 
a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
+++ 
b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
@@ -22,11 +22,14 @@ import java.io.IOException;
 import 
org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.twister2.Twister2BatchTranslationContext;
 import org.apache.beam.runners.twister2.translators.BatchTransformTranslator;
+import 
org.apache.beam.runners.twister2.translators.functions.ByteToElemFunction;
 import 
org.apache.beam.runners.twister2.translators.functions.ByteToWindowFunctionPrimitive;
+import 
org.apache.beam.runners.twister2.translators.functions.ElemToBytesFunction;
 import 
org.apache.beam.runners.twister2.translators.functions.MapToTupleFunction;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -58,23 +61,50 @@ public class PCollectionViewTranslatorBatch<ElemT, ViewT>
                 context.getCurrentTransform();
     org.apache.beam.sdk.values.PCollectionView<ViewT> input;
     PCollection<ElemT> inputPCol = context.getInput(transform);
-    final KvCoder coder = (KvCoder) inputPCol.getCoder();
-    Coder inputKeyCoder = coder.getKeyCoder();
+    final Coder coder = inputPCol.getCoder();
     WindowingStrategy windowingStrategy = inputPCol.getWindowingStrategy();
     WindowFn windowFn = windowingStrategy.getWindowFn();
-    final WindowedValue.WindowedValueCoder wvCoder =
-        WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
-    BatchTSet<WindowedValue<ElemT>> inputGathered =
-        inputDataSet
-            .direct()
-            .map(new MapToTupleFunction<>(inputKeyCoder, wvCoder))
-            .allGather()
-            .map(new ByteToWindowFunctionPrimitive(inputKeyCoder, wvCoder));
     try {
       input = CreatePCollectionViewTranslation.getView(application);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    context.setSideInputDataSet(input.getTagInternal().getId(), inputGathered);
+
+    // The input is all-gathered and registered as a side input data set; the wire encoding used
+    // depends on the view's materialization (multimap vs. iterable).
+    final String materializationUrn = input.getViewFn().getMaterialization().getUrn();
+    switch (materializationUrn) {
+      case Materializations.MULTIMAP_MATERIALIZATION_URN:
+        // Multimap views need the KV key coder, so elements go through the tuple functions
+        // together with a windowed coder built over the KV value coder.
+        KvCoder kvCoder = (KvCoder<?, ?>) coder;
+        final Coder keyCoder = kvCoder.getKeyCoder();
+        final WindowedValue.WindowedValueCoder kvwvCoder =
+            WindowedValue.FullWindowedValueCoder.of(
+                kvCoder.getValueCoder(), windowFn.windowCoder());
+        BatchTSet<WindowedValue<ElemT>> multimapMaterialization =
+            inputDataSet
+                .direct()
+                .map(new MapToTupleFunction<>(keyCoder, kvwvCoder))
+                .allGather()
+                .map(new ByteToWindowFunctionPrimitive(keyCoder, kvwvCoder));
+        context.setSideInputDataSet(input.getTagInternal().getId(), multimapMaterialization);
+        break;
+      case Materializations.ITERABLE_MATERIALIZATION_URN:
+        // Iterable views carry whole elements, so a single windowed coder over the element
+        // coder is enough. The view resolved above is reused; it need not be fetched again.
+        final WindowedValue.WindowedValueCoder wvCoder =
+            WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder());
+        BatchTSet<WindowedValue<ElemT>> iterableMaterialization =
+            inputDataSet
+                .direct()
+                .map(new ElemToBytesFunction<>(wvCoder))
+                .allGather()
+                .map(new ByteToElemFunction(wvCoder));
+        context.setSideInputDataSet(input.getTagInternal().getId(), iterableMaterialization);
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Unknown side input materialization " + materializationUrn);
+    }
   }
 }
diff --git 
a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
 
b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
new file mode 100644
index 0000000..578225f
--- /dev/null
+++ 
b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import java.util.logging.Logger;
+import org.apache.beam.runners.twister2.utils.TranslationUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+
+/**
+ * Decodes byte arrays back into {@link WindowedValue}s with a {@link WindowedValueCoder}; the
+ * decoding counterpart of {@link ElemToBytesFunction}.
+ *
+ * <p>The coder field is transient. It travels as bytes produced by {@link SerializableUtils} and is
+ * rebuilt in {@link #prepare(TSetContext)}.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ByteToElemFunction<V> implements MapFunc<WindowedValue<V>, byte[]> {
+  /** Rebuilt from {@link #wvCoderBytes} by {@link #initTransient()}. */
+  private transient WindowedValueCoder<V> wvCoder;
+  private static final Logger LOG = Logger.getLogger(ByteToElemFunction.class.getName());
+
+  private transient boolean isInitialized = false;
+  /** Serialized form of {@link #wvCoder}; this is the state that is actually shipped. */
+  private byte[] wvCoderBytes;
+
+  public ByteToElemFunction() {
+    // non arg constructor needed for kryo
+    isInitialized = false;
+  }
+
+  /**
+   * Creates a decoder for values encoded with {@code wvCoder}.
+   *
+   * @param wvCoder coder used to decode each element; it is also serialized so it can be rebuilt
+   *     in {@link #prepare(TSetContext)}
+   */
+  public ByteToElemFunction(final WindowedValueCoder<V> wvCoder) {
+    this.wvCoder = wvCoder;
+
+    wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  /**
+   * Decodes one element.
+   *
+   * @param input bytes produced by encoding a {@code WindowedValue<V>} with the same coder
+   * @return the decoded windowed value
+   */
+  @Override
+  public WindowedValue<V> map(byte[] input) {
+    return TranslationUtils.fromByteArray(input, wvCoder);
+  }
+
+  @Override
+  public void prepare(TSetContext context) {
+    initTransient();
+  }
+
+  /**
+   * Rebuilds the transient coder from {@link #wvCoderBytes}. Calls after the first successful one
+   * return immediately.
+   */
+  @SuppressWarnings("unchecked") // wvCoderBytes was produced from a WindowedValueCoder<V>
+  private void initTransient() {
+    if (isInitialized) {
+      return;
+    }
+
+    wvCoder =
+        (WindowedValueCoder<V>)
+            SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Custom Coder Bytes");
+    this.isInitialized = true;
+  }
+
+  protected Object readResolve() throws ObjectStreamException {
+    return this;
+  }
+}
diff --git 
a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
 
b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
new file mode 100644
index 0000000..c83acdd
--- /dev/null
+++ 
b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import java.util.logging.Logger;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Encodes each {@link WindowedValue} into a byte array with a {@link
+ * WindowedValue.WindowedValueCoder}; the encoding counterpart of {@link ByteToElemFunction}.
+ *
+ * <p>The coder field is transient. It travels as bytes produced by {@link SerializableUtils} and is
+ * rebuilt in {@link #prepare(TSetContext)}.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ElemToBytesFunction<V> implements MapFunc<byte[], WindowedValue<V>> {
+
+  /** Rebuilt from {@link #wvCoderBytes} by {@link #initTransient()}. */
+  private transient WindowedValue.WindowedValueCoder<V> wvCoder;
+  private static final Logger LOG = Logger.getLogger(ElemToBytesFunction.class.getName());
+
+  private transient boolean isInitialized = false;
+  /** Serialized form of {@link #wvCoder}; this is the state that is actually shipped. */
+  private byte[] wvCoderBytes;
+
+  public ElemToBytesFunction() {
+    // non arg constructor needed for kryo
+    this.isInitialized = false;
+  }
+
+  /**
+   * Creates an encoder for values of type {@code WindowedValue<V>}.
+   *
+   * @param wvCoder coder used to encode each element; it is also serialized so it can be rebuilt
+   *     in {@link #prepare(TSetContext)}
+   */
+  public ElemToBytesFunction(WindowedValue.WindowedValueCoder<V> wvCoder) {
+    this.wvCoder = wvCoder;
+    wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  /**
+   * Encodes one element.
+   *
+   * @param input the windowed value to encode
+   * @return the encoded bytes, never {@code null}
+   * @throws RuntimeException wrapping the {@link CoderException} if {@code input} cannot be encoded
+   */
+  @Override
+  public byte[] map(WindowedValue<V> input) {
+    try {
+      return CoderUtils.encodeToByteArray(wvCoder, input);
+    } catch (CoderException e) {
+      // Fail here, with the cause attached: a null result would only surface later as an opaque
+      // failure when the gathered bytes are decoded.
+      throw new RuntimeException("Failed to encode side input element", e);
+    }
+  }
+
+  @Override
+  public void prepare(TSetContext context) {
+    initTransient();
+  }
+
+  /**
+   * Rebuilds the transient coder from {@link #wvCoderBytes}. Calls after the first successful one
+   * return immediately.
+   */
+  @SuppressWarnings("unchecked") // wvCoderBytes was produced from a WindowedValueCoder<V>
+  private void initTransient() {
+    if (isInitialized) {
+      return;
+    }
+    wvCoder =
+        (WindowedValue.WindowedValueCoder<V>)
+            SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Coder");
+    this.isInitialized = true;
+  }
+
+  protected Object readResolve() throws ObjectStreamException {
+    return this;
+  }
+}
diff --git 
a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
 
b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
index bbcd392..6ed77c7 100644
--- 
a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
+++ 
b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
@@ -23,6 +23,7 @@ import edu.iu.dsc.tws.api.dataset.DataPartition;
 import edu.iu.dsc.tws.api.dataset.DataPartitionConsumer;
 import edu.iu.dsc.tws.api.tset.TSetContext;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,11 +32,11 @@ import 
org.apache.beam.runners.core.InMemoryMultimapSideInputView;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.Materializations.MultimapView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -75,40 +76,79 @@ public class Twister2SideInputReader implements 
SideInputReader {
   }
 
+  /**
+   * Returns side input {@code view} for {@code window}, dispatching on the view's materialization
+   * URN to the multimap or iterable reader.
+   *
+   * @throws IllegalArgumentException if the materialization URN is neither multimap nor iterable
+   */
   private <T> T getSideInput(PCollectionView<T> view, BoundedWindow window) {
-    Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = 
new HashMap<>();
+    switch (view.getViewFn().getMaterialization().getUrn()) {
+      case Materializations.MULTIMAP_MATERIALIZATION_URN:
+        return getMultimapSideInput(view, window);
+      case Materializations.ITERABLE_MATERIALIZATION_URN:
+        return getIterableSideInput(view, window);
+      default:
+        throw new IllegalArgumentException(
+            "Unknown materialization type: " + 
view.getViewFn().getMaterialization().getUrn());
+    }
+  }
+
+  /**
+   * Returns the multimap-materialized side input {@code view} as seen from {@code window}.
+   *
+   * <p>Only the elements assigned to {@code window} are handed to the view function. If that window
+   * has no elements, or the view function returns {@code null}, the view is built over an empty
+   * multimap instead.
+   */
+  private <T> T getMultimapSideInput(PCollectionView<T> view, BoundedWindow window) {
+    ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+    // Only the requested window's view is needed, so views for other windows are not built.
+    List<WindowedValue<?>> windowElements = getPartitionedElements(view).get(window);
+
+    T result = null;
+    if (windowElements != null) {
+      // fromIterable takes the KV key coder alongside the raw KV values of this window.
+      Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+      result =
+          viewFn.apply(
+              InMemoryMultimapSideInputView.fromIterable(
+                  keyCoder,
+                  (Iterable)
+                      windowElements.stream()
+                          .map(WindowedValue::getValue)
+                          .collect(Collectors.toList())));
+    }
+    if (result == null) {
+      result = viewFn.apply(InMemoryMultimapSideInputView.empty());
+    }
+    return result;
+  }
+
+  /**
+   * Drains the side input for {@code view} from the runtime context and groups its windowed values
+   * by window.
+   *
+   * @return a map from window to the windowed values assigned to it
+   */
+  private Map<BoundedWindow, List<WindowedValue<?>>> getPartitionedElements(
+      PCollectionView<?> view) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = new 
HashMap<>();
     DataPartition<?> sideInput = 
runtimeContext.getInput(view.getTagInternal().getId());
     DataPartitionConsumer<?> dataPartitionConsumer = sideInput.getConsumer();
     while (dataPartitionConsumer.hasNext()) {
-      WindowedValue<KV<?, ?>> winValue = (WindowedValue<KV<?, ?>>) 
dataPartitionConsumer.next();
+      WindowedValue<?> winValue = (WindowedValue<?>) 
dataPartitionConsumer.next();
+      // A value assigned to several windows is added to each of those windows' lists.
       for (BoundedWindow tbw : winValue.getWindows()) {
-        List<WindowedValue<KV<?, ?>>> windowedValues =
+        List<WindowedValue<?>> windowedValues =
             partitionedElements.computeIfAbsent(tbw, k -> new ArrayList<>());
         windowedValues.add(winValue);
       }
     }
+    return partitionedElements;
+  }
+
+  /**
+   * Returns the iterable-materialized side input {@code view} as seen from {@code window}.
+   *
+   * <p>A view is built for every window present in the side input and the one for {@code window}
+   * is returned. If that window has no elements, or its view is {@code null}, the view is built
+   * over an empty iterable instead.
+   */
+  private <T> T getIterableSideInput(PCollectionView<T> view, BoundedWindow 
window) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = 
getPartitionedElements(view);
 
+    ViewFn<Materializations.IterableView, T> viewFn =
+        (ViewFn<Materializations.IterableView, T>) view.getViewFn();
     Map<BoundedWindow, T> resultMap = new HashMap<>();
 
+    // NOTE(review): views for all windows are built but only one is returned; building just the
+    // requested window's view would avoid the redundant work.
-    for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements :
+    for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements :
         partitionedElements.entrySet()) {
-
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) 
view.getViewFn();
-      Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
       resultMap.put(
           elements.getKey(),
-          (T)
-              viewFn.apply(
-                  InMemoryMultimapSideInputView.fromIterable(
-                      keyCoder,
-                      (Iterable)
-                          elements.getValue().stream()
-                              .map(WindowedValue::getValue)
-                              .collect(Collectors.toList()))));
+          // The IterableView lambda re-collects this window's values each time get() is called.
+          viewFn.apply(
+              () ->
+                  elements.getValue().stream()
+                      .map(WindowedValue::getValue)
+                      .collect(Collectors.toList())));
     }
     T result = resultMap.get(window);
     if (result == null) {
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) 
view.getViewFn();
-      result = viewFn.apply(InMemoryMultimapSideInputView.empty());
+      // NOTE(review): T is the view's output type, not the element type. The explicit witness in
+      // Collections.<T>emptyList() looks misleading and appears to compile only because
+      // IterableView is used as a raw type here; Collections.emptyList() would be clearer.
+      result = viewFn.apply(() -> Collections.<T>emptyList());
     }
     return result;
   }

Reply via email to