This is an automated email from the ASF dual-hosted git repository. heejong pushed a commit to branch release-2.30.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.30.0 by this push: new 0176b9d [BEAM-12273] Support non-multimap materialization in Twister2 runner new fbd09de Merge pull request #14725 from kennknowles/twister2-cherrypick 0176b9d is described below commit 0176b9d558ebff61ac189f100a8b3435a212d19a Author: Kenneth Knowles <k...@google.com> AuthorDate: Mon May 3 14:10:23 2021 -0700 [BEAM-12273] Support non-multimap materialization in Twister2 runner --- .../batch/PCollectionViewTranslatorBatch.java | 52 +++++++++++--- .../translators/functions/ByteToElemFunction.java | 79 ++++++++++++++++++++ .../translators/functions/ElemToBytesFunction.java | 84 ++++++++++++++++++++++ .../twister2/utils/Twister2SideInputReader.java | 76 +++++++++++++++----- 4 files changed, 262 insertions(+), 29 deletions(-) diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java index a8c1771..9bc32fc 100644 --- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java @@ -22,11 +22,14 @@ import java.io.IOException; import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation; import org.apache.beam.runners.twister2.Twister2BatchTranslationContext; import org.apache.beam.runners.twister2.translators.BatchTransformTranslator; +import org.apache.beam.runners.twister2.translators.functions.ByteToElemFunction; import org.apache.beam.runners.twister2.translators.functions.ByteToWindowFunctionPrimitive; +import org.apache.beam.runners.twister2.translators.functions.ElemToBytesFunction; import org.apache.beam.runners.twister2.translators.functions.MapToTupleFunction; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -58,23 +61,50 @@ public class PCollectionViewTranslatorBatch<ElemT, ViewT> context.getCurrentTransform(); org.apache.beam.sdk.values.PCollectionView<ViewT> input; PCollection<ElemT> inputPCol = context.getInput(transform); - final KvCoder coder = (KvCoder) inputPCol.getCoder(); - Coder inputKeyCoder = coder.getKeyCoder(); + final Coder coder = inputPCol.getCoder(); WindowingStrategy windowingStrategy = inputPCol.getWindowingStrategy(); WindowFn windowFn = windowingStrategy.getWindowFn(); - final WindowedValue.WindowedValueCoder wvCoder = - WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder()); - BatchTSet<WindowedValue<ElemT>> inputGathered = - inputDataSet - .direct() - .map(new MapToTupleFunction<>(inputKeyCoder, wvCoder)) - .allGather() - .map(new ByteToWindowFunctionPrimitive(inputKeyCoder, wvCoder)); try { input = CreatePCollectionViewTranslation.getView(application); } catch (IOException e) { throw new RuntimeException(e); } - context.setSideInputDataSet(input.getTagInternal().getId(), inputGathered); + + switch (input.getViewFn().getMaterialization().getUrn()) { + case Materializations.MULTIMAP_MATERIALIZATION_URN: + KvCoder kvCoder = (KvCoder<?, ?>) coder; + final Coder keyCoder = kvCoder.getKeyCoder(); + final WindowedValue.WindowedValueCoder kvwvCoder = + WindowedValue.FullWindowedValueCoder.of( + kvCoder.getValueCoder(), windowFn.windowCoder()); + BatchTSet<WindowedValue<ElemT>> multimapMaterialization = + inputDataSet + .direct() + .map(new MapToTupleFunction<>(keyCoder, kvwvCoder)) + .allGather() + .map(new ByteToWindowFunctionPrimitive(keyCoder, kvwvCoder)); + context.setSideInputDataSet(input.getTagInternal().getId(), multimapMaterialization); + break; + case Materializations.ITERABLE_MATERIALIZATION_URN: + final WindowedValue.WindowedValueCoder wvCoder = + WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder()); + BatchTSet<WindowedValue<ElemT>> iterableMaterialization = + inputDataSet + .direct() + .map(new ElemToBytesFunction<>(wvCoder)) + .allGather() + .map(new ByteToElemFunction(wvCoder)); + try { + input = CreatePCollectionViewTranslation.getView(application); + } catch (IOException e) { + throw new RuntimeException(e); + } + context.setSideInputDataSet(input.getTagInternal().getId(), iterableMaterialization); + break; + default: + throw new UnsupportedOperationException( + "Unknown side input materialization " + + input.getViewFn().getMaterialization().getUrn()); + } } } diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java new file mode 100644 index 0000000..578225f --- /dev/null +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.twister2.translators.functions; + +import edu.iu.dsc.tws.api.tset.TSetContext; +import edu.iu.dsc.tws.api.tset.fn.MapFunc; +import java.io.ObjectStreamException; +import java.util.logging.Logger; +import org.apache.beam.runners.twister2.utils.TranslationUtils; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; + +/** ByteToWindow function. */ +@SuppressWarnings({ + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +public class ByteToElemFunction<V> implements MapFunc<WindowedValue<V>, byte[]> { + private transient WindowedValueCoder<V> wvCoder; + private static final Logger LOG = Logger.getLogger(ByteToElemFunction.class.getName()); + + private transient boolean isInitialized = false; + private byte[] wvCoderBytes; + + public ByteToElemFunction() { + // non arg constructor needed for kryo + isInitialized = false; + } + + public ByteToElemFunction(final WindowedValueCoder<V> wvCoder) { + this.wvCoder = wvCoder; + + wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder); + } + + @Override + public WindowedValue<V> map(byte[] input) { + return TranslationUtils.fromByteArray(input, wvCoder); + } + + @Override + public void prepare(TSetContext context) { + initTransient(); + } + + /** + * Method used to initialize the transient variables that were sent over as byte arrays or proto + * buffers. + */ + private void initTransient() { + if (isInitialized) { + return; + } + + wvCoder = + (WindowedValueCoder<V>) + SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Custom Coder Bytes"); + this.isInitialized = true; + } + + protected Object readResolve() throws ObjectStreamException { + return this; + } +} diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java new file mode 100644 index 0000000..c83acdd --- /dev/null +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.twister2.translators.functions; + +import edu.iu.dsc.tws.api.tset.TSetContext; +import edu.iu.dsc.tws.api.tset.fn.MapFunc; +import java.io.ObjectStreamException; +import java.util.logging.Logger; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Map to tuple function. */ +@SuppressWarnings({ + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +public class ElemToBytesFunction<V> implements MapFunc<byte[], WindowedValue<V>> { + + private transient WindowedValue.WindowedValueCoder<V> wvCoder; + private static final Logger LOG = Logger.getLogger(ElemToBytesFunction.class.getName()); + + private transient boolean isInitialized = false; + private byte[] wvCoderBytes; + + public ElemToBytesFunction() { + // non arg constructor needed for kryo + this.isInitialized = false; + } + + public ElemToBytesFunction(WindowedValue.WindowedValueCoder<V> wvCoder) { + this.wvCoder = wvCoder; + wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder); + } + + @Override + public @Nullable byte[] map(WindowedValue<V> input) { + try { + return CoderUtils.encodeToByteArray(wvCoder, input); + } catch (CoderException e) { + LOG.info(e.getMessage()); + } + return null; + } + + @Override + public void prepare(TSetContext context) { + initTransient(); + } + + /** + * Method used to initialize the transient variables that were sent over as byte arrays or proto + * buffers. + */ + private void initTransient() { + if (isInitialized) { + return; + } + wvCoder = + (WindowedValue.WindowedValueCoder<V>) + SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Coder"); + this.isInitialized = true; + } + + protected Object readResolve() throws ObjectStreamException { + return this; + } +} diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java index bbcd392..6ed77c7 100644 --- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java @@ -23,6 +23,7 @@ import edu.iu.dsc.tws.api.dataset.DataPartition; import edu.iu.dsc.tws.api.dataset.DataPartitionConsumer; import edu.iu.dsc.tws.api.tset.TSetContext; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,11 +32,11 @@ import org.apache.beam.runners.core.InMemoryMultimapSideInputView; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.Materializations.MultimapView; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -75,40 +76,79 @@ public class Twister2SideInputReader implements SideInputReader { } private <T> T getSideInput(PCollectionView<T> view, BoundedWindow window) { - Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = new HashMap<>(); + switch (view.getViewFn().getMaterialization().getUrn()) { + case Materializations.MULTIMAP_MATERIALIZATION_URN: + return getMultimapSideInput(view, window); + case Materializations.ITERABLE_MATERIALIZATION_URN: + return getIterableSideInput(view, window); + default: + throw new IllegalArgumentException( + "Unknown materialization type: " + view.getViewFn().getMaterialization().getUrn()); + } + } + + private <T> T getMultimapSideInput(PCollectionView<T> view, BoundedWindow window) { + Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = getPartitionedElements(view); + Map<BoundedWindow, T> resultMap = new HashMap<>(); + + ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn(); + for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements : + partitionedElements.entrySet()) { + + Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder(); + resultMap.put( + elements.getKey(), + viewFn.apply( + InMemoryMultimapSideInputView.fromIterable( + keyCoder, + (Iterable) + elements.getValue().stream() + .map(WindowedValue::getValue) + .collect(Collectors.toList())))); + } + T result = resultMap.get(window); + if (result == null) { + result = viewFn.apply(InMemoryMultimapSideInputView.empty()); + } + return result; + } + + private Map<BoundedWindow, List<WindowedValue<?>>> getPartitionedElements( + PCollectionView<?> view) { + Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = new HashMap<>(); DataPartition<?> sideInput = runtimeContext.getInput(view.getTagInternal().getId()); DataPartitionConsumer<?> dataPartitionConsumer = sideInput.getConsumer(); while (dataPartitionConsumer.hasNext()) { - WindowedValue<KV<?, ?>> winValue = (WindowedValue<KV<?, ?>>) dataPartitionConsumer.next(); + WindowedValue<?> winValue = (WindowedValue<?>) dataPartitionConsumer.next(); for (BoundedWindow tbw : winValue.getWindows()) { - List<WindowedValue<KV<?, ?>>> windowedValues = + List<WindowedValue<?>> windowedValues = partitionedElements.computeIfAbsent(tbw, k -> new ArrayList<>()); windowedValues.add(winValue); } } + return partitionedElements; + } + + private <T> T getIterableSideInput(PCollectionView<T> view, BoundedWindow window) { + Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = getPartitionedElements(view); + ViewFn<Materializations.IterableView, T> viewFn = + (ViewFn<Materializations.IterableView, T>) view.getViewFn(); Map<BoundedWindow, T> resultMap = new HashMap<>(); - for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements : + for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements : partitionedElements.entrySet()) { - - ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn(); - Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder(); resultMap.put( elements.getKey(), - (T) - viewFn.apply( - InMemoryMultimapSideInputView.fromIterable( - keyCoder, - (Iterable) - elements.getValue().stream() - .map(WindowedValue::getValue) - .collect(Collectors.toList())))); + viewFn.apply( + () -> + elements.getValue().stream() + .map(WindowedValue::getValue) + .collect(Collectors.toList()))); } T result = resultMap.get(window); if (result == null) { - ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn(); - result = viewFn.apply(InMemoryMultimapSideInputView.empty()); + result = viewFn.apply(() -> Collections.<T>emptyList()); } return result; }