[beam] branch release-2.30.0 updated: [BEAM-12273] Support non-multimap materialization in Twister2 runner
This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/release-2.30.0 by this push:
     new 0176b9d  [BEAM-12273] Support non-multimap materialization in Twister2 runner
     new fbd09de  Merge pull request #14725 from kennknowles/twister2-cherrypick
0176b9d is described below

commit 0176b9d558ebff61ac189f100a8b3435a212d19a
Author: Kenneth Knowles
AuthorDate: Mon May 3 14:10:23 2021 -0700

    [BEAM-12273] Support non-multimap materialization in Twister2 runner
---
 .../batch/PCollectionViewTranslatorBatch.java | 52 +++---
 .../translators/functions/ByteToElemFunction.java | 79
 .../translators/functions/ElemToBytesFunction.java | 84 ++
 .../twister2/utils/Twister2SideInputReader.java | 76 +++-
 4 files changed, 262 insertions(+), 29 deletions(-)

diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
index a8c1771..9bc32fc 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
@@ -22,11 +22,14 @@ import java.io.IOException;
 import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.twister2.Twister2BatchTranslationContext;
 import org.apache.beam.runners.twister2.translators.BatchTransformTranslator;
+import org.apache.beam.runners.twister2.translators.functions.ByteToElemFunction;
 import org.apache.beam.runners.twister2.translators.functions.ByteToWindowFunctionPrimitive;
+import org.apache.beam.runners.twister2.translators.functions.ElemToBytesFunction;
 import org.apache.beam.runners.twister2.translators.functions.MapToTupleFunction;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -58,23 +61,50 @@ public class PCollectionViewTranslatorBatch
         context.getCurrentTransform();
     org.apache.beam.sdk.values.PCollectionView input;
     PCollection inputPCol = context.getInput(transform);
-    final KvCoder coder = (KvCoder) inputPCol.getCoder();
-    Coder inputKeyCoder = coder.getKeyCoder();
+    final Coder coder = inputPCol.getCoder();
     WindowingStrategy windowingStrategy = inputPCol.getWindowingStrategy();
     WindowFn windowFn = windowingStrategy.getWindowFn();
-    final WindowedValue.WindowedValueCoder wvCoder =
-        WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
-    BatchTSet> inputGathered =
-        inputDataSet
-            .direct()
-            .map(new MapToTupleFunction<>(inputKeyCoder, wvCoder))
-            .allGather()
-            .map(new ByteToWindowFunctionPrimitive(inputKeyCoder, wvCoder));
     try {
       input = CreatePCollectionViewTranslation.getView(application);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    context.setSideInputDataSet(input.getTagInternal().getId(), inputGathered);
+
+    switch (input.getViewFn().getMaterialization().getUrn()) {
+      case Materializations.MULTIMAP_MATERIALIZATION_URN:
+        KvCoder kvCoder = (KvCoder) coder;
+        final Coder keyCoder = kvCoder.getKeyCoder();
+        final WindowedValue.WindowedValueCoder kvwvCoder =
+            WindowedValue.FullWindowedValueCoder.of(
+                kvCoder.getValueCoder(), windowFn.windowCoder());
+        BatchTSet> multimapMaterialization =
+            inputDataSet
+                .direct()
+                .map(new MapToTupleFunction<>(keyCoder, kvwvCoder))
+                .allGather()
+                .map(new ByteToWindowFunctionPrimitive(keyCoder, kvwvCoder));
+        context.setSideInputDataSet(input.getTagInternal().getId(), multimapMaterialization);
+        break;
+      case Materializations.ITERABLE_MATERIALIZATION_URN:
+        final WindowedValue.WindowedValueCoder wvCoder =
+            WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder());
+        BatchTSet> iterableMaterialization =
+            inputDataSet
+                .direct()
+                .map(new ElemToBytesFunction<>(wvCoder))
+                .allGather()
+                .map(new ByteToElemFunction(wvCoder));
+        try {
+          input = CreatePCollectionViewTranslation.getView(a
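The hunk above replaces a single multimap-only code path with a switch on the view's materialization URN, where each branch encodes elements to bytes, runs an allGather, and decodes them again. The control flow can be sketched in isolation as below. This is only an illustration under stated assumptions: the class name, the URN strings, the step descriptions, and the default branch are hypothetical stand-ins, not Beam or Twister2 APIs, and the truncated diff does not show how unknown URNs are actually handled.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; none of these names are Beam or Twister2 APIs.
final class MaterializationDispatchSketch {
  // Stand-in URN strings (assumptions) playing the role of the
  // Materializations.*_URN constants referenced in the diff.
  static final String MULTIMAP_URN = "example:side_input:multimap";
  static final String ITERABLE_URN = "example:side_input:iterable";

  /** Returns the ordered steps a translator might run for the given URN. */
  static List<String> planFor(String urn) {
    List<String> steps = new ArrayList<>();
    switch (urn) {
      case MULTIMAP_URN:
        // Keyed input: the element coder is treated as a KV coder,
        // so the key and the windowed value are encoded separately.
        steps.add("encode key and windowed value as a byte tuple");
        steps.add("allGather");
        steps.add("decode byte tuple back to a keyed windowed value");
        break;
      case ITERABLE_URN:
        // Unkeyed input: the whole windowed element is encoded at once.
        steps.add("encode windowed element as bytes");
        steps.add("allGather");
        steps.add("decode bytes back to a windowed element");
        break;
      default:
        // Assumption: fail loudly on anything else.
        throw new UnsupportedOperationException("Unknown materialization: " + urn);
    }
    return steps;
  }

  public static void main(String[] args) {
    System.out.println(planFor(MULTIMAP_URN));
    System.out.println(planFor(ITERABLE_URN));
  }
}
```

Both branches share the same encode, gather, decode shape. What differs is whether the coder is split into key and value parts first, which is why the diff moves the KvCoder cast inside the multimap case.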
[beam] tag nightly-master updated (b9e8aca -> 1ee9a42)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to tag nightly-master
in repository https://gitbox.apache.org/repos/asf/beam.git.

*** WARNING: tag nightly-master was modified! ***

    from b9e8aca  (commit)
      to 1ee9a42  (commit)
    from b9e8aca  Merge pull request #14709 from pabloem/revert-pr13617
     add 79ea6dd  [BEAM-12253] Change Read.UnboundedSourceAsSDFRestrictionTracker to use the cache.
     add 07e7613a  Merge pull request #14704: [BEAM-12253] Change Read.UnboundedSourceAsSDFRestrictionTracker.getSplitBacklog to use the reader cache
     add a0014d7  [BEAM-12271] Make max width for code snippets adaptive with text blocks
     add 66caaa9  Merge pull request #14708: [BEAM-12271] Make max width for code snippets adaptive with text blocks
     add 0547c64  [BEAM-12280] Upgrade Flink runner to Flink version 1.12.3
     add 7f0d11e  Merge pull request #14721: [BEAM-12280] Upgrade Flink runner to Flink version 1.12.3
     add b36b8e5  Use __getstate__ & __setstate__ for deterministic coding
     add 9e9deb9  Re-raise exceptions during deterministic encoding
     add e40489f  Add test for non-deterministic inputs
     add 75b6c5d  Merge pull request #14680 [BEAM-11719] Use __getstate__ & __setstate__ for deterministic coding
     add 60fd371  [BEAM-12273] Support non-multimap materialization in Twister2 runner
     add 2e7b06b  Merge pull request #14715: [BEAM-12273] Support non-multimap materialization in Twister2 runner
     add 1ee9a42  [BEAM-11759] Create Beam glossary (#14717)

No new revisions were added by this update.

Summary of changes:
 runners/flink/1.12/build.gradle | 2 +-
 .../batch/PCollectionViewTranslatorBatch.java | 52 ++-
 ...ctionPrimitive.java => ByteToElemFunction.java} | 46 +-
 ...TupleFunction.java => ElemToBytesFunction.java} | 41 +-
 .../twister2/utils/Twister2SideInputReader.java | 76 +++-
 .../src/main/java/org/apache/beam/sdk/io/Read.java | 41 +-
 sdks/python/apache_beam/coders/coder_impl.py | 53 ++-
 .../apache_beam/coders/coders_test_common.py | 25 ++
 website/www/site/assets/scss/_global.sass | 4 +-
 .../www/site/content/en/documentation/_index.md | 1 +
 .../www/site/content/en/documentation/glossary.md | 464 +
 .../partials/section-menu/en/documentation.html | 2 +
 12 files changed, 674 insertions(+), 133 deletions(-)
 copy runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/{ByteToWindowFunctionPrimitive.java => ByteToElemFunction.java} (59%)
 copy runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/{MapToTupleFunction.java => ElemToBytesFunction.java} (60%)
 create mode 100644 website/www/site/content/en/documentation/glossary.md
[beam] branch master updated (2e7b06b -> 1ee9a42)
This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 2e7b06b  Merge pull request #14715: [BEAM-12273] Support non-multimap materialization in Twister2 runner
     add 1ee9a42  [BEAM-11759] Create Beam glossary (#14717)

No new revisions were added by this update.

Summary of changes:
 .../www/site/content/en/documentation/_index.md | 1 +
 .../www/site/content/en/documentation/glossary.md | 464 +
 .../partials/section-menu/en/documentation.html | 2 +
 3 files changed, 467 insertions(+)
 create mode 100644 website/www/site/content/en/documentation/glossary.md
[beam] branch master updated (75b6c5d -> 2e7b06b)
This is an automated email from the ASF dual-hosted git repository.

kenn pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 75b6c5d  Merge pull request #14680 [BEAM-11719] Use __getstate__ & __setstate__ for deterministic coding
     add 60fd371  [BEAM-12273] Support non-multimap materialization in Twister2 runner
     add 2e7b06b  Merge pull request #14715: [BEAM-12273] Support non-multimap materialization in Twister2 runner

No new revisions were added by this update.

Summary of changes:
 .../batch/PCollectionViewTranslatorBatch.java | 52 +++
 ...ctionPrimitive.java => ByteToElemFunction.java} | 46 ++---
 ...TupleFunction.java => ElemToBytesFunction.java} | 41 +++-
 .../twister2/utils/Twister2SideInputReader.java | 76 +-
 4 files changed, 113 insertions(+), 102 deletions(-)
 copy runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/{ByteToWindowFunctionPrimitive.java => ByteToElemFunction.java} (59%)
 copy runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/{MapToTupleFunction.java => ElemToBytesFunction.java} (60%)
[beam] branch master updated (7f0d11e -> 75b6c5d)
This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 7f0d11e  Merge pull request #14721: [BEAM-12280] Upgrade Flink runner to Flink version 1.12.3
     add b36b8e5  Use __getstate__ & __setstate__ for deterministic coding
     add 9e9deb9  Re-raise exceptions during deterministic encoding
     add e40489f  Add test for non-deterministic inputs
     add 75b6c5d  Merge pull request #14680 [BEAM-11719] Use __getstate__ & __setstate__ for deterministic coding

No new revisions were added by this update.

Summary of changes:
 sdks/python/apache_beam/coders/coder_impl.py | 53 ++
 .../apache_beam/coders/coders_test_common.py | 25 ++
 2 files changed, 68 insertions(+), 10 deletions(-)
[beam] branch master updated (66caaa9 -> 7f0d11e)
This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 66caaa9  Merge pull request #14708: [BEAM-12271] Make max width for code snippets adaptive with text blocks
     add 0547c64  [BEAM-12280] Upgrade Flink runner to Flink version 1.12.3
     add 7f0d11e  Merge pull request #14721: [BEAM-12280] Upgrade Flink runner to Flink version 1.12.3

No new revisions were added by this update.

Summary of changes:
 runners/flink/1.12/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch master updated (07e7613a -> 66caaa9)
This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from 07e7613a  Merge pull request #14704: [BEAM-12253] Change Read.UnboundedSourceAsSDFRestrictionTracker.getSplitBacklog to use the reader cache
     add a0014d7  [BEAM-12271] Make max width for code snippets adaptive with text blocks
     add 66caaa9  Merge pull request #14708: [BEAM-12271] Make max width for code snippets adaptive with text blocks

No new revisions were added by this update.

Summary of changes:
 website/www/site/assets/scss/_global.sass | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)