[beam] branch master updated: [Spark Dataset runner] Avoid copying outputs for most cases in ParDo translation (related to #24711) (#25624)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 5a2b93ddaa7  [Spark Dataset runner] Avoid copying outputs for most cases in ParDo translation (related to #24711) (#25624)
5a2b93ddaa7 is described below

commit 5a2b93ddaa707053eddaf98553935a05062d3e8f
Author: Moritz Mack
AuthorDate: Mon Mar 6 11:50:22 2023 +0100

    [Spark Dataset runner] Avoid copying outputs for most cases in ParDo translation (related to #24711) (#25624)
---
 .../translation/batch/ParDoTranslatorBatch.java | 40 +++---
 1 file changed, 35 insertions(+), 5 deletions(-)

diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index da40e4c9c50..3b5c94d311f 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -27,6 +27,7 @@ import static org.apache.spark.sql.functions.col;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -48,6 +49,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Dataset;
@@ -113,12 +115,10 @@ class ParDoTranslatorBatch
     MetricsAccumulator metrics = MetricsAccumulator.getInstance(cxt.getSparkSession());
     TupleTag<OutputT> mainOut = transform.getMainOutputTag();
-    // Filter out unconsumed PCollections (except mainOut) to potentially avoid the costs of caching
-    // if not really beneficial.
+
+    // Filter out obsolete PCollections to only cache when absolutely necessary
     Map<TupleTag<?>, PCollection<?>> outputs =
-        Maps.filterEntries(
-            cxt.getOutputs(),
-            e -> e != null && (e.getKey().equals(mainOut) || !cxt.isLeaf(e.getValue())));
+        skipObsoleteOutputs(cxt.getOutputs(), mainOut, transform.getAdditionalOutputTags(), cxt);

     if (outputs.size() > 1) {
       // In case of multiple outputs / tags, map each tag to a column by index.
@@ -169,6 +169,36 @@ class ParDoTranslatorBatch
     }
   }

+  /**
+   * Filter out obsolete, unused output tags except for {@code mainTag}.
+   *
+   * This can help to avoid unnecessary caching in case of multiple outputs if only {@code
+   * mainTag} is consumed.
+   */
+  private Map<TupleTag<?>, PCollection<?>> skipObsoleteOutputs(
+      Map<TupleTag<?>, PCollection<?>> outputs,
+      TupleTag<?> mainTag,
+      TupleTagList otherTags,
+      Context cxt) {
+    switch (outputs.size()) {
+      case 1:
+        return outputs; // always keep main output
+      case 2:
+        TupleTag<?> otherTag = otherTags.get(0);
+        return cxt.isLeaf(checkStateNotNull(outputs.get(otherTag)))
+            ? Collections.singletonMap(mainTag, checkStateNotNull(outputs.get(mainTag)))
+            : outputs;
+      default:
+        Map<TupleTag<?>, PCollection<?>> filtered = Maps.newHashMapWithExpectedSize(outputs.size());
+        for (Map.Entry<TupleTag<?>, PCollection<?>> e : outputs.entrySet()) {
+          if (e.getKey().equals(mainTag) || !cxt.isLeaf(e.getValue())) {
+            filtered.put(e.getKey(), e.getValue());
+          }
+        }
+        return filtered;
+    }
+  }
+
   static Fun1<Tuple2<Integer, T>, TraversableOnce<T>> selectByColumnIdx(int idx) {
     return t -> idx == t._1 ? listOf(t._2) : emptyList();
   }
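For readers less familiar with the translation above: the optimization only matters for ParDos with additional output tags. A rough, hypothetical user-side sketch of that shape (tag names such as errorTag are illustrative, not from the commit):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    public class MultiOutputSketch {
      public static void main(String[] args) {
        // Anonymous subclasses so each tag keeps its element type at runtime.
        final TupleTag<String> mainTag = new TupleTag<String>() {};
        final TupleTag<String> errorTag = new TupleTag<String>() {};

        Pipeline p = Pipeline.create();
        PCollectionTuple results =
            p.apply(Create.of("a", "", "b"))
                .apply(
                    ParDo.of(
                            new DoFn<String, String>() {
                              @ProcessElement
                              public void process(@Element String line, MultiOutputReceiver out) {
                                if (line.isEmpty()) {
                                  out.get(errorTag).output(line); // additional output
                                } else {
                                  out.get(mainTag).output(line);
                                }
                              }
                            })
                        .withOutputTags(mainTag, TupleTagList.of(errorTag)));

        // If results.get(errorTag) is never consumed (a leaf), skipObsoleteOutputs can drop
        // it, so the Spark translation neither caches nor copies rows for that tag.
        results.get(mainTag);
        p.run().waitUntilFinish();
      }
    }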
[beam] branch master updated: Evaluate removal of RDD caching for MEMORY_ONLY in the Spark Dataset runner (#25327)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new f49b450fc15  Evaluate removal of RDD caching for MEMORY_ONLY in the Spark Dataset runner (#25327)
f49b450fc15 is described below

commit f49b450fc1535c5d9bb191ecf200f04e69fa0de1
Author: Moritz Mack
AuthorDate: Fri Mar 3 15:27:50 2023 +0100

    Evaluate removal of RDD caching for MEMORY_ONLY in the Spark Dataset runner (#25327)
---
 .../translation/PipelineTranslator.java         | 18 +-
 .../translation/batch/ParDoTranslatorBatch.java | 65 ++
 2 files changed, 19 insertions(+), 64 deletions(-)

diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index ea8441b0bf1..75fd6353123 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior.DO_
 import static org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.sdk.values.PCollection.IsBounded.UNBOUNDED;
-import static org.apache.spark.storage.StorageLevel.MEMORY_ONLY;

 import java.io.IOException;
 import java.io.Serializable;
@@ -231,7 +230,6 @@ public abstract class PipelineTranslator {
     private final PipelineOptions options;
     private final Supplier<PipelineOptions> optionsSupplier;
     private final StorageLevel storageLevel;
-    private final boolean isMemoryOnly;

     private final Set<TranslationResult<?>> leaves;
@@ -244,7 +242,6 @@ public abstract class PipelineTranslator {
       this.options = options;
       this.optionsSupplier = new BroadcastOptions(sparkSession, options);
       this.storageLevel = StorageLevel.fromString(options.getStorageLevel());
-      this.isMemoryOnly = storageLevel.equals(MEMORY_ONLY());
       this.encoders = new HashMap<>();
       this.leaves = new HashSet<>();
     }
@@ -294,18 +291,9 @@ public abstract class PipelineTranslator {
       TranslationResult<T> result = getResult(pCollection);
       result.dataset = dataset;

-      if (!cache && isMemoryOnly) {
-        result.resetPlanComplexity(); // cached as RDD in memory which breaks linage
-      } else if (cache && result.usages() > 1) {
-        if (isMemoryOnly) {
-          // Cache as RDD in-memory only, this helps to also break linage of complex query plans.
-          LOG.info("Dataset {} will be cached in-memory as RDD for reuse.", result.name);
-          result.dataset = sparkSession.createDataset(dataset.rdd().persist(), dataset.encoder());
-          result.resetPlanComplexity();
-        } else {
-          LOG.info("Dataset {} will be cached for reuse.", result.name);
-          dataset.persist(storageLevel); // use NONE to disable
-        }
+      if (cache && result.usages() > 1) {
+        LOG.info("Dataset {} will be cached for reuse.", result.name);
+        dataset.persist(storageLevel); // use NONE to disable
       }

       if (result.estimatePlanComplexity() > PLAN_COMPLEXITY_THRESHOLD) {

diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 40d26be8a8b..da40e4c9c50 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -23,7 +23,6 @@ import static org.apache.beam.runners.spark.structuredstreaming.translation.util
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 import static org.apache.spark.sql.functions.col;
-import static org.apache.spark.storage.StorageLevel.MEMORY_ONLY;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -51,14 +50,12 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
[beam] branch master updated: [Spark Dataset runner] Fix SparkSessionFactory to better support running on a cluster. (#24862)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 1aa5acc0627  [Spark Dataset runner] Fix SparkSessionFactory to better support running on a cluster. (#24862)
1aa5acc0627 is described below

commit 1aa5acc0627eca7e607b91432c57ea3cac2bd7d4
Author: Moritz Mack
AuthorDate: Mon Jan 16 17:26:34 2023 +0100

    [Spark Dataset runner] Fix SparkSessionFactory to better support running on a cluster. (#24862)

    * [Spark Dataset runner] Fix SparkSessionFactory to better support running on a cluster (fixes #24861).
---
 .../SparkStructuredStreamingRunner.java   | 2 -
 .../translation/SparkSessionFactory.java  | 100 -
 runners/spark/spark_runner.gradle         | 1 +
 3 files changed, 80 insertions(+), 23 deletions(-)

diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index 3b9f96cdb7e..6e08bf1c0b4 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming;

-import static org.apache.beam.runners.spark.SparkCommonPipelineOptions.prepareFilesToStage;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

 import java.util.concurrent.ExecutorService;
@@ -188,7 +187,6 @@ public final class SparkStructuredStreamingRunner
     }

     PipelineTranslator.replaceTransforms(pipeline, options);
-    prepareFilesToStage(options);

     PipelineTranslator pipelineTranslator = new PipelineTranslatorBatch();
     return pipelineTranslator.translate(pipeline, sparkSession, options);

diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkSessionFactory.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkSessionFactory.java
index 6632f4d707a..16eb1131c00 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkSessionFactory.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/SparkSessionFactory.java
@@ -17,13 +17,20 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation;

+import static org.apache.commons.lang3.ArrayUtils.EMPTY_STRING_ARRAY;
+import static org.apache.commons.lang3.StringUtils.substringBetween;
+import static org.apache.commons.lang3.math.NumberUtils.toInt;
+
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.ArrayUtils;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.construction.resources.PipelineResources;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SideInputValues;
 import org.apache.beam.sdk.coders.AvroCoder;
@@ -74,6 +81,8 @@
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Collections2;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.spark.SparkConf;
 import org.apache.spark.serializer.KryoRegistrator;
 import org.apache.spark.serializer.KryoSerializer;
@@ -87,6 +96,24 @@ public class SparkSessionFactory {

   private static final Logger LOG = LoggerFactory.getLogger(SparkSessionFactory.class);

+  // Patterns to exclude local JRE and certain artifact (groups) in Maven and Gradle cache.
+  private static final Collection<String> SPARK_JAR_EXCLUDES =
+      Lists.newArrayList(
+          "jre/lib/ext/",
+          "/org/slf4j/",
+          "/org.slf4j/",
+          "/log4j/",
+          "/io/dropwizard/metrics/",
+          "/io.dropwizard.metrics/",
+          "/org/apache/spark/",
+          "/org.apache.spark/",
+          "/org/apache/hadoop/",
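The SPARK_JAR_EXCLUDES list above suggests a simple substring-based filter over the classpath before staging files to the cluster. A hypothetical illustration of that idea (not the factory's actual code):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class JarExcludeSketch {
      private static final List<String> EXCLUDES =
          Arrays.asList("jre/lib/ext/", "/org/slf4j/", "/org.slf4j/",
              "/org/apache/spark/", "/org.apache.spark/");

      // Keep only jars that match none of the exclude patterns.
      static List<String> filesToStage(List<String> classpath) {
        return classpath.stream()
            .filter(path -> EXCLUDES.stream().noneMatch(path::contains))
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        List<String> cp = Arrays.asList(
            "/home/user/.m2/repository/org/apache/spark/spark-core_2.12/3.1.2/spark-core_2.12-3.1.2.jar",
            "/home/user/project/build/libs/my-pipeline.jar");
        System.out.println(filesToStage(cp)); // only the user jar remains
      }
    }

The rationale: jars that the Spark cluster already provides only inflate staging time and risk version clashes, so they are better left out.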
[beam] branch master updated: [Spark Dataset runner] Reuse SparkSession when testing using class rule. (#24812)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 208a14e05b5  [Spark Dataset runner] Reuse SparkSession when testing using class rule. (#24812)
208a14e05b5 is described below

commit 208a14e05b5cf4021ef978ec45d75a96e20bc3a6
Author: Moritz Mack
AuthorDate: Thu Dec 29 11:08:36 2022 +0100

    [Spark Dataset runner] Reuse SparkSession when testing using class rule. (#24812)
---
 .../translation/batch/CombineGloballyTest.java  | 18 ++--
 .../batch/CombineGroupedValuesTest.java         | 18 ++--
 .../translation/batch/CombinePerKeyTest.java    | 18 ++--
 .../translation/batch/ComplexSourceTest.java    | 17 +--
 .../translation/batch/FlattenTest.java          | 24 +-
 .../translation/batch/GroupByKeyTest.java       | 18 ++--
 .../translation/batch/SimpleSourceTest.java     | 18 ++--
 .../translation/batch/WindowAssignTest.java     | 18 ++--
 8 files changed, 51 insertions(+), 98 deletions(-)

diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java
index dca8b664bd3..cca192df9de 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java
@@ -18,10 +18,7 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

 import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
@@ -38,6 +35,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -48,15 +46,11 @@
  */
 @RunWith(JUnit4.class)
 public class CombineGloballyTest implements Serializable {
-  @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+  @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();

-  private static PipelineOptions testOptions() {
-    SparkStructuredStreamingPipelineOptions options =
-        PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
-    options.setRunner(SparkStructuredStreamingRunner.class);
-    options.setTestMode(true);
-    return options;
-  }
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.fromOptions(SESSION.createPipelineOptions());

   @Test
   public void testCombineGlobally() {

diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java
index cce3199d2c3..774186c1821 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java
@@ -18,14 +18,11 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

 import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
@@ -34,6 +31,7
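The SparkSessionRule referenced above ships with this commit; the underlying pattern is a JUnit ExternalResource bound via @ClassRule so that all tests of a class share one session instead of paying session startup per test. A rough sketch of that pattern, assuming a local master:

    import org.apache.spark.sql.SparkSession;
    import org.junit.rules.ExternalResource;

    // Sketch: share one SparkSession across all tests of a class via @ClassRule.
    public class SparkSessionRuleSketch extends ExternalResource {
      private SparkSession session;

      public SparkSession getSession() {
        return session;
      }

      @Override
      protected void before() {
        session = SparkSession.builder().master("local[2]").appName("test").getOrCreate();
      }

      @Override
      protected void after() {
        session.stop();
      }
    }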
[beam] branch master updated: [Spark Dataset runner] Reduce binary size of Java serialized task related for ParDo translation (#24543)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 2cd38984a35  [Spark Dataset runner] Reduce binary size of Java serialized task related for ParDo translation (#24543)
2cd38984a35 is described below

commit 2cd38984a354c76ada42cb51f13a398babaf1b76
Author: Moritz Mack
AuthorDate: Mon Dec 19 14:13:08 2022 +0100

    [Spark Dataset runner] Reduce binary size of Java serialized task related for ParDo translation (#24543)

    * [Spark Dataset runner] Reduce binary size of Java serialized broadcasted task related for ParDo translation (related to #23845)
---
 .../batch/DoFnMapPartitionsFactory.java         | 204 
 .../batch/DoFnPartitionIteratorFactory.java     | 272 +
 .../translation/batch/ParDoTranslatorBatch.java | 125 --
 3 files changed, 323 insertions(+), 278 deletions(-)

diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnMapPartitionsFactory.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnMapPartitionsFactory.java
deleted file mode 100644
index a53e5ca3a79..000
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnMapPartitionsFactory.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
-
-import static java.util.stream.Collectors.toCollection;
-import static java.util.stream.Collectors.toMap;
-import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.scalaIterator;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayListWithCapacity;
-
-import java.io.Serializable;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Supplier;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.CachedSideInputReader;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun1;
-import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.Fun2;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator;
-import org.apache.spark.api.java.function.MapPartitionsFunction;
-import org.checkerframework.checker.nullness.qual.NonNull;
-import scala.collection.Iterator;
-
-/**
- * Encapsulates a {@link DoFn} inside a Spark {@link
- * org.apache.spark.api.java.function.MapPartitionsFunction}.
- */
-class DoFnMapPartitionsFactory<InputT, OutputT> implements Serializable {
-  private final String stepName;
-
-  private final DoFn<InputT, OutputT> doFn;
-  private final DoFnSchemaInformation doFnSchema;
-  private final Supplier<PipelineOptions> options;
-
-  private final Coder<InputT> coder;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  private final TupleTag<OutputT> mainOutput;
-  private final List<TupleTag<?>> additionalOutputs;
-  private final Map<TupleTag<?>, Coder<?>> outputCode
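Why the binary size matters here: Spark serializes each task closure with plain Java serialization and ships it to every executor, so everything a factory like the one above captures is paid for on every task. A small generic helper to observe serialized closure sizes, illustrative only:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class SerializedSizeSketch {
      // Serialize with plain Java serialization, as Spark does for task closures.
      static int serializedSize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(obj);
        }
        return bytes.size();
      }

      public static void main(String[] args) throws IOException {
        Serializable smallClosure = (Serializable & Runnable) () -> System.out.println("hi");
        System.out.println("closure size in bytes: " + serializedSize(smallClosure));
      }
    }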
[beam] branch master updated: [Spark runner] Support running (VR) tests with Java 17 (closes #24400) (#24401)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new dd8c035937a  [Spark runner] Support running (VR) tests with Java 17 (closes #24400) (#24401)
dd8c035937a is described below

commit dd8c035937a275246a7901718d7bcd890536731b
Author: Moritz Mack
AuthorDate: Mon Dec 5 15:58:26 2022 +0100

    [Spark runner] Support running (VR) tests with Java 17 (closes #24400) (#24401)
---
 runners/spark/spark_runner.gradle | 30 ++
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle
index 365b0935151..1c04aef4d8c 100644
--- a/runners/spark/spark_runner.gradle
+++ b/runners/spark/spark_runner.gradle
@@ -61,6 +61,22 @@ def sparkTestProperties(overrides = [:]) {
   ]
 }

+
+def sparkTestJvmArgs() {
+  // run tests with Java 17 using -PcompileAndRunTestsWithJava17 -Pjava17Home=???
+  if (project.hasProperty("compileAndRunTestsWithJava17")) {
+    return [
+      "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+      // add-opens below required for Kryo FieldSerializer / SparkRunnerKryoRegistratorTest
+      "--add-opens=java.base/java.nio=ALL-UNNAMED",
+      "--add-opens=java.base/java.util=ALL-UNNAMED",
+      "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
+    ]
+  } else {
+    return []
+  }
+}
+
 def hadoopVersions = [
   "285" : "2.8.5",
   "292" : "2.9.2",
@@ -113,6 +129,7 @@ test {
   // Change log level to debug only for the package and nested packages:
   // systemProperty "org.slf4j.simpleLogger.log.org.apache.beam.runners.spark.stateful", "debug"
   jvmArgs "-XX:-UseGCOverheadLimit"
+  jvmArgs += sparkTestJvmArgs()
   if (System.getProperty("beamSurefireArgline")) {
     jvmArgs System.getProperty("beamSurefireArgline")
   }
@@ -230,6 +247,8 @@ def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
   // Disable gradle cache
   outputs.upToDateWhen { false }
   systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false"])
+  jvmArgs += sparkTestJvmArgs()
+  jvmArgs '-Xmx3g'

   classpath = configurations.validatesRunner
   testClassesDirs = files(
@@ -263,7 +282,6 @@ def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
     excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
     excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
   }
-  jvmArgs '-Xmx3g'
 }

 def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test) {
@@ -271,6 +289,7 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
   // Disable gradle cache
   outputs.upToDateWhen { false }
   systemProperties sparkTestProperties(["--enableSparkMetricSinks": "true"])
+  jvmArgs += sparkTestJvmArgs()

   classpath = configurations.validatesRunner
   testClassesDirs += files(
@@ -330,6 +349,8 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
   systemProperty 'spark.kryo.classesToRegister',
       'org.apache.beam.sdk.transforms.ViewTest$NonDeterministicStringCoder,' +
       'org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.RegularImmutableList'
+  jvmArgs += sparkTestJvmArgs()
+  jvmArgs '-Xmx7g' // Increase memory heap in order to avoid OOM errors

   classpath = configurations.validatesRunner
   testClassesDirs = files(
@@ -339,8 +360,7 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
   testClassesDirs += files(project.sourceSets.test.output.classesDirs)

   maxParallelForks 4
-  // Increase memory heap in order to avoid OOM errors
-  jvmArgs '-Xmx7g'
+
   useJUnit {
     includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
     // Should be run only in a properly configured SDK harness environment
@@ -408,6 +428,8 @@ tasks.register("examplesIntegrationTest", Test) {
     "--tempRoot": "${tempLocation}",
     "--project" : "${gcpProject}"
   ])
+  jvmArgs += sparkTestJvmArgs()
+  jvmArgs '-Xmx3g'

   include '**/*IT.class'
   maxParallelForks 4
@@ -420,7 +442,6 @@ tasks.register("examplesIntegrationTest", Test) {
       excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding'
     }
   }
-  jvmArgs '-Xmx3g'
 }

 hadoopVersions.each { kv ->
@@ -429,6 +450,7 @@ hadoopVersions.each { kv ->
     description = "Runs Spark tests with Hadoop version $kv.value"
     classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath
     systemProperties sparkTestProperties()
+    jvmArgs += sparkTestJvmArgs()
     include "**/*Test.class"
     maxParallelForks 4
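Background on the --add-opens flags: JDK 17 strongly encapsulates java.base, so the kind of reflective access Kryo's FieldSerializer performs fails unless the target package is explicitly opened. A minimal reproduction of the failure the flags work around:

    import java.lang.reflect.Field;
    import java.util.ArrayList;

    public class AddOpensSketch {
      public static void main(String[] args) throws Exception {
        Field elementData = ArrayList.class.getDeclaredField("elementData");
        // On JDK 17 this throws InaccessibleObjectException unless the JVM was started
        // with --add-opens=java.base/java.util=ALL-UNNAMED.
        elementData.setAccessible(true);
        System.out.println("java.util is open to the unnamed module");
      }
    }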
[beam] branch master updated: [Spark dataset runner] Cache datasets if used multiple times (#24009)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 217e697d6ca  [Spark dataset runner] Cache datasets if used multiple times (#24009)
217e697d6ca is described below

commit 217e697d6ca5b674fe92e25df2f1a54d01a1660f
Author: Moritz Mack
AuthorDate: Fri Nov 18 16:05:07 2022 +0100

    [Spark dataset runner] Cache datasets if used multiple times (#24009)

    * [Spark dataset runner] Cache datasets if used multiple times (closes #24008)
---
 .../SparkStructuredStreamingRunner.java             | 34 +-
 .../aggregators/AggregatorsAccumulator.java         | 6 +-
 .../metrics/MetricsAccumulator.java                 | 6 +-
 .../translation/EvaluationContext.java              | 120 +++
 .../translation/PipelineTranslator.java             | 380 +++--
 .../translation/TransformTranslator.java            | 118 +++
 .../translation/TranslationContext.java             | 141
 .../CreatePCollectionViewTranslatorBatch.java       | 46 ---
 .../translation/batch/ParDoTranslatorBatch.java     | 15 +-
 .../translation/batch/PipelineTranslatorBatch.java  | 25 +-
 .../translation/helpers/EncoderProvider.java        | 58
 11 files changed, 536 insertions(+), 413 deletions(-)

diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
index 875d59ce549..7d6f40ae904 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java
@@ -30,8 +30,9 @@ import org.apache.beam.runners.spark.structuredstreaming.metrics.AggregatorMetri
 import org.apache.beam.runners.spark.structuredstreaming.metrics.CompositeSource;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.structuredstreaming.metrics.SparkBeamMetricSource;
+import org.apache.beam.runners.spark.structuredstreaming.translation.EvaluationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.SparkSessionFactory;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
@@ -43,9 +44,10 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.spark.SparkContext;
 import org.apache.spark.SparkEnv$;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.metrics.MetricsSystem;
+import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,7 +149,7 @@ public final class SparkStructuredStreamingRunner
     AggregatorsAccumulator.clear();
     MetricsAccumulator.clear();

-    final TranslationContext translationContext = translatePipeline(pipeline);
+    final EvaluationContext evaluationContext = translatePipeline(pipeline);

     final ExecutorService executorService =
         Executors.newSingleThreadExecutor(
@@ -156,14 +158,14 @@ public final class SparkStructuredStreamingRunner
     executorService.submit(
         () -> {
           // TODO initialise other services: checkpointing, metrics system, listeners, ...
-          translationContext.startPipeline();
+          evaluationContext.evaluate();
         });
     executorService.shutdown();

     Runnable onTerminalState =
         options.getUseActiveSparkSession()
             ? () -> {}
-            : () -> translationContext.getSparkSession().stop();
+            : () -> evaluationContext.getSparkSession().stop();
     SparkStructuredStreamingPipelineResult result =
         new SparkStructuredStreamingPipelineResult(submissionFuture, onTerminalState);
@@ -183,8 +185,8 @@ public final class SparkStructuredStreamingRunner
     return result;
   }

-  private TranslationContext translatePipeline(Pipeline pipeline) {
-    PipelineTranslator.detectTranslationMode(pipeline, options);
+  private EvaluationContext translatePipeline(Pipeline pipeline) {
+    PipelineTranslator.detectStreamingMode(pipel
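The caching strategy introduced by this commit, sketched generically (names are illustrative, not the translator's actual fields): count consumers per translated output during pipeline traversal, then persist only outputs used more than once.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch of usage-counted caching; not the actual translator code.
    class UsageCountingCache<K> {
      private final Map<K, Integer> usages = new HashMap<>();

      // During pipeline traversal, record every consumer of a translated output.
      void recordUsage(K output) {
        usages.merge(output, 1, Integer::sum);
      }

      // At evaluation time, only outputs consumed more than once are worth persisting.
      boolean shouldCache(K output) {
        return usages.getOrDefault(output, 0) > 1;
      }
    }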
[beam] branch master updated (72127f93f45 -> b9f6af54d52)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

    from 72127f93f45  Fixes #22156: Fix Spark3 runner to compile against Spark 3.2/3.3 and add version tests to verify compatibility going forward (#22157)
     new 8d59cf7de71  Closes #22407: Separate sources for SparkStructuredStreamingRunner for Spark 2 & 3 to allow for easier improvements for Spark 3.
     new 79d677fed70  Add deprecation warning for Spark 2 in SparkStructuredStreamingRunner
     new b9f6af54d52  Merge pull request #22408 from mosche/22407-separate-spark-ssrunner-sources

The 36602 revisions listed above as "new" are entirely new to this repository and will be described in separate emails.
The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../org/apache/beam/runners/spark/structuredstreaming/Constants.java | 0
 .../structuredstreaming/SparkStructuredStreamingPipelineOptions.java | 0
 .../structuredstreaming/SparkStructuredStreamingPipelineResult.java | 0
 .../spark/structuredstreaming/SparkStructuredStreamingRunner.java | 4
 .../structuredstreaming/SparkStructuredStreamingRunnerRegistrar.java | 0
 .../spark/structuredstreaming/aggregators/AggregatorsAccumulator.java | 0
 .../spark/structuredstreaming/aggregators/NamedAggregators.java | 0
 .../structuredstreaming/aggregators/NamedAggregatorsAccumulator.java | 0
 .../runners/spark/structuredstreaming/aggregators/package-info.java | 0
 .../beam/runners/spark/structuredstreaming/examples/WordCount.java | 0
 .../runners/spark/structuredstreaming/metrics/AggregatorMetric.java | 0
 .../spark/structuredstreaming/metrics/AggregatorMetricSource.java | 0
 .../beam/runners/spark/structuredstreaming/metrics/BeamMetricSet.java | 0
 .../runners/spark/structuredstreaming/metrics/CompositeSource.java | 0
 .../runners/spark/structuredstreaming/metrics/MetricsAccumulator.java | 0
 .../metrics/MetricsContainerStepMapAccumulator.java | 0
 .../runners/spark/structuredstreaming/metrics/SparkBeamMetric.java | 0
 .../spark/structuredstreaming/metrics/SparkBeamMetricSource.java | 0
 .../structuredstreaming/metrics/SparkMetricsContainerStepMap.java | 0
 .../runners/spark/structuredstreaming/metrics/WithMetricsSupport.java | 0
 .../beam/runners/spark/structuredstreaming/metrics/package-info.java | 0
 .../spark/structuredstreaming/metrics/sink/CodahaleCsvSink.java | 0
 .../spark/structuredstreaming/metrics/sink/CodahaleGraphiteSink.java | 0
 .../runners/spark/structuredstreaming/metrics/sink/package-info.java | 0
 .../apache/beam/runners/spark/structuredstreaming/package-info.java | 0
 .../structuredstreaming/translation/AbstractTranslationContext.java | 0
 .../spark/structuredstreaming/translation/PipelineTranslator.java | 0
 .../spark/structuredstreaming/translation/SparkSessionFactory.java | 0
 .../structuredstreaming/translation/SparkTransformOverrides.java | 0
 .../spark/structuredstreaming/translation/TransformTranslator.java | 0
 .../structuredstreaming/translation/batch/AggregatorCombiner.java | 0
 .../translation/batch/CombinePerKeyTranslatorBatch.java | 0
 .../translation/batch/CreatePCollectionViewTranslatorBatch.java | 0
 .../spark/structuredstreaming/translation/batch/DoFnFunction.java | 0
 .../structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java | 0
 .../structuredstreaming/translation/batch/FlattenTranslatorBatch.java | 0
 .../translation/batch/GroupByKeyTranslatorBatch.java | 0
 .../structuredstreaming/translation/batch/ImpulseTranslatorBatch.java | 0
 .../structuredstreaming/translation/batch/ParDoTranslatorBatch.java | 0
 .../translation/batch/PipelineTranslatorBatch.java | 0
 .../spark/structuredstreaming/translation/batch/ProcessContext.java | 0
 .../translation/batch/ReadSourceTranslatorBatch.java | 0
 .../translation/batch/ReshuffleTranslatorBatch.java | 0
 .../translation/batch/WindowAssignTranslatorBatch.java | 0
 .../batch/functions/GroupAlsoByWindowViaOutputBufferFn.java | 0
 .../translation/batch/functions/NoOpStepContext.java | 0
 .../translation/batch/functions/SparkSideInputReader.java | 0
 .../structuredstreaming/translation/batch/functions/package-info.java | 0
 .../spark/structuredstreaming/translation/batch/package-info.java | 0
 .../spark/structuredstreaming/translation/helpers/CoderHelpers.java | 0
 .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java | 0
 .../spark/structuredstreaming/translation/helpers/KVHelpers.java | 0
 .../structuredstreaming/translation/helpers/MultiOutputCoder.java | 0
 .../spark/structuredstreaming/translation/helpers/RowHelpers
[beam] branch master updated: Deprecate runner support for Spark 2.4 (closes #22094)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new cd6bb9569e5  Deprecate runner support for Spark 2.4 (closes #22094)
     new 680ed5b3a49  Merge pull request #22097 from mosche/22094-DeprecateSpark2
cd6bb9569e5 is described below

commit cd6bb9569e5f8a0a9c6b55473c13a0b453ee6c8f
Author: Moritz Mack
AuthorDate: Wed Jun 29 14:51:50 2022 +0200

    Deprecate runner support for Spark 2.4 (closes #22094)
---
 CHANGES.md                                                 | 1 +
 .../runners/spark/translation/SparkContextFactory.java     | 8 +++-
 .../www/site/content/en/documentation/runners/spark.md    | 15 ---
 3 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 9a5873ae940..53af5dd28b0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -70,6 +70,7 @@

 ## Deprecations

+* Support for Spark 2.4.x is deprecated and will be dropped with the release of Beam 2.44.0 or soon after (Spark runner) ([#22094](https://github.com/apache/beam/issues/22094)).
 * X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).

 ## Bugfixes

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 9f9465ccde8..4b714b65581 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -143,6 +143,12 @@ public final class SparkContextFactory {
     conf.setAppName(options.getAppName());
     // register immutable collections serializers because the SDK uses them.
     conf.set("spark.kryo.registrator", SparkRunnerKryoRegistrator.class.getName());
-    return new JavaSparkContext(conf);
+    JavaSparkContext jsc = new JavaSparkContext(conf);
+    if (jsc.sc().version().startsWith("2")) {
+      LOG.warn(
+          "Support for Spark 2 is deprecated, this runner will be removed in a few releases.\n"
+              + "Spark 2 is reaching its EOL, consider migrating to Spark 3.");
+    }
+    return jsc;
   }
 }

diff --git a/website/www/site/content/en/documentation/runners/spark.md b/website/www/site/content/en/documentation/runners/spark.md
index 91b72d542a7..abc1031840b 100644
--- a/website/www/site/content/en/documentation/runners/spark.md
+++ b/website/www/site/content/en/documentation/runners/spark.md
@@ -67,7 +67,8 @@ the portable Runner. For more information on portability, please visit the

 ## Spark Runner prerequisites and setup

-The Spark runner currently supports Spark's 2.x branch, and more specifically any version greater than 2.4.0.
+The Spark runner currently supports Spark's 3.1.x branch.
+> **Note:** Support for Spark 2.4.x is deprecated and will be dropped with the release of Beam 2.44.0 (or soon after).

 {{< paragraph class="language-java" >}}
 You can add a dependency on the latest version of the Spark runner by adding to your pom.xml the following:
@@ -76,7 +77,7 @@ You can add a dependency on the latest version of the Spark runner by adding to
 {{< highlight java >}}
 <dependency>
   <groupId>org.apache.beam</groupId>
-  <artifactId>beam-runners-spark</artifactId>
+  <artifactId>beam-runners-spark-3</artifactId>
   <version>{{< param release_latest >}}</version>
 </dependency>
 {{< /highlight >}}
@@ -90,13 +91,13 @@ In some cases, such as running in local mode/Standalone, your (self-contained) a
 {{< highlight java >}}
 <dependency>
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-core_2.11</artifactId>
+  <artifactId>spark-core_2.12</artifactId>
   <version>${spark.version}</version>
 </dependency>

 <dependency>
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming_2.11</artifactId>
+  <artifactId>spark-streaming_2.12</artifactId>
   <version>${spark.version}</version>
 </dependency>
 {{< /highlight >}}
@@ -193,7 +194,7 @@ download it on the [Downloads page](/get-started/downloads/).
 {{< paragraph class="language-py" >}}
 1. Start the JobService endpoint:
     * with Docker (preferred): `docker run --net=host apache/beam_spark_job_server:latest`
-    * or from Beam source code: `./gradlew :runners:spark:2:job-server:runShadow`
+    * or from Beam source code: `./gradlew :runners:spark:3:job-server:runShadow`
 {{< /paragraph >}}

 {{< paragraph class="language-py" >}}
@@ -228,7 +229,7 @@ For more details on the different deployment modes see: [Standalone](https://spa
 {{< paragraph class="language-py" >}}
 2. Start JobService that will connect with the Spark master:
     * with Docker (preferred): `docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:7077`
-    * or from Beam source code: `./gradlew :runners:spark:2:job-server:runShadow -PsparkMasterUrl=spark://localhost:7077`
+    * or from Beam source
[beam] branch master updated (3c623abd631 -> e62ae391985)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

    from 3c623abd631  [Playground] [Hotfix] Remove autoscrolling from embedded editor (#21717)
     add 72e52cb1546  [BEAM-12918] Add PostCommit_Java_Tpcds_Dataflow job
     new e62ae391985  Merge pull request #17680: [BEAM-12918] Add PostCommit_Java_Tpcds_Dataflow job

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails.
The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../job_PostCommit_Java_Tpcds_Dataflow.groovy   | 96 ++
 .../org/apache/beam/sdk/tpcds/TpcdsOptions.java | 4 +-
 2 files changed, 99 insertions(+), 1 deletion(-)
 create mode 100644 .test-infra/jenkins/job_PostCommit_Java_Tpcds_Dataflow.groovy
[beam] 01/01: Merge pull request #17680: [BEAM-12918] Add PostCommit_Java_Tpcds_Dataflow job
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e62ae391985fc13c7df1ee6e088525835ceaa560
Merge: 3c623abd631 72e52cb1546
Author: Etienne Chauchot
AuthorDate: Tue Jun 7 15:16:41 2022 +0200

    Merge pull request #17680: [BEAM-12918] Add PostCommit_Java_Tpcds_Dataflow job

    [BEAM-12918] Add PostCommit_Java_Tpcds_Dataflow job

 .../job_PostCommit_Java_Tpcds_Dataflow.groovy   | 96 ++
 .../org/apache/beam/sdk/tpcds/TpcdsOptions.java | 4 +-
 2 files changed, 99 insertions(+), 1 deletion(-)
[beam] branch master updated: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1 (#17406)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new a6ee885888b  [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1 (#17406)
a6ee885888b is described below

commit a6ee885888b3ad89e778e7dab50ee1e37bcef1d4
Author: Moritz Mack
AuthorDate: Thu May 12 15:08:19 2022 +0200

    [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1 (#17406)

    * [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1 and set provided SparkContext via SparkContextFactory to avoid losing it during a serde roundtrip in TestPipenline.
---
 runners/spark/spark_runner.gradle                  | 14 +-
 .../beam/runners/spark/SparkContextOptions.java    | 6 +
 .../beam/runners/spark/SparkRunnerDebugger.java    | 6 +-
 .../spark/translation/SparkContextFactory.java     | 134 +++--
 .../org/apache/beam/runners/spark/CacheTest.java   | 25 ++-
 .../runners/spark/GlobalWatermarkHolderTest.java   | 19 +--
 .../runners/spark/ProvidedSparkContextTest.java    | 93 +---
 ...ntextRule.java => SparkContextOptionsRule.java} | 31 ++--
 .../beam/runners/spark/SparkContextRule.java       | 90 +++
 .../beam/runners/spark/SparkPipelineStateTest.java | 167 +++--
 .../runners/spark/SparkRunnerDebuggerTest.java     | 12 +-
 .../metrics/sink/SparkMetricsSinkTest.java         | 13 +-
 .../coders/SparkRunnerKryoRegistratorTest.java     | 118 +++
 .../spark/metrics/SparkMetricsPusherTest.java      | 3 -
 .../structuredstreaming/SparkSessionRule.java      | 73 +
 .../translation/helpers/EncoderHelpersTest.java    | 14 +-
 .../translation/streaming/CreateStreamTest.java    | 2 -
 .../ResumeFromCheckpointStreamingTest.java         | 4 -
 .../streaming/SparkCoGroupByKeyStreamingTest.java  | 3 -
 .../streaming/TrackStreamingSourcesTest.java       | 19 +--
 20 files changed, 474 insertions(+), 372 deletions(-)

diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle
index acbe1806c8e..138b823a77e 100644
--- a/runners/spark/spark_runner.gradle
+++ b/runners/spark/spark_runner.gradle
@@ -94,7 +94,6 @@ if (copySourceBase) {
 }

 test {
-  systemProperty "beam.spark.test.reuseSparkContext", "true"
   systemProperty "spark.sql.shuffle.partitions", "4"
   systemProperty "spark.ui.enabled", "false"
   systemProperty "spark.ui.showConsoleProgress", "false"
@@ -113,17 +112,14 @@ test {
     jvmArgs System.getProperty("beamSurefireArgline")
   }

-  // Only one SparkContext may be running in a JVM (SPARK-2243)
-  forkEvery 1
   maxParallelForks 4
   useJUnit {
     excludeCategories "org.apache.beam.runners.spark.StreamingTest"
     excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery"
   }
-  filter {
-    // BEAM-11653 MetricsSinkTest is failing with Spark 3
-    excludeTestsMatching 'org.apache.beam.runners.spark.aggregators.metrics.sink.SparkMetricsSinkTest'
-  }
+
+  // easily re-run all tests (to deal with flaky tests / SparkContext leaks)
+  if(project.hasProperty("rerun-tests")) { outputs.upToDateWhen {false} }
 }

 dependencies {
@@ -291,10 +287,6 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
   useJUnit {
     includeCategories 'org.apache.beam.runners.spark.StreamingTest'
   }
-  filter {
-    // BEAM-11653 MetricsSinkTest is failing with Spark 3
-    excludeTestsMatching 'org.apache.beam.runners.spark.aggregators.metrics.sink.SparkMetricsSinkTest'
-  }
 }

 tasks.register("validatesStructuredStreamingRunnerBatch", Test) {

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
index 13ae67878eb..39caee7e6ba 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
@@ -37,6 +37,12 @@ import org.apache.spark.streaming.api.java.JavaStreamingListener;
  * which link to Spark dependencies, won't be scanned by {@link PipelineOptions} reflective
  * instantiation. Note that {@link SparkContextOptions} is not registered with {@link
  * SparkRunnerRegistrar}.
+ *
+ * <p>Note: It's recommended to use {@link
+ * org.apache.beam.runners.spark.translation.SparkContextFactory#setProvidedSparkContext(JavaSparkContext)}
+ * instead of {@link SparkContextOptions#setProvidedSparkContext(JavaSparkContext)} for testing.
+ * When using @{@link org.apache.beam.sdk.testing.Test
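Following the new javadoc above, tests would hand the context to the factory rather than to the options, roughly like this (the local master string and surrounding scaffolding are assumptions; the commit's SparkContextRule wraps this pattern):

    import org.apache.beam.runners.spark.translation.SparkContextFactory;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ProvidedContextSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "beam-test");
        // Register the context with the factory so it survives the options
        // serde round trip in TestPipeline, per the javadoc above.
        SparkContextFactory.setProvidedSparkContext(jsc);
        // ... run TestPipeline-based tests against the shared context ...
        jsc.stop();
      }
    }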
[beam] branch master updated: [BEAM-12918] Add PostCommit_Java_Tpcds_Spark job
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new c654733b45e  [BEAM-12918] Add PostCommit_Java_Tpcds_Spark job
     new 5c21fbccec5  Merge pull request #15679 from aromanenko-dev/BEAM-12918-tpcds-jenkins
c654733b45e is described below

commit c654733b45e44ee88af3498133623aa056de6e73
Author: Alexey Romanenko
AuthorDate: Thu Oct 7 18:28:12 2021 +0200

    [BEAM-12918] Add PostCommit_Java_Tpcds_Spark job
---
 .../jenkins/job_PostCommit_Java_Tpcds_Spark.groovy | 188 +
 .../beam/sdk/tpcds/TpcdsParametersReader.java      | 3 +-
 2 files changed, 190 insertions(+), 1 deletion(-)

diff --git a/.test-infra/jenkins/job_PostCommit_Java_Tpcds_Spark.groovy b/.test-infra/jenkins/job_PostCommit_Java_Tpcds_Spark.groovy
new file mode 100644
index 000..89d2169cd8b
--- /dev/null
+++ b/.test-infra/jenkins/job_PostCommit_Java_Tpcds_Spark.groovy
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import NoPhraseTriggeringPostCommitBuilder
+import PhraseTriggeringPostCommitBuilder
+import InfluxDBCredentialsHelper
+
+// This job runs the Tpcds benchmark suite against the Spark runner.
+NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Tpcds_Spark',
+    'Spark Runner Tpcds Tests', this) {
+  description('Runs the Tpcds suite on the Spark runner.')
+
+  // Set common parameters.
+  commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 240, true, 'beam-perf')
+  InfluxDBCredentialsHelper.useCredentials(delegate)
+
+  // Gradle goals for this job.
+  steps {
+    shell('echo "*** RUN TPCDS IN BATCH MODE USING SPARK 2 RUNNER ***"')
+    gradle {
+      rootBuildScriptDir(commonJobProperties.checkoutDir)
+      tasks(':sdks:java:testing:tpcds:run')
+      commonJobProperties.setGradleSwitches(delegate)
+      switches('-Ptpcds.runner=":runners:spark:2"' +
+          ' -Ptpcds.args="' +
+          [
+            '--dataSize=1GB',
+            '--sourceType=PARQUET',
+            '--dataDirectory=gs://beam-tpcds/datasets/parquet/partitioned',
+            '--resultsDirectory=gs://beam-tpcds/results/',
+            '--tpcParallel=1',
+            '--runner=SparkRunner',
+            '--queries=3'
+          ].join(' '))
+    }
+    shell('echo "*** RUN TPCDS IN BATCH MODE USING SPARK 3 RUNNER ***"')
+    gradle {
+      rootBuildScriptDir(commonJobProperties.checkoutDir)
+      tasks(':sdks:java:testing:tpcds:run')
+      commonJobProperties.setGradleSwitches(delegate)
+      switches('-Ptpcds.runner=":runners:spark:3"' +
+          ' -Ptpcds.args="' +
+          [
+            '--dataSize=1GB',
+            '--sourceType=PARQUET',
+            '--dataDirectory=gs://beam-tpcds/datasets/parquet/partitioned',
+            '--resultsDirectory=gs://beam-tpcds/results/',
+            '--tpcParallel=1',
+            '--runner=SparkRunner',
+            '--queries=3'
+          ].join(' '))
+    }
+
+    shell('echo "*** RUN TPCDS IN BATCH MODE USING SPARK 2 STRUCTURED STREAMING RUNNER ***"')
+    gradle {
+      rootBuildScriptDir(commonJobProperties.checkoutDir)
+      tasks(':sdks:java:testing:tpcds:run')
+      commonJobProperties.setGradleSwitches(delegate)
+      switches('-Ptpcds.runner=":runners:spark:2"' +
+          ' -Ptpcds.args="' +
+          [
+            '--dataSize=1GB',
[beam] branch master updated (ec62f3d -> a1a6321)
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.

    from ec62f3d  Merge pull request #17194 from apache/revert-17066-fixTodoUp
     new 5bc685e  [BEAM-14003] Adds compat for Elasticsearch 8.0.0
     new b535270  [BEAM-13136] Removes support for Elasticsearch 2.x
     new a1a6321  Merge pull request #16953 from egalpin/elasticsearchio-es-8.0-compat

The 35120 revisions listed above as "new" are entirely new to this repository and will be described in separate emails.
The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../elasticsearch-tests-5/build.gradle             | 4 -
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 7 -
 .../java/org/elasticsearch/bootstrap/JarHell.java  | 34 -
 .../elasticsearch-tests-6/build.gradle             | 4 -
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 7 -
 .../java/org/elasticsearch/bootstrap/JarHell.java  | 36 -
 .../elasticsearch-tests-7/build.gradle             | 7 -
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 7 -
 .../java/org/elasticsearch/bootstrap/JarHell.java  | 36 -
 .../build.gradle                                   | 23 +--
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java    | 4 +-
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  | 13 +-
 .../elasticsearch/ElasticsearchIOTestCommon.java   | 60
 .../io/elasticsearch/ElasticsearchIOTestUtils.java | 44 --
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 162 ++---
 settings.gradle.kts                                | 1 +
 16 files changed, 157 insertions(+), 292 deletions(-)
 delete mode 100644 sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/elasticsearch/bootstrap/JarHell.java
 delete mode 100644 sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/elasticsearch/bootstrap/JarHell.java
 delete mode 100644 sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/elasticsearch/bootstrap/JarHell.java
 copy sdks/java/io/elasticsearch-tests/{elasticsearch-tests-common => elasticsearch-tests-8}/build.gradle (83%)
 copy sdks/java/io/elasticsearch-tests/{elasticsearch-tests-7 => elasticsearch-tests-8}/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java (99%)
 copy sdks/java/io/elasticsearch-tests/{elasticsearch-tests-7 => elasticsearch-tests-8}/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java (95%)
[beam] branch master updated: [adhoc] Prepare aws2 ClientConfiguration for json serialization and cleanup AWS Module (#16894)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 06e7c20 [adhoc] Prepare aws2 ClientConfiguration for json serialization and cleanup AWS Module (#16894) 06e7c20 is described below commit 06e7c201dbdd8fe37d308b0bad2b1684e85e1dc7 Author: Moritz Mack AuthorDate: Thu Mar 3 14:38:41 2022 +0100 [adhoc] Prepare aws2 ClientConfiguration for json serialization and cleanup AWS Module (#16894) * [adhoc] Prepare aws2 ClientConfiguration and related classes for json serialization and cleanup AWS Module --- sdks/java/io/amazon-web-services2/build.gradle | 1 + .../sdk/io/aws2/common/ClientConfiguration.java| 95 -- .../io/aws2/common/HttpClientConfiguration.java| 23 +- .../sdk/io/aws2/common/RetryConfiguration.java | 43 +- .../apache/beam/sdk/io/aws2/options/AwsModule.java | 80 ++ .../apache/beam/sdk/io/aws2/s3/SSECustomerKey.java | 6 ++ .../io/aws2/common/ClientConfigurationTest.java| 31 ++- .../aws2/common/HttpClientConfigurationTest.java | 49 +++ .../sdk/io/aws2/common/RetryConfigurationTest.java | 22 + .../beam/sdk/io/aws2/options/AwsModuleTest.java| 29 --- .../beam/sdk/io/aws2/s3/SSECustomerKeyTest.java| 17 +++- 11 files changed, 229 insertions(+), 167 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/build.gradle b/sdks/java/io/amazon-web-services2/build.gradle index dceb4c4..817b7b4 100644 --- a/sdks/java/io/amazon-web-services2/build.gradle +++ b/sdks/java/io/amazon-web-services2/build.gradle @@ -30,6 +30,7 @@ ext.summary = "IO library to read and write Amazon Web Services services from Be dependencies { implementation library.java.vendored_guava_26_0_jre + implementation library.java.error_prone_annotations implementation project(path: ":sdks:java:core", configuration: "shadow") implementation library.java.aws_java_sdk2_apache_client implementation library.java.aws_java_sdk2_netty_client diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java index 4371d75..9ee8eb2 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java @@ -19,19 +19,21 @@ package org.apache.beam.sdk.io.aws2.common; import static org.apache.beam.sdk.io.aws2.options.AwsSerializableUtils.deserializeAwsCredentialsProvider; import static org.apache.beam.sdk.io.aws2.options.AwsSerializableUtils.serializeAwsCredentialsProvider; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSetter; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; import com.google.auto.value.AutoValue; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import com.google.auto.value.extension.memoized.Memoized; import java.io.Serializable; import java.net.URI; import java.util.function.Consumer; import javax.annotation.Nullable; 
import org.apache.beam.sdk.io.aws2.options.AwsOptions; import org.checkerframework.dataflow.qual.Pure; -import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.regions.Region; @@ -47,18 +49,28 @@ import software.amazon.awssdk.regions.Region; * uses a backoff strategy with equal jitter for computing the delay before the next retry. */ @AutoValue +@JsonInclude(value = JsonInclude.Include.NON_EMPTY) +@JsonDeserialize(builder = ClientConfiguration.Builder.class) public abstract class ClientConfiguration implements Serializable { /** * Optional {@link AwsCredentialsProvider}. If set, this overwrites the default in {@link * AwsOptions#getAwsCredentialsProvider()}. */ - public abstract @Nullable @Pure AwsCredentialsProvider credentialsProvider(); + @JsonProperty + @Memoized + public @Nullable @Pure AwsCredentialsProvider credentialsProvider() { +return credentialsProviderAsJson() != null +? deserializeAwsCredentialsProvider(credentialsProviderAsJson()) +: null; + } /** * Optional {@link Region}. If set, this overwrites the default in {@link
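With the Jackson annotations introduced above, a ClientConfiguration can be round-tripped through JSON. A minimal sketch, assuming the module's public builder API and that Beam's AwsModule (a Jackson module discoverable via ObjectMapper.findModules) supplies the (de)serializers for AWS types such as Region:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
    import software.amazon.awssdk.regions.Region;

    public class ClientConfigurationRoundTrip {
      public static void main(String[] args) throws Exception {
        ClientConfiguration config =
            ClientConfiguration.builder().region(Region.US_WEST_1).build();
        // Module discovery picks up Beam's AwsModule from the classpath.
        ObjectMapper mapper = new ObjectMapper().registerModules(ObjectMapper.findModules());
        // JsonInclude.NON_EMPTY keeps the JSON minimal: unset fields are omitted.
        String json = mapper.writeValueAsString(config);
        // @JsonDeserialize(builder = ...) lets Jackson rebuild the AutoValue instance.
        ClientConfiguration restored = mapper.readValue(json, ClientConfiguration.class);
        System.out.println(json + " equal: " + restored.equals(config));
      }
    }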
[beam] branch master updated: Use default context output rather than outputWithTimestamp for ElasticsearchIO
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 49d376c Use default context output rather than outputWithTimestamp for ElasticsearchIO new 786d9b9 Merge pull request #16744 from egalpin/elasticsearch-default-output 49d376c is described below commit 49d376c2c581e1cb2d244247f06eaf260ad77d26 Author: egalpin AuthorDate: Fri Feb 4 15:57:52 2022 -0500 Use default context output rather than outputWithTimestamp for ElasticsearchIO --- .../java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 273443a..dc29ac6 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -2359,9 +2359,10 @@ public class ElasticsearchIO { @Override public void output( -TupleTag tag, Document document, Instant timestamp, BoundedWindow ignored) { - // Note: window is intentionally unused, but required as a param to fit the interface - context.outputWithTimestamp(tag, document, timestamp); +TupleTag tag, Document document, Instant ignored1, BoundedWindow ignored2) { + // Note: window and timestamp are intentionally unused, but required as params to fit the + // interface + context.output(tag, document); } }
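The distinction matters because the default output call propagates the input element's timestamp, while outputWithTimestamp assigns a new one and is constrained by the DoFn's allowed timestamp skew. A minimal sketch of the two paths in an ordinary DoFn:

    import org.apache.beam.sdk.transforms.DoFn;

    class EmitFn extends DoFn<String, String> {
      @ProcessElement
      public void process(ProcessContext c) {
        // Default: the output inherits the current element's timestamp, so
        // downstream windowing stays consistent with the input.
        c.output(c.element());
        // The alternative ElasticsearchIO stopped using: pin an explicit
        // timestamp, only valid within getAllowedTimestampSkew() of the input.
        // c.outputWithTimestamp(c.element(), someInstant);
      }
    }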
[beam] branch master updated (1c579f9 -> 6d60ac3)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 1c579f9 Remove build status from PR (#16902) add 6d60ac3 [BEAM-13738] Reenable ignored SQS test after bumping elasticmq for fixed version (#16914) No new revisions were added by this update. Summary of changes: sdks/java/io/amazon-web-services2/build.gradle | 2 +- .../org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReaderTest.java | 2 -- .../io/amazon-web-services2/src/test/resources/application.conf | 6 +++--- 3 files changed, 4 insertions(+), 6 deletions(-) copy playground/frontend/app.yaml => sdks/java/io/amazon-web-services2/src/test/resources/application.conf (79%)
[beam] branch master updated (0037bf3 -> f87f146)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 0037bf3 [release-2.36.0] Fix pickler argument for 2.36 blog (#16774) add f87f146 [adhoc] Avoid using SerializablePipelineOptions for testing to minimize dependencies (#16881) No new revisions were added by this update. Summary of changes: .../beam/sdk/io/aws2/options/AwsOptionsTest.java | 8 +++ .../beam/sdk/io/aws2/options/S3OptionsTest.java| 7 +++--- .../io/aws2/options/SerializationTestUtil.java}| 26 +- 3 files changed, 22 insertions(+), 19 deletions(-) copy sdks/java/io/{synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/SyntheticTestUtils.java => amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java} (55%)
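The gist of such a utility is to round-trip options through Jackson directly instead of wrapping them in SerializablePipelineOptions. A minimal sketch, assuming Beam's Jackson modules are discoverable on the test classpath:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    class SerializationRoundTrip {
      private static final ObjectMapper MAPPER =
          new ObjectMapper().registerModules(ObjectMapper.findModules());

      // PipelineOptions carries its own Jackson (de)serializers, so a JSON
      // round-trip needs no Serializable wrapper at all.
      static <T extends PipelineOptions> T jsonRoundTrip(T options, Class<T> clazz) throws Exception {
        String json = MAPPER.writeValueAsString(options);
        return MAPPER.readValue(json, PipelineOptions.class).as(clazz);
      }

      public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        PipelineOptions copy = jsonRoundTrip(options, PipelineOptions.class);
        System.out.println(copy != null);
      }
    }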
[beam] branch master updated: [BEAM-13510] Don't retry on invalid SQS receipt handles.
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 00208fb [BEAM-13510] Don't retry on invalid SQS receipt handles. new e1a3fdf Merge pull request #16478 from mosche/BEAM-13510-InvalidRetries 00208fb is described below commit 00208fbb62feb04260ac07d3ba64c9fd316aaeea Author: Moritz Mack AuthorDate: Wed Dec 29 15:40:42 2021 +0100 [BEAM-13510] Don't retry on invalid SQS receipt handles. --- sdks/java/io/amazon-web-services2/build.gradle | 2 +- .../beam/sdk/io/aws2/sqs/SqsCheckpointMark.java| 33 ++- .../beam/sdk/io/aws2/sqs/SqsUnboundedReader.java | 288 + .../beam/sdk/io/aws2/sqs/EmbeddedSqsServer.java| 69 +++-- .../org/apache/beam/sdk/io/aws2/sqs/SqsIOTest.java | 15 +- .../sdk/io/aws2/sqs/SqsUnboundedReaderTest.java| 213 --- .../sdk/io/aws2/sqs/SqsUnboundedSourceTest.java| 22 +- 7 files changed, 371 insertions(+), 271 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/build.gradle b/sdks/java/io/amazon-web-services2/build.gradle index 2fe845d..b8aa0ec 100644 --- a/sdks/java/io/amazon-web-services2/build.gradle +++ b/sdks/java/io/amazon-web-services2/build.gradle @@ -62,7 +62,7 @@ dependencies { testImplementation library.java.mockito_core testImplementation library.java.guava_testlib testImplementation library.java.junit - testImplementation 'org.elasticmq:elasticmq-rest-sqs_2.12:0.15.6' + testImplementation 'org.elasticmq:elasticmq-rest-sqs_2.12:0.15.6' // later versions conflict with s3mock testImplementation library.java.hamcrest testImplementation library.java.powermock testImplementation library.java.powermock_mockito diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java index 2a9a630..823f96c 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsCheckpointMark.java @@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import java.io.IOException; import java.io.Serializable; +import java.util.Collection; import java.util.List; import java.util.Optional; import org.apache.beam.sdk.io.UnboundedSource; @@ -35,28 +36,38 @@ import org.checkerframework.checker.nullness.qual.Nullable; class SqsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable { /** - * If the checkpoint is for persisting: the reader who's snapshotted state we are persisting. If - * the checkpoint is for restoring: {@literal null}. Not persisted in durable checkpoint. CAUTION: - * Between a checkpoint being taken and {@link #finalizeCheckpoint()} being called the 'true' - * active reader may have changed. + * The reader this checkpoint was created from. + * + * Not persisted in durable checkpoint, {@code null} after restoring the checkpoint. + * + * CAUTION:Between a checkpoint being taken and {@link #finalizeCheckpoint()} being + * called the 'true' active reader may have changed. */ private transient Optional reader; + /** - * If the checkpoint is for persisting: The ids of messages which have been passed downstream - * since the last checkpoint. If the checkpoint is for restoring: {@literal null}. Not persisted - * in durable checkpoint. 
+ * Contains message ids that have been passed downstream since the last checkpoint. + * + * Corresponding messages have to be purged from SQS when finalizing the checkpoint to prevent + * re-delivery. + * + * Not persisted in durable checkpoint, {@code null} after restoring the checkpoint. */ private @Nullable List safeToDeleteIds; /** - * If the checkpoint is for persisting: The receipt handles of messages which have been received - * from SQS but not yet passed downstream at the time of the snapshot. If the checkpoint is for - * restoring: Same, but recovered from durable storage. + * Contains receipt handles of messages which have been received from SQS, but not yet passed + * downstream at the time of the snapshot. + * + * When restoring from a checkpoint, the visibility timeout of corresponding messages is set to + * {@code 0} to trigger immediate re-delivery. */ @VisibleForTesting final List notYetReadReceipts; SqsCheckpointMark( - SqsUnboundedReader reader, List messagesToDelete, List notYetReadReceipts) { + SqsUnboundedReader reader, + Collection messagesToDelete, + Collection notYetReadReceipts) { this.
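The javadoc above describes a two-sided contract: on finalize, messages already passed downstream are deleted from SQS; on restore, not-yet-read receipts get their visibility timeout reset to 0 so they are re-delivered immediately. A hypothetical sketch of the finalize side (the deleter interface and method name are illustrative, not the actual reader API):

    import java.io.IOException;
    import java.util.List;
    import java.util.Optional;

    // Illustrative stand-in for the reader; only the call shape matters here.
    interface SqsDeleter {
      void delete(List<String> messageIds) throws IOException;
    }

    class CheckpointFinalizeSketch {
      private final Optional<SqsDeleter> reader; // transient in the real class
      private final List<String> safeToDeleteIds;

      CheckpointFinalizeSketch(Optional<SqsDeleter> reader, List<String> safeToDeleteIds) {
        this.reader = reader;
        this.safeToDeleteIds = safeToDeleteIds;
      }

      public void finalizeCheckpoint() throws IOException {
        // The reader is absent after restoring from durable state; only a live
        // reader can purge acknowledged messages and prevent re-delivery.
        if (reader.isPresent() && safeToDeleteIds != null) {
          reader.get().delete(safeToDeleteIds);
        }
      }
    }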
[beam] branch master updated: Merge pull request #16309: [BEAM-13503] Set a default value to throwWriteErrors in BulkIO constructor
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new fd3f207 Merge pull request #16309: [BEAM-13503] Set a default value to throwWriteErrors in BulkIO constructor fd3f207 is described below commit fd3f2073bfc91ab866d97ae2b3591261212be9e7 Author: Kyle Hersey AuthorDate: Wed Jan 19 04:18:19 2022 -0500 Merge pull request #16309: [BEAM-13503] Set a default value to throwWriteErrors in BulkIO constructor [BEAM-13503] Set a default value to throwWriteErrors in BulkIO constructor --- .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 6 ++ .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 6 ++ .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 6 ++ .../io/elasticsearch/ElasticsearchIOTestCommon.java | 19 +++ .../beam/sdk/io/elasticsearch/ElasticsearchIO.java| 1 + 5 files changed, 38 insertions(+) diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 4023b24..6dd2c69 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -265,6 +265,12 @@ public class ElasticsearchIOTest implements Serializable { } @Test + public void testDocToBulkAndBulkIO() throws Exception { +elasticsearchIOTestCommon.setPipeline(pipeline); +elasticsearchIOTestCommon.testDocToBulkAndBulkIO(); + } + + @Test public void testDocumentCoder() throws Exception { elasticsearchIOTestCommon.testDocumentCoder(); } diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 3e77be8..9b74161 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -258,6 +258,12 @@ public class ElasticsearchIOTest implements Serializable { } @Test + public void testDocToBulkAndBulkIO() throws Exception { +elasticsearchIOTestCommon.setPipeline(pipeline); +elasticsearchIOTestCommon.testDocToBulkAndBulkIO(); + } + + @Test public void testDocumentCoder() throws Exception { elasticsearchIOTestCommon.testDocumentCoder(); } diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 4d78666..094e3e4 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -259,6 +259,12 @@ public class ElasticsearchIOTest implements Serializable { } @Test + 
public void testDocToBulkAndBulkIO() throws Exception { +elasticsearchIOTestCommon.setPipeline(pipeline); +elasticsearchIOTestCommon.testDocToBulkAndBulkIO(); + } + + @Test public void testDocumentCoder() throws Exception { elasticsearchIOTestCommon.testDocumentCoder(); } diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java index 9bdcaee..981a151 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java @@ -287,6 +287,25 @@ class ElasticsearchIOTestCommon implements Serializable { executeWriteTest(write); } + /** Test that DocToBulk and BulkIO can be constructed and operate independently of Write. */ + void testDocToBulkAndBulkIO() throws Exception { +DocToBulk docToBulk = + ElasticsearchIO.docToBulk
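As the new test name suggests, DocToBulk and BulkIO compose into the equivalent of Write but can also be applied separately, for example to inspect or fan out the serialized bulk directives in between. A minimal sketch, with connection details assumed:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
    import org.apache.beam.sdk.transforms.Create;

    public class DocToBulkThenBulkIO {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        ElasticsearchIO.ConnectionConfiguration conn =
            ElasticsearchIO.ConnectionConfiguration.create(
                new String[] {"http://localhost:9200"}, "my-index", "_doc");
        // Write == DocToBulk (serialize documents into bulk API directives)
        // followed by BulkIO (issue the bulk requests), applied here as two stages.
        p.apply(Create.of("{\"field\":1}"))
            .apply(ElasticsearchIO.docToBulk().withConnectionConfiguration(conn))
            .apply(ElasticsearchIO.bulkIO().withConnectionConfiguration(conn));
        p.run().waitUntilFinish();
      }
    }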
[beam] branch master updated: Merge pull request #16507: [BEAM-13137] Fixes ES utest size flakiness with _flush api and index.store.stats_refresh_interval=0
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 8c2e1fd Merge pull request #16507: [BEAM-13137] Fixes ES utest size flakiness with _flush api and index.store.stats_refresh_interval=0 8c2e1fd is described below commit 8c2e1fd6109871496457c45ddcbfb10747c3cdcf Author: Evan Galpin AuthorDate: Mon Jan 17 04:03:11 2022 -0500 Merge pull request #16507: [BEAM-13137] Fixes ES utest size flakiness with _flush api and index.store.stats_refresh_interval=0 * [BEAM-13137] Fixes ES utest size flakiness with _flush api and index.store.stats_refresh_interval=0 --- .../sdk/io/elasticsearch/ElasticsearchIOTest.java| 3 --- .../sdk/io/elasticsearch/ElasticsearchIOTest.java| 3 --- .../sdk/io/elasticsearch/ElasticsearchIOTest.java| 3 --- .../io/elasticsearch/ElasticsearchIOTestCommon.java | 4 ++-- .../io/elasticsearch/ElasticsearchIOTestUtils.java | 20 +--- 5 files changed, 15 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index da593bf..4023b24 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -33,7 +33,6 @@ import org.elasticsearch.client.RestClient; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -84,7 +83,6 @@ public class ElasticsearchIOTest implements Serializable { @Rule public TestPipeline pipeline = TestPipeline.create(); - @Ignore("https://issues.apache.org/jira/browse/BEAM-5172";) @Test public void testSizes() throws Exception { // need to create the index using the helper method (not create it at first insertion) @@ -156,7 +154,6 @@ public class ElasticsearchIOTest implements Serializable { elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes(); } - @Ignore("https://issues.apache.org/jira/browse/BEAM-5172";) @Test public void testSplit() throws Exception { // need to create the index using the helper method (not create it at first insertion) diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index d9124cd..3e77be8 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -33,7 +33,6 @@ import org.elasticsearch.client.RestClient; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -83,7 +82,6 @@ public class ElasticsearchIOTest implements Serializable { @Rule public TestPipeline 
pipeline = TestPipeline.create(); - @Ignore("https://issues.apache.org/jira/browse/BEAM-5172";) @Test public void testSizes() throws Exception { // need to create the index using the helper method (not create it at first insertion) @@ -155,7 +153,6 @@ public class ElasticsearchIOTest implements Serializable { elasticsearchIOTestCommon.testWriteWithMaxBatchSizeBytes(); } - @Ignore("https://issues.apache.org/jira/browse/BEAM-5172";) @Test public void testSplit() throws Exception { // need to create the index using the helper method (not create it at first insertion) diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 761e60a..4d78666 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -33,7 +33,6 @@ import org.elasticsearch.client.RestClien
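The core of the fix: after writing test documents, force a flush so segment stats are current before asserting on index size. A minimal sketch with the low-level REST client (index name assumed):

    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.RestClient;

    class FlushBeforeStats {
      // With index.store.stats_refresh_interval=0, the next _stats call reflects
      // the flushed segments immediately, so size estimates stop being flaky.
      static void flush(RestClient restClient, String index) throws Exception {
        Request flush = new Request("POST", "/" + index + "/_flush");
        // wait_if_ongoing makes a concurrent flush block instead of failing.
        flush.addParameter("wait_if_ongoing", "true");
        restClient.performRequest(flush);
      }
    }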
[beam] branch master updated: [BEAM-13233] Replace AWS API used to list shards from DescribeStream to ListShards
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 95ec22b [BEAM-13233] Replace AWS API used to list shards from DescribeStream to ListShards new 67bcf1e Merge pull request #15973 from mosche/BEAM-13233-aws2-list-shards 95ec22b is described below commit 95ec22b64eb5a12a8bf08068bfdbce614b1cf2ab Author: mosche AuthorDate: Mon Nov 15 12:24:01 2021 +0100 [BEAM-13233] Replace AWS API used to list shards from DescribeStream to ListShards --- CHANGES.md | 1 + .../aws2/kinesis/DynamicCheckpointGenerator.java | 24 +- .../io/aws2/kinesis/KinesisReaderCheckpoint.java | 5 + .../beam/sdk/io/aws2/kinesis/ShardReadersPool.java | 46 +++- .../sdk/io/aws2/kinesis/ShardRecordsIterator.java | 6 +- .../io/aws2/kinesis/SimplifiedKinesisClient.java | 170 +--- .../beam/sdk/io/aws2/kinesis/StartingPoint.java| 7 +- .../io/aws2/kinesis/StartingPointShardsFinder.java | 201 -- .../sdk/io/aws2/kinesis/AmazonKinesisMock.java | 73 +++-- .../kinesis/DynamicCheckpointGeneratorTest.java| 36 +-- .../sdk/io/aws2/kinesis/KinesisMockReadTest.java | 15 +- .../sdk/io/aws2/kinesis/ShardReadersPoolTest.java | 8 +- .../aws2/kinesis/SimplifiedKinesisClientTest.java | 306 +++-- .../kinesis/StartingPointShardsFinderTest.java | 292 14 files changed, 529 insertions(+), 661 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 06a9069..00192c1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -83,6 +83,7 @@ * X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). * A new URN convention was adopted for cross-language transforms and existing URNs were updated. This may break advanced use-cases, for example, if a custom expansion service is used to connect diffrent Beam Java and Python versions. ([BEAM-12047](https://issues.apache.org/jira/browse/BEAM-12047)). * The upgrade to Calcite 1.28.0 introduces a breaking change in the SUBSTRING function in SqlTransform, when used with the Calcite dialect ([BEAM-13099](https://issues.apache.org/jira/browse/BEAM-13099), [CALCITE-4427](https://issues.apache.org/jira/browse/CALCITE-4427)). +* ListShards (with DescribeStreamSummary) is used instead of DescribeStream to list shards in Kinesis streams (AWS SDK v2). Due to this change, as mentioned in [AWS documentation](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html), for fine-grained IAM policies it is required to update them to allow calls to ListShards and DescribeStreamSummary APIs. For more information, see [Controlling Access to Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams [...] 
## Deprecations diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/DynamicCheckpointGenerator.java index cd81ea2..3b23349 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/DynamicCheckpointGenerator.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/DynamicCheckpointGenerator.java @@ -17,9 +17,7 @@ */ package org.apache.beam.sdk.io.aws2.kinesis; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; - -import java.util.Set; +import java.util.List; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,35 +32,23 @@ class DynamicCheckpointGenerator implements CheckpointGenerator { private static final Logger LOG = LoggerFactory.getLogger(DynamicCheckpointGenerator.class); private final String streamName; private final StartingPoint startingPoint; - private final StartingPointShardsFinder startingPointShardsFinder; public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) { this.streamName = streamName; this.startingPoint = startingPoint; -this.startingPointShardsFinder = new StartingPointShardsFinder(); - } - - public DynamicCheckpointGenerator( - String streamName, - StartingPoint startingPoint, - StartingPointShardsFinder startingPointShardsFinder) { -this.streamName = checkNotNull(streamName, "streamName"); -this.startingPoint = checkNotNull(startingPoint, "startingPoint"); -this.startingPointShardsFinder = -checkNotNull(startingPointShardsFinder, "startingPointShardsFinder"); } @Override public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis) throws TransientKinesisException {
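For reference, a sketch of the SDK v2 call the runner switches to; it needs the kinesis:ListShards (and kinesis:DescribeStreamSummary) permissions, and pagination is continued via nextToken only, since streamName and nextToken are mutually exclusive in this API:

    import java.util.ArrayList;
    import java.util.List;
    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
    import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
    import software.amazon.awssdk.services.kinesis.model.Shard;

    class ListShardsSketch {
      static List<Shard> listAllShards(KinesisClient kinesis, String stream) {
        List<Shard> shards = new ArrayList<>();
        String token = null;
        do {
          ListShardsRequest.Builder req = ListShardsRequest.builder();
          if (token == null) {
            req.streamName(stream); // first page: address the stream by name
          } else {
            req.nextToken(token);   // subsequent pages: the token carries the stream
          }
          ListShardsResponse resp = kinesis.listShards(req.build());
          shards.addAll(resp.shards());
          token = resp.nextToken();
        } while (token != null);
        return shards;
      }
    }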
[beam] branch master updated (8578c9a -> a05aa45)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 8578c9a [BEAM-13079] Updates cross-language transform URNs to use the new convention (#15748) new 6bfd83d [BEAM-10990] Adds response filtering for ElasticsearchIO new f7691e6 [BEAM-5172] Tries to reduce ES uTest flakiness new a05aa45 Merge pull request #15381 from egalpin/BEAM-10990-elasticsearch-response-filtering The 33290 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../elasticsearch-tests-5/build.gradle | 4 + .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 26 +- .../elasticsearch-tests-6/build.gradle | 4 + .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 26 +- .../elasticsearch-tests-7/build.gradle | 6 +- .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 26 +- .../elasticsearch/ElasticsearchIOTestCommon.java | 184 +++-- .../io/elasticsearch/ElasticsearchIOTestUtils.java | 26 +- .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 456 + 9 files changed, 647 insertions(+), 111 deletions(-)
[beam] branch master updated (b362a53 -> d930983)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from b362a53 [BEAM-12876] Adding doc and glossary entry for resource hints (#15499) add d930983 [BEAM-12153] revert "implement GroupByKey with CombinePerKey with Concatenate CombineFn" because of performance issues. (#15508) No new revisions were added by this update. Summary of changes: .../batch/GroupByKeyTranslatorBatch.java | 100 + 1 file changed, 40 insertions(+), 60 deletions(-)
[beam] branch master updated (291aa6c -> 7bfeca2)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 291aa6c [BEAM-6516] Fixes race condition in RabbitMqIO causing duplicate acks (#15157) add 7bfeca2 [BEAM-12601] Add append-only option (#15257) No new revisions were added by this update. Summary of changes: CHANGES.md | 1 + .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 12 .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 12 .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 12 .../elasticsearch/ElasticsearchIOTestCommon.java | 19 + .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 80 -- 6 files changed, 114 insertions(+), 22 deletions(-)
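A hedged usage sketch of the new option; withAppendOnly is an assumed method name derived from the commit title, and the connection details are placeholders:

    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;

    class AppendOnlyWrite {
      // Append-only indexes (e.g. immutable time-series documents) can skip the
      // update/upsert machinery and emit plain create directives.
      static ElasticsearchIO.Write create() {
        return ElasticsearchIO.write()
            .withConnectionConfiguration(
                ElasticsearchIO.ConnectionConfiguration.create(
                    new String[] {"http://localhost:9200"}, "metrics", "_doc"))
            .withAppendOnly(true); // assumed API surface, see the commit for the exact shape
      }
    }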
[beam] branch master updated (39cf3fc -> 2144cab)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 39cf3fc Merge pull request #15264 from [BEAM-12670] Relocate bq client exception imports to try block and conditionally turn off tests if imports fail new c4508c8 [BEAM-12591] Put Spark Structured Streaming runner sources back to main src folder new 473d187 [BEAM-12629] As spark DataSourceV2 is only available for spark 2, provide a DataSourceV2 based impl for spark 2 and create a structure for extension with a spark 3 source. new ad6bea8 [BEAM-12627] Deal with spark Encoders breaking change between spark 2 and spark 3 by providing an implementation for each of them. new f0014d9 [BEAM-12591] move SchemaHelpers to correct package new fd9bb74 [BEAM-8470] Disable wait for termination in a streaming pipeline because it is infinite by definition new 94ce5d3 [BEAM-12630] Deal with breaking change in streaming pipelines start by introducing an AbstractTranslationContext and version specific implementations new b8dc86c [BEAM-12629] Make source tests spark version agnostic and move them back to common spark module new b1d5dc4 [BEAM-12629] Make a spark 3 source impl new 75247cb [BEAM-12591] Fix checkstyle and spotless new e10b2eb [BEAM-12629] Reduce serializable to only needed classes and Fix schema inference new cc3ff98 [BEAM-12591] Add checkstyle exceptions for version specific classes because checkstyle does not correctly detect package files across multiple source directories new 81033b1 [BEAM-12629] Fix sources javadocs and improve impl new 23fd65d [BEAM-12591] Add spark 3 to structured streaming validates runner tests new 2144cab Merge pull request #15218 from echauchot/BEAM-7093-spark3-fix-for-SS-runner The 32603 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
Summary of changes: ...ValidatesRunner_SparkStructuredStreaming.groovy | 1 + .../translation/TranslationContext.java| 240 + .../translation/batch/DatasetSourceBatch.java | 9 +- .../translation/helpers/EncoderFactory.java| 54 + .../streaming/DatasetSourceStreaming.java | 9 +- .../translation/batch/SimpleSourceTest.java| 101 - .../translation/TranslationContext.java} | 29 ++- .../translation/batch/DatasetSourceBatch.java | 240 + .../translation/helpers/EncoderFactory.java| 49 + .../streaming/DatasetSourceStreaming.java} | 9 +- .../spark/structuredstreaming/Constants.java} | 8 +- .../SparkStructuredStreamingPipelineOptions.java | 0 .../SparkStructuredStreamingPipelineResult.java| 0 .../SparkStructuredStreamingRunner.java| 6 +- .../SparkStructuredStreamingRunnerRegistrar.java | 0 .../aggregators/AggregatorsAccumulator.java| 0 .../aggregators/NamedAggregators.java | 0 .../aggregators/NamedAggregatorsAccumulator.java | 0 .../aggregators/package-info.java | 0 .../structuredstreaming/examples/WordCount.java| 0 .../metrics/AggregatorMetric.java | 0 .../metrics/AggregatorMetricSource.java| 0 .../metrics/CompositeSource.java | 0 .../metrics/MetricsAccumulator.java| 0 .../MetricsContainerStepMapAccumulator.java| 0 .../metrics/SparkBeamMetric.java | 0 .../metrics/SparkBeamMetricSource.java | 0 .../metrics/SparkMetricsContainerStepMap.java | 0 .../metrics/WithMetricsSupport.java| 0 .../structuredstreaming/metrics/package-info.java | 0 .../metrics/sink/CodahaleCsvSink.java | 0 .../metrics/sink/CodahaleGraphiteSink.java | 0 .../metrics/sink/package-info.java | 0 .../spark/structuredstreaming/package-info.java| 0 .../translation/AbstractTranslationContext.java} | 19 +- .../translation/PipelineTranslator.java| 4 +- .../translation/SparkTransformOverrides.java | 0 .../translation/TransformTranslator.java | 2 +- .../translation/batch/AggregatorCombiner.java | 0 .../batch/CombinePerKeyTranslatorBatch.java| 4 +- .../CreatePCollectionViewTranslatorBatch.java | 4 +- .../translation/batch/DoFnFunction.java| 0 .../translation/batch/DoFnRunnerWithMetrics.java | 0 .../translation/batch/FlattenTranslatorBatch.java | 4 +- .../batch/GroupByKeyTranslatorBatch.java | 4 +- .../translation/batch/ImpulseTranslatorBatch.java | 4 +- .../translation/batch/ParDo
[beam] branch master updated (4cc279c -> 4f1f1c1)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 4cc279c Merge pull request #14718: [BEAM-12276] Narrow check for output timestamp of event time timers add 4f1f1c1 [BEAM-12093] Overhaul ElasticsearchIO.Write (#14347) No new revisions were added by this update. Summary of changes: CHANGES.md |2 + .../sdk/io/elasticsearch/ElasticsearchIOIT.java| 26 + .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 23 + .../elasticsearch-tests-5/build.gradle |2 + .../sdk/io/elasticsearch/ElasticsearchIOIT.java| 32 + .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 31 + .../elasticsearch-tests-6/build.gradle |2 + .../sdk/io/elasticsearch/ElasticsearchIOIT.java| 32 + .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 31 + .../elasticsearch-tests-7/build.gradle |2 + .../sdk/io/elasticsearch/ElasticsearchIOIT.java| 32 + .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 31 + .../elasticsearch/ElasticsearchIOTestCommon.java | 320 - .../io/elasticsearch/ElasticsearchIOTestUtils.java | 171 ++- sdks/java/io/elasticsearch/OWNERS |1 + .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 1277 +++- 16 files changed, 1691 insertions(+), 324 deletions(-)
[beam] branch master updated (f0db6e7 -> 38464d1)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from f0db6e7 Merge pull request #13640: [BEAM-11543] Increase Spanner streaming read timeout to 2 hours new 995c776 Remove unused endOfStreamMonitor from nexmark new b0b8ffd Also rename metric new 38464d1 Merge pull request #13435 from rHermes/remove-unused-monitor-in-nexmark The 30159 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../src/main/java/org/apache/beam/sdk/nexmark/Monitor.java | 13 - .../java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java | 12 ++-- .../org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java | 3 --- 3 files changed, 10 insertions(+), 18 deletions(-)
[beam] branch master updated: Nexmark: fix typos and clean unused options
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 6122341 Nexmark: fix typos and clean unused options new 4507139 Merge pull request #13403 from rHermes/nexmark-misc-improvements 6122341 is described below commit 612234198071a3c2ab76dd8386529f7b53ac4efc Author: rHermes AuthorDate: Sun Nov 22 09:40:29 2020 +0100 Nexmark: fix typos and clean unused options This removes some unused options from launcher, fixes the justModelResultRate mode by fixing the formatting string, uses the right name for the SessionSideInputJoin and renames the argument in one launcher option, just for consistency. --- .../beam/sdk/nexmark/NexmarkConfiguration.java | 2 +- .../apache/beam/sdk/nexmark/NexmarkLauncher.java | 4 +-- .../apache/beam/sdk/nexmark/NexmarkOptions.java| 30 +- .../sdk/nexmark/queries/SessionSideInputJoin.java | 2 +- 4 files changed, 5 insertions(+), 33 deletions(-) diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java index db9586a..1108eb6 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java @@ -320,7 +320,7 @@ public class NexmarkConfiguration implements Serializable { fanout = options.getFanout(); } if (options.getMaxAuctionsWaitingTime() != null) { - fanout = options.getMaxAuctionsWaitingTime(); + maxAuctionsWaitingTime = options.getMaxAuctionsWaitingTime(); } if (options.getOccasionalDelaySec() != null) { occasionalDelaySec = options.getOccasionalDelaySec(); diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index 4ddeb34..fd2044f 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -1101,10 +1101,10 @@ public class NexmarkLauncher { Collections.sort(counts); int n = counts.size(); if (n < 5) { - NexmarkUtils.console("Query%s: only %d samples", model.configuration.query, n); + NexmarkUtils.console("Query %s: only %d samples", model.configuration.query, n); } else { NexmarkUtils.console( - "Query%d: N:%d; min:%d; 1st%%:%d; mean:%d; 3rd%%:%d; max:%d", + "Query %s: N:%d; min:%d; 1st%%:%d; mean:%d; 3rd%%:%d; max:%d", model.configuration.query, n, counts.get(0), diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java index 508b589..31a459e 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkOptions.java @@ -349,7 +349,7 @@ public interface NexmarkOptions @Nullable Integer getMaxAuctionsWaitingTime(); - void setMaxAuctionsWaitingTime(Integer fanout); + void setMaxAuctionsWaitingTime(Integer maxAuctionsWaitingTime); @Description("Length of occasional delay to impose on events (in seconds).") @Nullable @@ -414,28 +414,6 @@ public interface NexmarkOptions void 
setRunningTimeMinutes(Long value); @Description( - "If set and --monitorJobs is true, check that the system watermark is never more " - + "than this far behind real time") - @Nullable - Long getMaxSystemLagSeconds(); - - void setMaxSystemLagSeconds(Long value); - - @Description( - "If set and --monitorJobs is true, check that the data watermark is never more " - + "than this far behind real time") - @Nullable - Long getMaxDataLagSeconds(); - - void setMaxDataLagSeconds(Long value); - - @Description("Only start validating watermarks after this many seconds") - @Nullable - Long getWatermarkValidationDelaySeconds(); - - void setWatermarkValidationDelaySeconds(Long value); - - @Description( "Specify 'sql' to use Calcite SQL queries " + "or 'zetasql' to use ZetaSQL queries." + "Otherwise Java transforms will be used") @@ -480,12 +458,6 @@ public interface NexmarkOptions void setNexmarkParallel(int value); - @Description("InfluxDB measurement to
[beam] branch master updated (2f47b82 -> cbd9aa2)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 2f47b82 Merge pull request #12451: [BEAM-10602] Use python_streaming_pardo_5 table for latency results new a5eab74 [BEAM-10471] issue a JMX call to cassandra StorageServiceMBean.refreshSizeEstimates after insertion to update the cassandra internal size estimates to have consistent results in CassandraIOTest.testEstimatedSizeBytes new e3ea99b [BEAM-10471] fix a wrong comment new cbd9aa2 Merge pull request #12432 from echauchot/BEAM-10471-force-refresh-size-estimates The 28180 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java| 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-)
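The JMX call itself can be reproduced with the standard javax.management client; the host and JMX port of the Cassandra node below are assumptions:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    class RefreshSizeEstimates {
      static void refresh(String host, int jmxPort) throws Exception {
        JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + jmxPort + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
          MBeanServerConnection mbsc = connector.getMBeanServerConnection();
          // StorageService exposes refreshSizeEstimates, which recomputes the
          // system.size_estimates table that getEstimatedSizeBytes reads from.
          ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");
          mbsc.invoke(storageService, "refreshSizeEstimates", new Object[] {}, new String[] {});
        }
      }
    }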
[beam] branch master updated: [BEAM-10471] Change assertion method to improve expressiveness and increase error margin on non deterministic size test to avoid flakiness
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 2003985 [BEAM-10471] Change assertion method to improve expressiveness and increase error margin on non deterministic size test to avoid flakiness new bedae33 Merge pull request #12276 from echauchot/BEAM-10471-cassandraIO-size-flakiness 2003985 is described below commit 20039857d808b4a94e7990e76b7092350d249a91 Author: Etienne Chauchot AuthorDate: Thu Jul 16 12:18:31 2020 +0200 [BEAM-10471] Change assertion method to improve expressiveness and increase error margin on non deterministic size test to avoid flakiness --- .../java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index 0f91448..fda20e8 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -23,6 +23,7 @@ import static org.apache.beam.sdk.io.cassandra.CassandraIO.CassandraSource.getEs import static org.apache.beam.sdk.io.cassandra.CassandraIO.CassandraSource.getRingFraction; import static org.apache.beam.sdk.io.cassandra.CassandraIO.CassandraSource.isMurmur3Partitioner; import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -287,8 +288,12 @@ public class CassandraIOTest implements Serializable { .withTable(CASSANDRA_TABLE); CassandraIO.CassandraSource source = new CassandraIO.CassandraSource<>(read, null); long estimatedSizeBytes = source.getEstimatedSizeBytes(pipelineOptions); -// the size is non determanistic in Cassandra backend -assertTrue((estimatedSizeBytes >= 12960L * 0.9f) && (estimatedSizeBytes <= 12960L * 1.1f)); +// the size is non determanistic in Cassandra backend: checks that estimatedSizeBytes >= 12960L +// -20% && estimatedSizeBytes <= 12960L +20% +assertThat( +"wrong estimated size in " + CASSANDRA_KEYSPACE + "/" + CASSANDRA_TABLE, +(double) estimatedSizeBytes, +closeTo(12960.0D, 2592.0D)); } @Test
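In the closeTo form above, the ±20% margin is expressed as an absolute error: 12960 * 0.2 = 2592, so the assertion accepts any estimate in [10368, 15552] bytes and, unlike the old assertTrue, reports the keyspace/table and the offending value on failure.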
[beam] branch master updated (3384f62 -> 9eab347)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 3384f62 Merge pull request #11990: [BEAM-] Remove Apex runner new 6499899 [BEAM-10082] Remove dependency on java.xml.ws. new 02f0d9a Remove deprecated MetricName methods. new 9eab347 Merge pull request #12001 from tysonjh/BEAM-10082 The 27512 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../runners/extensions/metrics/MetricsHttpSink.java | 15 +++ .../java/org/apache/beam/sdk/metrics/MetricName.java | 20 2 files changed, 11 insertions(+), 24 deletions(-)
[beam] branch master updated (0c0302d -> e1963c1)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 0c0302d Merge pull request #11777: [BEAM-10054] Fix watermark hold for on_time_pane add 6eb8733 [BEAM-10017] Expose Cassandra Connect/Read timeouts add e2b6abe [BEAM-10017] Minor fixes of javadoc and error messages add e1963c1 Merge pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts No new revisions were added by this update. Summary of changes: .../apache/beam/sdk/io/cassandra/CassandraIO.java | 147 - 1 file changed, 141 insertions(+), 6 deletions(-)
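A hedged usage sketch of the newly exposed knobs; both values are milliseconds forwarded to the driver's SocketOptions, and the entity and cluster details below are placeholders:

    import java.io.Serializable;
    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.io.cassandra.CassandraIO;

    public class CassandraTimeouts {
      // Placeholder entity; a real one carries the driver's @Table/@Column mappings.
      static class Scientist implements Serializable {}

      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(
            CassandraIO.<Scientist>read()
                .withHosts(Arrays.asList("localhost"))
                .withPort(9042)
                .withKeyspace("beam_ks")
                .withTable("scientist")
                .withEntity(Scientist.class)
                .withCoder(SerializableCoder.of(Scientist.class))
                .withConnectTimeout(5_000)
                .withReadTimeout(30_000));
        p.run().waitUntilFinish();
      }
    }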
[beam] branch master updated (209180f -> 6e8e566)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 209180f Merge pull request #11424: Remove outdated doc for ReadFromBigQuery transform new 289cb66 Add GBK load tests script for spark structured streaming runner new 210658a Add CoGBK load tests script for spark structured streaming runner new 1709a6b Add Combine load tests script for spark structured streaming runner new 8a14cf0 Add Pardo load tests script for spark structured streaming runner new 6e8e566 Merge pull request #11135 from echauchot/add_spark-ss-runner-to-GBK-load-tests The 26571 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: ...s_CoGBK_Java_spark_structured_streaming.groovy} | 68 -- ...Combine_Java_spark_structured_streaming.groovy} | 63 +++-- ...sts_GBK_Java_spark_structured_streaming.groovy} | 100 ++--- ...s_ParDo_Java_spark_structured_streaming.groovy} | 68 -- 4 files changed, 74 insertions(+), 225 deletions(-) copy .test-infra/jenkins/{job_LoadTests_CoGBK_Java.groovy => job_LoadTests_CoGBK_Java_spark_structured_streaming.groovy} (76%) copy .test-infra/jenkins/{job_LoadTests_Combine_Java.groovy => job_LoadTests_Combine_Java_spark_structured_streaming.groovy} (69%) copy .test-infra/jenkins/{job_LoadTests_GBK_Java.groovy => job_LoadTests_GBK_Java_spark_structured_streaming.groovy} (75%) copy .test-infra/jenkins/{job_LoadTests_ParDo_Java.groovy => job_LoadTests_ParDo_Java_spark_structured_streaming.groovy} (71%)
[beam] branch master updated (5e6ba4f -> 5989def)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 5e6ba4f Merge pull request #11037: [BEAM-9434] Improve Spark runner reshuffle translation to maximize parallelism new 0859001 [BEAM-5980] Change load-tests build to include spark-sql for spark structured streaming runner new 7356a47 [BEAM-9436] avoid one flatmap step and a KV creation per element by doing the (mandatory for ReduceFnRunner) materialization when grouping by windows. new 5989def Merge pull request #11055 from echauchot/BEAM-9436-avoid-materialization-in-GBK The 26379 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../batch/GroupByKeyTranslatorBatch.java | 35 ++ .../GroupAlsoByWindowViaOutputBufferFn.java| 20 + sdks/java/testing/load-tests/build.gradle | 1 + 3 files changed, 17 insertions(+), 39 deletions(-)
[beam] branch master updated (87b8235 -> 5257ed3)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 87b8235 Merge pull request #10646 from brucearctor/master new c7914bd [BEAM-9205] Add ValidatesRunner annotation to the MetricsPusherTest class (not only on the methods) and remove unneeded MetricsPusherTest inside spark structured streaming runner (because this runner uses the VR common test) new bc9243a [BEAM-9205] Fix validatesRunner tests configuration in spark module new 5257ed3 Merge pull request #10669 from echauchot/metricsPusher_validates_runner The 25323 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../runners/core/metrics/MetricsPusherTest.java| 2 +- runners/spark/build.gradle | 17 +++- .../metrics/MetricsPusherTest.java | 92 -- 3 files changed, 15 insertions(+), 96 deletions(-) delete mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/metrics/MetricsPusherTest.java
[beam] branch master updated (0bdd5c2 -> 331e104)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 0bdd5c2 Merge pull request #10536 from Hannah-Jiang/add_running_mode_to_fnapi add a3040d0 [BEAM-9019] Remove BeamCoderWrapper to avoid extra object allocation and move EncodersTest in correct package. add 331e104 Merge pull request #10461: [BEAM-9019] Remove BeamCoderWrapper to avoid extra object allocation No new revisions were added by this update. Summary of changes: .../translation/SchemaHelpers.java | 12 +- .../translation/helpers/EncoderHelpers.java| 140 + .../helpers/EncoderHelpersTest.java} | 15 +-- 3 files changed, 69 insertions(+), 98 deletions(-) rename runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/{utils/EncodersTest.java => translation/helpers/EncoderHelpersTest.java} (79%)
[beam] branch master updated (025f849 -> f5d8bcd)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from 025f849 Merge pull request #10450: [BEAM-9019] Improve Encoders: replace as much as possible of catalyst generated code with java compiled code new 581c957 [BEAM-5192] Migrate ElasticsearchIO to v7 new 05afd22 [BEAM-5192] Minor change of ESIO public configuration API: MaxRetryTimeout support was removed in ES 7.0. So, as we use the same REST client for all versions of ESIO, change the socketAndRetryTimeout parameter to only socketTimeout parameter. new 22468b1 [BEAM-5192] Fix missing ifs for ES7 specificities. new aa89952 [BEAM-5192] Remove unneeded transitive dependencies, upgrade ES and randomized testing minor versions new 49dc83b [BEAM-5192] Disable MockHttpTransport plugin to enable http dialog to embedded ES 7. new b2db1ae [BEAM-5192] Fix util class, Elasticsearch changed their json output of search requests! new 6edec1e [BEAM-5192] Set a custom json serializer for document metadata to be able to change serialization of metadata at runtime because Elasticsearch changed it in version 7 new 534a462 [BEAM-5192] Remove testWritePartialUpdateWithErrors because triggering an error by setting a mapping on a type + a wrong insert is no longer supported on ES7. And this test only ensured that the error message in case of write error was correct which is already tested in testWriteWithErrors. new 233ec7d [BEAM-5192] use <= and >= in version specific code instead of == to be future proof. There is still a check in getBackendVersion that protects us from running on an untested version of Elasticsearch. new f5d8bcd Merge pull request #10433 from echauchot/BEAM-5192-ES7 The 24818 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 20 +--- .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 15 +-- .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 15 +-- .../build.gradle | 14 +-- .../src/test/contrib/create_elk_container.sh | 2 +- .../sdk/io/elasticsearch/ElasticsearchIOIT.java| 8 +- .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 25 ++-- .../java/org/elasticsearch/bootstrap/JarHell.java | 0 .../elasticsearch-tests-common/build.gradle| 19 ++-- .../elasticsearch/ElasticsearchIOTestCommon.java | 63 --- .../io/elasticsearch/ElasticsearchIOTestUtils.java | 72 ++-- sdks/java/io/elasticsearch/build.gradle| 6 +- .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 126 + settings.gradle| 1 + 14 files changed, 181 insertions(+), 205 deletions(-) copy sdks/java/io/elasticsearch-tests/{elasticsearch-tests-5 => elasticsearch-tests-7}/build.gradle (89%) copy sdks/java/io/elasticsearch-tests/{elasticsearch-tests-6 => elasticsearch-tests-7}/src/test/contrib/create_elk_container.sh (94%) copy sdks/java/io/elasticsearch-tests/{elasticsearch-tests-6 => elasticsearch-tests-7}/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java (97%) copy sdks/java/io/elasticsearch-tests/{elasticsearch-tests-6 => elasticsearch-tests-7}/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java (88%) copy sdks/java/io/elasticsearch-tests/{elasticsearch-tests-6 => elasticsearch-tests-7}/src/test/java/org/elasticsearch/bootstrap/JarHell.java (100%)
[beam] branch master updated (b7e464b -> 2080520)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from b7e464b [BEAM-8920] Go SDK: faster transforms/filter.Distinct with CombinePerKey new 4dae3ae [BEAM-8830] flatten: better detect empty collections new 62bd3ff [BEAM-8830] Fix empty dataset creation: use the beam coder wrapper that sets the schema to binary. new a9a68a8 [BEAM-8830] Exclude failing SDF tests new 2080520 Merge pull request #10293 from echauchot/BEAM-8830-flattens-spark The 24622 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: runners/spark/build.gradle | 9 - .../structuredstreaming/translation/TranslationContext.java| 5 +++-- .../translation/batch/FlattenTranslatorBatch.java | 10 -- 3 files changed, 11 insertions(+), 13 deletions(-)
[beam] branch master updated: [BEAM-8470] Exclude failed ValidatesRunner tests
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 9ceb449 [BEAM-8470] Exclude failed ValidatesRunner tests new 3e7865e Merge pull request #10221 from aromanenko-dev/BEAM-8470-exclude-failed-tests 9ceb449 is described below commit 9ceb449208be84edddb3d3c8a648cef6ea93c3dd Author: Alexey Romanenko AuthorDate: Tue Nov 26 15:19:11 2019 +0100 [BEAM-8470] Exclude failed ValidatesRunner tests --- runners/spark/build.gradle | 14 ++ 1 file changed, 14 insertions(+) diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle index 7c11b87..5e876d0 100644 --- a/runners/spark/build.gradle +++ b/runners/spark/build.gradle @@ -204,6 +204,20 @@ task validatesStructuredStreamingRunnerBatch(type: Test) { excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms' } + filter { +// Exclude failed tests. They must be included back once correspondent transform will be fixed. +excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineFnsTest.testComposedCombineWithContext' +excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testSimpleCombineWithContext' +excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$CombineWithContextTests.testSimpleCombineWithContextEmpty' +excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testFixedWindowsCombineWithContext' +excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext' +excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSlidingWindowsCombineWithContext' +excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput' +excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders' +excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmpty' +excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenPCollectionsEmptyThenParDo' +excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testLifecycleMethodsBounded' + } } task validatesRunner {
[beam] branch spark-runner_structured-streaming updated (f5cfbc1 -> ac3132c)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from f5cfbc1 [BEAM-8470] Remove "validatesStructuredStreamingRunnerBatch" from "validatesRunner" task add ac3132c [BEAM-8470] Fix timestamps in combine output: assign the timestamp to the window and not merge all the timestamps before combine No new revisions were added by this update. Summary of changes: .../translation/batch/AggregatorCombiner.java| 16 ++-- 1 file changed, 10 insertions(+), 6 deletions(-)
[beam] branch spark-runner_structured-streaming updated (6186093 -> fe9e723)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. omit 6186093 spotless omit 003bcd8 [BEAM-8470] Fix CombineTest.testCountPerElementWithSlidingWindows expected results omit 3506d6d [BEAM-8470] set log level to info to avoid resource consumption in production mode omit 2e123ac [BEAM-8470] Fix the output timestamps of combine: timestamps must be merged to a window before combining add 9d5fdb8 [BEAM-8470] Fix the output timestamps of combine: timestamps must be merged to a window before combining add 87b4de7 [BEAM-8470] set log level to info to avoid resource consumption in production mode add fe9e723 [BEAM-8470] Fix CombineTest.testCountPerElementWithSlidingWindows expected results This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (6186093) \ N -- N -- N refs/heads/spark-runner_structured-streaming (fe9e723) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. Summary of changes:
[beam] branch spark-runner_structured-streaming updated (4e5757d -> 6186093)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. omit 4e5757d [BEAM-8470] Fix CombineTest.testCountPerElementWithSlidingWindows expected results and apply spotless add 003bcd8 [BEAM-8470] Fix CombineTest.testCountPerElementWithSlidingWindows expected results add 6186093 spotless This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (4e5757d) \ N -- N -- N refs/heads/spark-runner_structured-streaming (6186093) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java | 2 -- 1 file changed, 2 deletions(-)
[beam] branch spark-runner_structured-streaming updated (bb0ffa6 -> 4e5757d)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. omit bb0ffa6 [BEAM-8470] Fix CombineTest.testCountPerElementWithSlidingWindows expected results add 4e5757d [BEAM-8470] Fix CombineTest.testCountPerElementWithSlidingWindows expected results and apply spotless This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (bb0ffa6) \ N -- N -- N refs/heads/spark-runner_structured-streaming (4e5757d) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. Summary of changes: .../translation/batch/AggregatorCombiner.java | 29 ++ .../translation/batch/CombineTest.java | 11 ++-- .../apache/beam/sdk/nexmark/NexmarkLauncher.java | 2 ++ 3 files changed, 29 insertions(+), 13 deletions(-)
[beam] branch spark-runner_structured-streaming updated (3506d6d -> bb0ffa6)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from 3506d6d [BEAM-8470] set log level to info to avoid resource consumption in production mode add bb0ffa6 [BEAM-8470] Fix CombineTest.testCountPerElementWithSlidingWindows expected results No new revisions were added by this update. Summary of changes: .../spark/structuredstreaming/translation/batch/CombineTest.java | 9 +++-- 1 file changed, 3 insertions(+), 6 deletions(-)
[beam] branch spark-runner_structured-streaming updated (eae506b -> 3506d6d)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. omit eae506b [BEAM-8470] Fix wrong expected results in CombineTest.testBinaryCombineWithSlidingWindows omit ef0e870 [BEAM-8470] Fix tests: use correct SparkStructuredStreamingPipelineOptions, set testMode to true. Some renaming omit 6826025 [BEAM-8470] Add a test to test combine translation on binaryCombineFn with sliding windows omit f668e06 [temp] nexmark run in new spark runner omit 7ee091e [temp] debug pardo print input and output in both spark runners and put spark in debug mode add c4f2575 [BEAM-8470] Add a test to test combine translation on binaryCombineFn with sliding windows add 2e048f9 [BEAM-8470] Fix tests: use correct SparkStructuredStreamingPipelineOptions, set testMode to true. Some renaming add fba2109 [BEAM-8470] Fix wrong expected results in CombineTest.testBinaryCombineWithSlidingWindows add a7d67b2 [BEAM-8470] Add disclaimers about this runner being experimental add 4a2c59d [BEAM-8470] Fix: create an empty accumulator in combine.mergeAccumulators, because this method modifies its first input accumulator. Decrease memory usage by storing only accumulator and timestamp in the combine.merge map add d949942 [BEAM-8470] Apply spotless add 5a059d1 [BEAM-8470] Add a countPerElement test with sliding windows add 2e123ac [BEAM-8470] Fix the output timestamps of combine: timestamps must be merged to a window before combining add 3506d6d [BEAM-8470] set log level to info to avoid resource consumption in production mode This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (eae506b) \ N -- N -- N refs/heads/spark-runner_structured-streaming (3506d6d) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. Summary of changes: .../SparkStructuredStreamingRunner.java| 19 +++-- .../translation/batch/AggregatorCombiner.java | 39 ++ .../translation/batch/ParDoTranslatorBatch.java| 43 +-- .../spark/translation/TransformTranslator.java | 21 +- runners/spark/src/main/resources/log4j.properties | 4 +- .../metrics/sink/SparkMetricsSinkTest.java | 4 +- .../metrics/MetricsPusherTest.java | 6 +- .../translation/batch/CombineTest.java | 84 +++--- .../translation/batch/ComplexSourceTest.java | 4 +- .../translation/batch/FlattenTest.java | 4 +- .../translation/batch/GroupByKeyTest.java | 7 +- .../translation/batch/ParDoTest.java | 19 +++-- .../translation/batch/SimpleSourceTest.java| 4 +- .../translation/batch/WindowAssignTest.java| 7 +- .../translation/streaming/SimpleSourceTest.java| 4 +- .../apache/beam/sdk/nexmark/NexmarkLauncher.java | 2 - .../apache/beam/sdk/nexmark/queries/Query5.java| 2 +- .../nexmark/src/main/resources/log4j.properties| 2 +- 18 files changed, 138 insertions(+), 137 deletions(-)
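Among the commits listed above, 4a2c59d works around a subtle contract issue: Combine.CombineFn.mergeAccumulators may modify and return its first input accumulator. A minimal sketch of the defensive pattern it describes, assuming a generic CombineFn (the safeMerge name is illustrative):

    import java.util.Arrays;
    import org.apache.beam.sdk.transforms.Combine.CombineFn;

    // Prepend a fresh empty accumulator so any in-place mutation performed by
    // mergeAccumulators only touches the new instance, never an accumulator
    // that is still referenced elsewhere (e.g. inside a merge map).
    static <InputT, AccumT, OutputT> AccumT safeMerge(
        CombineFn<InputT, AccumT, OutputT> combineFn, AccumT left, AccumT right) {
      return combineFn.mergeAccumulators(
          Arrays.asList(combineFn.createAccumulator(), left, right));
    }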
[beam] branch spark-runner_structured-streaming updated (2b3e99f -> eae506b)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. omit 2b3e99f [BEAM-8470] Fix wrong expected results in CombineTest.testBinaryCombineWithSlidingWindows omit 2883d6f [BEAM-8470] Fix tests: use correct SparkStructuredStreamingPipelineOptions, set testMode to true. Some renaming omit 9c41f65 [BEAM-8470] Add a test to test combine translation on binaryCombineFn with sliding windows omit 1768142 Revert "[temp] debug pardo print input and output in both spark runners and put spark in debug mode" add 6826025 [BEAM-8470] Add a test to test combine translation on binaryCombineFn with sliding windows add ef0e870 [BEAM-8470] Fix tests: use correct SparkStructuredStreamingPipelineOptions, set testMode to true. Some renaming add eae506b [BEAM-8470] Fix wrong expected results in CombineTest.testBinaryCombineWithSlidingWindows This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (2b3e99f) \ N -- N -- N refs/heads/spark-runner_structured-streaming (eae506b) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. Summary of changes: .../translation/batch/ParDoTranslatorBatch.java| 43 -- .../spark/translation/TransformTranslator.java | 21 ++- .../StructuredStreamingPipelineStateTest.java | 2 - .../nexmark/src/main/resources/log4j.properties| 2 +- 4 files changed, 60 insertions(+), 8 deletions(-)
[beam] branch spark-runner_structured-streaming updated (2883d6f -> 2b3e99f)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from 2883d6f [BEAM-8470] Fix tests: use correct SparkStructuredStreamingPipelineOptions, set testMode to true. Some renaming add 2b3e99f [BEAM-8470] Fix wrong expected results in CombineTest.testBinaryCombineWithSlidingWindows No new revisions were added by this update. Summary of changes: .../spark/structuredstreaming/translation/batch/CombineTest.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch spark-runner_structured-streaming updated (7df039b -> 2883d6f)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. omit 7df039b [BEAM-8470] Fix tests: use correct parkStructuredStreamingPipelineOptions, set testMode to true. Some renaming add 2883d6f [BEAM-8470] Fix tests: use correct SparkStructuredStreamingPipelineOptions, set testMode to true. Some renaming This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (7df039b) \ N -- N -- N refs/heads/spark-runner_structured-streaming (2883d6f) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. Summary of changes:
[beam] branch spark-runner_structured-streaming updated (8a3f75c -> 7df039b)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from 8a3f75c [BEAM-8470] input windows can arrive exploded (for sliding windows). As a result an input has multiple windows. So we need to consider that the accumulator can have multiple windows add 9e4235d [BEAM-8470] Add a combine test with sliding windows add 7ee091e [temp] debug pardo print input and output in both spark runners and put spark in debug mode add f668e06 [temp] nexmark run in new spark runner add 1768142 Revert "[temp] debug pardo print input and output in both spark runners and put spark in debug mode" add 9c41f65 [BEAM-8470] Add a test to test combine translation on binaryCombineFn with sliding windows add 7df039b [BEAM-8470] Fix tests: use correct parkStructuredStreamingPipelineOptions, set testMode to true. Some renaming No new revisions were added by this update. Summary of changes: .../StructuredStreamingPipelineStateTest.java | 7 ++- .../aggregators/metrics/sink/InMemoryMetrics.java | 3 ++ .../metrics/sink/SparkMetricsSinkTest.java | 20 ++- ...parkBeamMetricTest.java => BeamMetricTest.java} | 7 ++- ...tricsPusherTest.java => MetricsPusherTest.java} | 33 .../translation/batch/CombineTest.java | 61 ++ .../translation/batch/ComplexSourceTest.java | 14 ++--- .../translation/batch/FlattenTest.java | 16 +++--- .../translation/batch/GroupByKeyTest.java | 18 --- .../translation/batch/ParDoTest.java | 36 +++-- .../translation/batch/SimpleSourceTest.java| 15 +++--- .../translation/batch/WindowAssignTest.java| 14 ++--- .../translation/streaming/SimpleSourceTest.java| 14 ++--- .../apache/beam/sdk/nexmark/NexmarkLauncher.java | 2 + .../apache/beam/sdk/nexmark/queries/Query5.java| 2 +- 15 files changed, 178 insertions(+), 84 deletions(-) rename runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/metrics/{SparkBeamMetricTest.java => BeamMetricTest.java} (91%) rename runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/metrics/{SparkMetricsPusherTest.java => MetricsPusherTest.java} (68%)
[beam] branch spark-runner_structured-streaming updated (59ba063 -> 8a3f75c)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from 59ba063 [BEAM-8470] Fix combiner. Do not reuse instance of accumulator add 8a3f75c [BEAM-8470] input windows can arrive exploded (for sliding windows). As a result an input has multiple windows. So we need to consider that the accumulator can have multiple windows No new revisions were added by this update. Summary of changes: .../translation/batch/AggregatorCombiner.java | 31 +- 1 file changed, 18 insertions(+), 13 deletions(-)
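Commit 8a3f75c above captures a windowing subtlety worth spelling out: with sliding windows, one element is assigned to several overlapping windows and arrives carrying all of them, so the combiner must accumulate per window. A minimal sketch under that assumption (the countPerWindow map is illustrative, not the AggregatorCombiner implementation):

    import java.util.Map;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.util.WindowedValue;

    // An element is not "in one window": getWindows() may return several
    // windows for sliding windows, so accumulate the value once per window.
    static <T> void accumulate(WindowedValue<T> wv, Map<BoundedWindow, Long> countPerWindow) {
      for (BoundedWindow window : wv.getWindows()) {
        countPerWindow.merge(window, 1L, Long::sum);
      }
    }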
[beam] branch spark-runner_structured-streaming updated (d224f1c -> 59ba063)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from d224f1c [BEAM-8470] simplify coders in combinePerKey translation add 59ba063 [BEAM-8470] Fix combiner. Do not reuse instance of accumulator No new revisions were added by this update. Summary of changes: .../structuredstreaming/translation/batch/AggregatorCombiner.java| 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-)
[beam] branch spark-runner_structured-streaming updated (bb62390 -> d224f1c)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from bb62390 Fix SpotBugs add d224f1c [BEAM-8470] simplify coders in combinePerKey translation No new revisions were added by this update. Summary of changes: .../translation/batch/CombinePerKeyTranslatorBatch.java| 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-)
[beam] branch spark-runner_structured-streaming updated (f6e2fb0 -> bb62390)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. omit f6e2fb0 Fix SpotBugs add bb62390 Fix SpotBugs This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (f6e2fb0) \ N -- N -- N refs/heads/spark-runner_structured-streaming (bb62390) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. Summary of changes: .../structuredstreaming/SparkStructuredStreamingPipelineResult.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-)
[beam] branch spark-runner_structured-streaming updated (e40bb83 -> f6e2fb0)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from e40bb83 [BEAM-8470] Rebase on master: pass sideInputMapping in SimpleDoFnRunner as needed now in the API add f6e2fb0 Fix SpotBugs No new revisions were added by this update. Summary of changes: .../metrics/sink/{CsvSink.java => CodahaleCsvSink.java} | 4 ++-- .../metrics/sink/{GraphiteSink.java => CodahaleGraphiteSink.java} | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) rename runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/sink/{CsvSink.java => CodahaleCsvSink.java} (93%) rename runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/metrics/sink/{GraphiteSink.java => CodahaleGraphiteSink.java} (92%)
[beam] branch master updated: Move sequence diagrams to PlantUML SVG.
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 6418fb6 Move sequence diagrams to PlantUML SVG. new 98eb81d Merge pull request #9870 from RyanSkraba/rskraba/svg-sequence-diags 6418fb6 is described below commit 6418fb6c4852ba4e07257c0b04c36d6fb9b28815 Author: Ryan Skraba AuthorDate: Thu Oct 24 16:23:52 2019 +0200 Move sequence diagrams to PlantUML SVG. The plantuml source used to generate the diagrams is embedded in the SVG resources and can be modified and regenerated (by hand, at this time). Remove annotation on Reader. --- .../src/documentation/io/developing-io-overview.md | 6 +- website/src/documentation/programming-guide.md | 4 +- website/src/images/dofn-sequence-diagram.png | Bin 126448 -> 0 bytes website/src/images/dofn-sequence-diagram.svg | 94 ++ website/src/images/source-sequence-diagram.png | Bin 73657 -> 0 bytes website/src/images/source-sequence-diagram.svg | 106 + 6 files changed, 205 insertions(+), 5 deletions(-) diff --git a/website/src/documentation/io/developing-io-overview.md b/website/src/documentation/io/developing-io-overview.md index 56f2755..b17a710 100644 --- a/website/src/documentation/io/developing-io-overview.md +++ b/website/src/documentation/io/developing-io-overview.md @@ -107,9 +107,9 @@ Here is a sequence diagram that shows the lifecycle of the Source during information to IO developers such as the constraints that apply to the objects or particular cases such as streaming mode. - - ![This is a sequence diagram that shows the lifecycle of the Source]( -{{ "/images/source-sequence-diagram.png" | prepend: site.baseurl }}) + +![This is a sequence diagram that shows the lifecycle of the Source]( +{{ "/images/source-sequence-diagram.svg" | prepend: site.baseurl }}) ### Using ParDo and GroupByKey diff --git a/website/src/documentation/programming-guide.md b/website/src/documentation/programming-guide.md index afb1163..d78b609 100644 --- a/website/src/documentation/programming-guide.md +++ b/website/src/documentation/programming-guide.md @@ -809,9 +809,9 @@ Here is a sequence diagram that shows the lifecycle of the DoFn during apply to the objects or particular cases such as failover or instance reuse. They also give instanciation use cases. - + ![This is a sequence diagram that shows the lifecycle of the DoFn]( - {{ "/images/dofn-sequence-diagram.png" | prepend: site.baseurl }}) + {{ "/images/dofn-sequence-diagram.svg" | prepend: site.baseurl }}) 4.2.2. 
GroupByKey {#groupbykey} diff --git a/website/src/images/dofn-sequence-diagram.png b/website/src/images/dofn-sequence-diagram.png deleted file mode 100644 index cf6570d..000 Binary files a/website/src/images/dofn-sequence-diagram.png and /dev/null differ diff --git a/website/src/images/dofn-sequence-diagram.svg b/website/src/images/dofn-sequence-diagram.svg new file mode 100644 index 000..898b1ae --- /dev/null +++ b/website/src/images/dofn-sequence-diagram.svg @@ -0,0 +1,94 @@ + + +http://www.w3.org/2000/svg"; xmlns:xlink="http://www.w3.org/1999/xlink"; contentScriptType="application/ecmascript" contentStyleType="text/css" height="863px" preserveAspectRatio="none" style="width:740px;height:863px;" version="1.1" viewBox="0 0 740 863" width="740px" zoomAndPan="magnify">> +note right of DoFn: can have non-transient instance\nvariable state that will be deserialized +note right of DoFn: do not include enclosing class serializable state; use static\nnested DoFn or define as anonymous class in static method +note right of DoFn: no shared (global) static variable access (no sync mechanism) but a beam\nstate (based on engine mechanisms) can be injected to processElement +note right of DoFn: keep as pure function as possible or idempotent side effects\nbecause DoFns can be retried on failed bundles + +participant Runner + +activate Pipeline +Pipeline -> DoFn: **create DoFn ** +DoFn -> Runner: **passed instance or deserialized on workers** + +note right Pipeline: If state variables are known at pipeline construction step\ninitialize state variables by constructor + +group DoFn Lifecycle + Runner -> DoFn: **call setup** + activate Runner + activate DoFn + note right DoFn: reused instance to process other bundles on the same worker + note right DoFn: If state variables do not depend on the main pipeline program and are the\nsame for all DoFn instances initialize them in setup + group For each bundle +Runner -> DoFn: **call startBundle** +group
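The notes embedded in the diagram source above translate directly into code guidelines. A minimal sketch of a DoFn that follows them, using Beam's standard lifecycle annotations (the FormatFn class is illustrative and not part of this commit; it is meant as a static nested class of a pipeline-definition class):

    import org.apache.beam.sdk.transforms.DoFn;

    // Static nested class: avoids dragging the enclosing class's state into
    // the DoFn's serialized form.
    static class FormatFn extends DoFn<Integer, String> {
      // Non-transient instance state is serialized with the DoFn and
      // deserialized on workers, so set it in the constructor if it is known
      // at pipeline construction time.
      private final String prefix;
      // State that does not depend on the main pipeline program can be
      // initialized in @Setup; one instance is reused across bundles.
      private transient StringBuilder scratch;

      FormatFn(String prefix) {
        this.prefix = prefix;
      }

      @Setup
      public void setup() {
        scratch = new StringBuilder();
      }

      @ProcessElement
      public void processElement(@Element Integer element, OutputReceiver<String> out) {
        // Keep processing idempotent: failed bundles may be retried.
        scratch.setLength(0);
        out.output(scratch.append(prefix).append(element).toString());
      }
    }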
[beam] branch spark-runner_structured-streaming updated (1e0be7d -> 37ae68a)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. discard 1e0be7d [BEAM-8470] Apply spotless discard 178e6b3 [BEAM-8470] Add a jenkins job for validates runner tests in the new spark runner add 3417103 [BEAM-8470] Add a jenkins job for validates runner tests in the new spark runner add 37ae68a [BEAM-8470] Apply spotless This update added new revisions after undoing existing revisions. That is to say, some revisions that were in the old version of the branch are not in the new version. This situation occurs when a user --force pushes a change and generates a repository containing something like this: * -- * -- B -- O -- O -- O (1e0be7d) \ N -- N -- N refs/heads/spark-runner_structured-streaming (37ae68a) You should already have received notification emails for all of the O revisions, and so the following emails describe only the N revisions from the common base, B. Any revisions marked "omit" are not gone; other references still refer to them. Any revisions marked "discard" are gone forever. No new revisions were added by this update. Summary of changes: .../job_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[beam] branch spark-runner_structured-streaming updated (178e6b3 -> 1e0be7d)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from 178e6b3 [BEAM-8470] Add a jenkins job for validates runner tests in the new spark runner add 1e0be7d [BEAM-8470] Apply spotless No new revisions were added by this update. Summary of changes: .../translation/batch/ParDoTranslatorBatch.java | 17 ++--- .../translation/helpers/EncoderHelpers.java | 4 +--- .../translation/helpers/KVHelpers.java | 2 +- .../translation/helpers/RowHelpers.java | 4 ++-- 4 files changed, 14 insertions(+), 13 deletions(-)
[beam] branch spark-runner_structured-streaming updated (a0e6ca4 -> 178e6b3)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. from a0e6ca4 [BEAM-8470] Remove Encoders based on kryo now that we call Beam coders in the runner add 178e6b3 [BEAM-8470] Add a jenkins job for validates runner tests in the new spark runner No new revisions were added by this update. Summary of changes: ...ostCommit_Java_ValidatesRunner_SparkStructuredStreaming.groovy} | 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-) copy .test-infra/jenkins/{job_PostCommit_Java_ValidatesRunner_Spark.groovy => job_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming.groovy} (88%)
[beam] branch spark-runner_structured-streaming updated (620a27a -> a0e6ca4)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. discard 620a27a Remove Encoders based on kryo now that we call Beam coders in the runner discard 824b344 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders discard 27ef6de Remove unneeded cast discard 6a27839 Use beam encoders also in the output of the source translation discard 62a87b6 Apply spotless, fix typo and javadoc discard c5e78a0 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner discard 039f58a Apply new Encoders to GroupByKey discard 21accab Create a Tuple2Coder to encode scala tuple2 discard 29f7e93 Apply new Encoders to AggregatorCombiner discard 7f1060a Apply new Encoders to Window assign translation discard c48d032 Ignore long time failing test: SparkMetricsSinkTest discard 68d3d67 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder discard 3cc256e Apply new Encoders to Read source discard 7d456b4 Apply new Encoders to CombinePerKey discard c33fdda Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) discard c8bfcf3 Put Encoders expressions serializable discard 72c267c Wrap exceptions in UserCoderExceptions discard c6f2ac9 Apply spotless and checkstyle and add javadocs discard 78b2d22 Add an assert of equality in the encoders test discard 34e8aa8 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations discard f48067b Fix equal and hashcode discard ca01777 Remove example code discard 50060a8 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder discard 0cf2c87 Fix beam coder lazy init using reflexion: use .clas + try catch + cast discard d7c9a4a Fix getting the output value in code generation discard 8b07ec8 Fix ExpressionEncoder generated code: typos, try catch, fqcn discard fdba22d Fix warning in coder construction by reflexion discard e6b68a8 Lazy init coder because coder instance cannot be interpolated by catalyst discard d5645ff Fix code generation in Beam coder wrapper discard e4478ff Add a simple spark native test to test Beam coders wrapping into Spark Encoders discard 2aaf07a Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities discard fff5092 Fix scala Product in Encoders to avoid StackEverflow discard c9e3534 type erasure: spark encoders require a Class, pass Object and cast to Class discard 5fa6331 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part discard a5c7da3 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part discard 20d5bbd Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply discard 22d6466 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag discard 93d425a After testing performance and correctness, launch pipeline with dataset.foreach(). Make both test mode and production mode use foreach for uniformity. Move dataset print as a utility method discard 61f487f Remove no more needed AggregatorCombinerPerKey (there is only AggregatorCombiner) discard f8a5046 fixup! 
Add PipelineResults to Spark structured streaming. discard 8aafd50 Print number of leaf datasets discard ec43374 Add spark execution plans extended debug messages. discard 3b15128 Update log4j configuration discard cde225a Add PipelineResults to Spark structured streaming. discard 0e36b19 Make spotless happy discard 4aaf456 Added metrics sinks and tests discard dc939c8 Persist all output Dataset if there are multiple outputs in pipeline Enabled Use*Metrics tests discard dab3c2e Add a test to check that CombineGlobally preserves windowing discard 6e9ccdd Fix accumulators initialization in Combine that prevented CombineGlobally to work. discard a797884 Fix javadoc discard 476bc20 Add setEnableSparkMetricSinks() method discard 51ca79a Add missing dependencies to run Spark Structured Streaming Runner on Nexmark discard 5ed3e03 Add metrics support in DoFn discard d29c64e Ignore for now not working test testCombineGlobally discard ff50ccb Add a test that combine per key preserves windowing discard 6638522 Clean groupByKeyTest discard 8c499a5 add comment in combine globally test discard f68ed7a Fixed immutable list bug discard 9601649 Fix javadoc of AggregatorCombiner discard 7784f30 Clean not more needed WindowingHelpers discard 3bd95df Clean not more needed RowHelpers discard 4b5af9c Clean no more needed KVHelper
[beam] 35/37: Remove unneeded cast
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 27ef6de3fa90db6d59027f9a6fa792fc5787f6e9 Author: Etienne Chauchot AuthorDate: Wed Oct 23 11:51:49 2019 +0200 Remove unneeded cast --- .../spark/structuredstreaming/translation/helpers/KVHelpers.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java index 1983eaa..2fa4b1a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/KVHelpers.java @@ -25,7 +25,7 @@ import org.apache.spark.api.java.function.MapFunction; public final class KVHelpers { /** A Spark {@link MapFunction} for extracting the key out of a {@link KV} for GBK for example. */ - public static <K, V> MapFunction<WindowedValue<KV<K, V>>, K> extractKey() { -return (MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey(); + public static <K, V> MapFunction<WindowedValue<KV<K, V>>, K> extractKey() { +return wv -> wv.getValue().getKey(); } }
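As a usage note, extractKey() produces exactly the MapFunction that Spark's typed groupByKey expects. A hedged example of how it might be applied (the dataset and keyEncoder parameters and the concrete String/Integer types are illustrative assumptions):

    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.beam.sdk.values.KV;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.KeyValueGroupedDataset;

    // Group a dataset of windowed KVs by their Beam key.
    static KeyValueGroupedDataset<String, WindowedValue<KV<String, Integer>>> groupByBeamKey(
        Dataset<WindowedValue<KV<String, Integer>>> dataset, Encoder<String> keyEncoder) {
      return dataset.groupByKey(KVHelpers.<String, Integer>extractKey(), keyEncoder);
    }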
[beam] 18/37: Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 34e8aa8c31a561684eea2e2496757f9f3cae35d0 Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:14:32 2019 +0200 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations --- .../translation/helpers/EncoderHelpers.java| 18 ++ 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 91aaaf9..c9ab435 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -227,32 +227,34 @@ public class EncoderHelpers { /* CODE GENERATED: + final $javaType ${ev.value} try { - final $javaType ${ev.value} = + ${ev.value} = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); } */ List parts = new ArrayList<>(); - parts.add("try { final "); + parts.add("final "); parts.add(" "); - parts.add(" ="); - parts.add("?"); - parts.add(":"); - parts.add("("); + parts.add(";try { "); + parts.add(" = "); + parts.add("? "); + parts.add(": ("); parts.add(") "); parts.add(".decode(new java.io.ByteArrayInputStream("); - parts.add(")); } catch (IOException e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add(")); } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); args.add(javaType); args.add(ev.value()); + args.add(ev.value()); args.add(input.isNull()); args.add(CodeGenerator.defaultValue(dataType(), false)); args.add(javaType);
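Piecing the parts and args lists back together, the decode snippet this template emits would look roughly as follows. This is a reconstruction; MyType, value1, isNull1 and input1 stand in for the identifiers that Catalyst substitutes at code-generation time:

    // Declaration hoisted out of the try block so the variable stays in scope
    // after the catch; IOException is broadened to Exception because some
    // coders (e.g. VoidCoder) never throw IOException, which would make the
    // narrower catch invalid in the generated Java.
    final MyType value1;
    try {
      value1 = isNull1
          ? null /* CodeGenerator.defaultValue(dataType) */
          : (MyType) beamCoder.decode(new java.io.ByteArrayInputStream(input1));
    } catch (Exception e) {
      throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
    }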
[beam] 07/37: Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 2aaf07a41155f35ab36bda4c3c02a7ffa7bd66db Author: Etienne Chauchot AuthorDate: Thu Aug 29 15:10:40 2019 +0200 Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities --- .../translation/helpers/EncoderHelpers.java| 64 +- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 8a4f1de..0765c78 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -100,13 +100,13 @@ public class EncoderHelpers { List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); +serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), coder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(claz, coder), +new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, coder), classTag); /* @@ -126,16 +126,14 @@ public class EncoderHelpers { */ } - private static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { -private Class claz; -private Coder beamCoder; private Expression child; +private Coder beamCoder; -private EncodeUsingBeamCoder( Class claz, Coder beamCoder) { - this.claz = claz; +public EncodeUsingBeamCoder(Expression child, Coder beamCoder) { + this.child = child; this.beamCoder = beamCoder; - this.child = new BoundReference(0, new ObjectType(claz), true); } @Override public Expression child() { @@ -175,11 +173,18 @@ public class EncoderHelpers { } @Override public Object productElement(int n) { - return null; + switch (n) { +case 0: + return child; +case 1: + return beamCoder; +default: + throw new ArrayIndexOutOfBoundsException("productElement out of bounds"); + } } @Override public int productArity() { - return 0; + return 2; } @Override public boolean canEqual(Object that) { @@ -194,11 +199,11 @@ public class EncoderHelpers { return false; } EncodeUsingBeamCoder that = (EncodeUsingBeamCoder) o; - return claz.equals(that.claz) && beamCoder.equals(that.beamCoder); + return beamCoder.equals(that.beamCoder); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), claz, beamCoder); + return Objects.hash(super.hashCode(), beamCoder); } } @@ -226,16 +231,16 @@ public class EncoderHelpers { override def dataType: DataType = BinaryType }*/ - private static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ + public static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ -private Class claz; -private Coder beamCoder; private Expression child; +private ClassTag 
classTag; +private Coder beamCoder; -private DecodeUsingBeamCoder(Class claz, Coder beamCoder) { - this.claz = claz; +public DecodeUsingBeamCoder(Expression child, ClassTag classTag, Coder beamCoder) { + this.child = child; + this.classTag = classTag; this.beamCoder = beamCoder; - this.child = new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType); } @Override public Expression child() { @@ -267,7 +272,7 @@ public class EncoderHelpers { args.add(new VariableValue("deserialize", String.class)); Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", claz)); + return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", classTag.runtimeClass())); } @@ -280,17 +285,24 @@ public class Encod
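Catalyst expressions are scala.Product instances over their constructor arguments, which is what the corrected productElement/productArity above implement; without it, Catalyst cannot copy or transform the expression tree. A minimal sketch of that contract, using an illustrative plain class rather than the real Expression hierarchy:

// Sketch of the scala.Product contract: productArity equals the number of
// constructor arguments, and productElement returns them by position.
final class ProductShape {
  private final Object child;     // Expression in the real code
  private final Object beamCoder; // Coder<T> in the real code

  ProductShape(Object child, Object beamCoder) {
    this.child = child;
    this.beamCoder = beamCoder;
  }

  public Object productElement(int n) {
    switch (n) {
      case 0:
        return child;
      case 1:
        return beamCoder;
      default:
        throw new ArrayIndexOutOfBoundsException("productElement out of bounds");
    }
  }

  public int productArity() {
    return 2; // one slot per constructor argument
  }
}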
[beam] 08/37: Add a simple Spark native test for wrapping Beam coders into Spark Encoders
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit e4478ffc2a9fd35d76080ff8f33cc8d3340cba1c Author: Etienne Chauchot AuthorDate: Fri Aug 30 17:34:13 2019 +0200 Add a simple spark native test to test Beam coders wrapping into Spark Encoders --- .../structuredstreaming/utils/EncodersTest.java| 29 ++ 1 file changed, 29 insertions(+) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java new file mode 100644 index 000..490e3dc --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java @@ -0,0 +1,29 @@ +package org.apache.beam.runners.spark.structuredstreaming.utils; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.spark.sql.SparkSession; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +/** + * Test of the wrapping of Beam Coders as Spark ExpressionEncoders. + */ +public class EncodersTest { + + @Test + public void beamCoderToSparkEncoderTest() { +SparkSession sparkSession = SparkSession.builder().appName("beamCoderToSparkEncoderTest") +.master("local[4]").getOrCreate(); +List data = new ArrayList<>(); +data.add(1); +data.add(2); +data.add(3); +//sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); +sparkSession.createDataset(data, EncoderHelpers.genericEncoder()); + } +}
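At this commit the test still uses genericEncoder(), with the fromBeamCoder call left commented out. Below is a sketch of the round trip once fromBeamCoder is in place; the collect-and-compare assert is an assumption here (a later commit in this series adds an equality assert to the test).

import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class EncodersRoundTrip {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("beamCoderToSparkEncoderTest")
            .master("local[4]")
            .getOrCreate();
    List<Integer> data = Arrays.asList(1, 2, 3);
    // Wrap the Beam coder as a Spark Encoder and round-trip the data through it.
    Dataset<Integer> ds =
        spark.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of()));
    if (!ds.collectAsList().equals(data)) {
      throw new AssertionError("round trip through the Beam-coder encoder failed");
    }
    spark.stop();
  }
}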
[beam] 32/37: Apply new Encoders to ParDo. Replace Tuple2Coder with MultiOutputCoder to handle the multiple outputs used in the Spark Encoder for DoFnRunner
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c5e78a0f4552a094ba3914ef490629e136ac1beb Author: Etienne Chauchot AuthorDate: Tue Oct 1 17:52:32 2019 +0200 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner --- .../translation/batch/ParDoTranslatorBatch.java| 42 +-- .../translation/helpers/EncoderHelpers.java| 6 ++- .../translation/helpers/MultiOuputCoder.java | 49 + .../translation/helpers/Tuple2Coder.java | 62 -- 4 files changed, 81 insertions(+), 78 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 255adc8..f5a109e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -31,8 +31,10 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.MultiOuputCoder; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.PTransform; @@ -84,12 +86,15 @@ class ParDoTranslatorBatch ParDoTranslation.getSchemaInformation(context.getCurrentTransform()); // Init main variables -Dataset> inputDataSet = context.getDataset(context.getInput()); +PValue input = context.getInput(); +Dataset> inputDataSet = context.getDataset(input); Map, PValue> outputs = context.getOutputs(); TupleTag mainOutputTag = getTupleTag(context); List> outputTags = new ArrayList<>(outputs.keySet()); WindowingStrategy windowingStrategy = -((PCollection) context.getInput()).getWindowingStrategy(); +((PCollection) input).getWindowingStrategy(); +Coder inputCoder = ((PCollection) input).getCoder(); +Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); // construct a map from side input to WindowingStrategy so that // the DoFn runner can map main-input windows to side input windows @@ -102,8 +107,6 @@ class ParDoTranslatorBatch SideInputBroadcast broadcastStateData = createBroadcastSideInputs(sideInputs, context); Map, Coder> outputCoderMap = context.getOutputCoders(); -Coder inputCoder = ((PCollection) context.getInput()).getCoder(); - MetricsContainerStepMapAccumulator metricsAccum = MetricsAccumulator.getInstance(); List> additionalOutputTags = new ArrayList<>(); @@ -129,19 +132,25 @@ class ParDoTranslatorBatch broadcastStateData, doFnSchemaInformation); +MultiOuputCoder multipleOutputCoder = MultiOuputCoder +.of(SerializableCoder.of(TupleTag.class), outputCoderMap, windowCoder); Dataset, WindowedValue>> 
allOutputs = -inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder()); +inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.fromBeamCoder(multipleOutputCoder)); if (outputs.entrySet().size() > 1) { allOutputs.persist(); for (Map.Entry, PValue> output : outputs.entrySet()) { -pruneOutputFilteredByTag(context, allOutputs, output); +pruneOutputFilteredByTag(context, allOutputs, output, windowCoder); } } else { + Coder outputCoder = ((PCollection) outputs.get(mainOutputTag)).getCoder(); + Coder> windowedValueCoder = + (Coder>) + (Coder) WindowedValue.getFullCoder(outputCoder, windowCoder); Dataset> outputDataset = allOutputs.map( (MapFunction, WindowedValue>, WindowedValue>) value -> value._2, - EncoderHelpers.windowedValueEncoder()); + EncoderHelpers.fromBeamCoder(windowedValueCoder)); context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset);
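The MultiOuputCoder introduced above is the key piece: a tagged-union coder that writes the TupleTag first, then uses that tag's coder to write the windowed element. A simplified sketch of the idea; the real class lives in translation/helpers and handles generics and failure modes differently.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import scala.Tuple2;

class MultiOutputCoderSketch<T> extends CustomCoder<Tuple2<TupleTag<T>, WindowedValue<T>>> {
  @SuppressWarnings("rawtypes")
  private final Coder<TupleTag> tagCoder = SerializableCoder.of(TupleTag.class);

  private final Map<TupleTag<?>, Coder<?>> coderMap;
  private final Coder<? extends BoundedWindow> windowCoder;

  MultiOutputCoderSketch(
      Map<TupleTag<?>, Coder<?>> coderMap, Coder<? extends BoundedWindow> windowCoder) {
    this.coderMap = coderMap;
    this.windowCoder = windowCoder;
  }

  // Look up the element coder registered for this tag and wrap it with the window coder.
  private Coder<WindowedValue<T>> valueCoder(TupleTag<T> tag) {
    @SuppressWarnings("unchecked")
    Coder<T> elemCoder = (Coder<T>) coderMap.get(tag);
    return WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder);
  }

  @Override
  public void encode(Tuple2<TupleTag<T>, WindowedValue<T>> tuple, OutputStream out)
      throws IOException {
    tagCoder.encode(tuple._1(), out); // tag first ...
    valueCoder(tuple._1()).encode(tuple._2(), out); // ... then the tagged element
  }

  @Override
  public Tuple2<TupleTag<T>, WindowedValue<T>> decode(InputStream in) throws IOException {
    @SuppressWarnings("unchecked")
    TupleTag<T> tag = (TupleTag<T>) tagCoder.decode(in);
    return new Tuple2<>(tag, valueCoder(tag).decode(in));
  }
}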
[beam] 34/37: Use beam encoders also in the output of the source translation
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 6a278395d77b3578da10a9621c85883a2d6f2ded Author: Etienne Chauchot AuthorDate: Wed Oct 23 11:45:39 2019 +0200 Use beam encoders also in the output of the source translation --- .../translation/batch/ReadSourceTranslatorBatch.java | 4 +--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index ceb87cf..6af7f55 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -77,9 +77,7 @@ class ReadSourceTranslatorBatch Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -// using kryo bytes serialization because the mapper already calls -// windowedValueCoder.decode, no need to call it also in the Spark encoder -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index 9f1e34d..ea10272 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -77,9 +77,7 @@ class ReadSourceTranslatorStreaming Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -// using kryo bytes serialization because the mapper already calls -// windowedValueCoder.decode, no need to call it also in the Spark encoder -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset);
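Both read translators build the same windowed-value coder, because elements leave a source in the global window until a downstream Window transform reassigns them. A small helper capturing that construction (the helper name is ours, not the runner's):

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;

final class SourceCoders {
  // Values read from a source sit in the global window, hence GlobalWindow.Coder.
  static <T> WindowedValue.FullWindowedValueCoder<T> forSourceOutput(Coder<T> outputCoder) {
    return WindowedValue.FullWindowedValueCoder.of(outputCoder, GlobalWindow.Coder.INSTANCE);
  }
}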
[beam] 31/37: Apply new Encoders to GroupByKey
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 039f58a6a07e567bb8c5636caecebc61dec9129e Author: Etienne Chauchot AuthorDate: Mon Sep 30 12:13:25 2019 +0200 Apply new Encoders to GroupByKey --- .../batch/GroupByKeyTranslatorBatch.java | 25 -- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java index 3e203a8..2970aa7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java @@ -29,6 +29,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue; @@ -54,11 +56,21 @@ class GroupByKeyTranslatorBatch Dataset>> input = context.getDataset(inputPCollection); +WindowingStrategy windowingStrategy = inputPCollection.getWindowingStrategy(); +KvCoder kvCoder = (KvCoder) inputPCollection.getCoder(); + // group by key only +Coder keyCoder = kvCoder.getKeyCoder(); KeyValueGroupedDataset>> groupByKeyOnly = -input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder()); +input.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder( +keyCoder)); // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable +Coder valueCoder = kvCoder.getValueCoder(); +WindowedValue.WindowedValueCoder wvCoder = +WindowedValue.FullWindowedValueCoder.of( +valueCoder, inputPCollection.getWindowingStrategy().getWindowFn().windowCoder()); +IterableCoder> iterableCoder = IterableCoder.of(wvCoder); Dataset>>> materialized = groupByKeyOnly.mapGroups( (MapGroupsFunction>, KV>>>) @@ -77,19 +89,20 @@ class GroupByKeyTranslatorBatch KV.of(key, Iterables.unmodifiableIterable(values)); return kv; }, -EncoderHelpers.kvEncoder()); +EncoderHelpers.fromBeamCoder(KvCoder.of(keyCoder, iterableCoder))); -WindowingStrategy windowingStrategy = inputPCollection.getWindowingStrategy(); -KvCoder coder = (KvCoder) inputPCollection.getCoder(); // group also by windows +WindowedValue.FullWindowedValueCoder>> outputCoder = WindowedValue.FullWindowedValueCoder +.of(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), +windowingStrategy.getWindowFn().windowCoder()); Dataset>>> output = materialized.flatMap( new GroupAlsoByWindowViaOutputBufferFn<>( windowingStrategy, new InMemoryStateInternalsFactory<>(), -SystemReduceFn.buffering(coder.getValueCoder()), +SystemReduceFn.buffering(valueCoder), context.getSerializableOptions()), -EncoderHelpers.windowedValueEncoder()); +EncoderHelpers.fromBeamCoder(outputCoder)); 
context.putDataset(context.getOutput(), output); }
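The coder plumbing above is dense, so here it is spelled out: each stage of the GroupByKey translation derives its Spark Encoder from a Beam coder built off the input KvCoder. A sketch of just the coder construction, under the same assumptions as the diff:

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;

final class GbkCoders {
  // Coder for the Dataset after mapGroups: key plus the materialized windowed values.
  static <K, V> Coder<KV<K, Iterable<WindowedValue<V>>>> materializedCoder(
      KvCoder<K, V> kvCoder, Coder<? extends BoundedWindow> windowCoder) {
    return KvCoder.of(
        kvCoder.getKeyCoder(),
        IterableCoder.of(
            WindowedValue.FullWindowedValueCoder.of(kvCoder.getValueCoder(), windowCoder)));
  }

  // Coder for the final output once grouping also by window has run.
  static <K, V> Coder<WindowedValue<KV<K, Iterable<V>>>> outputCoder(
      KvCoder<K, V> kvCoder, Coder<? extends BoundedWindow> windowCoder) {
    return WindowedValue.FullWindowedValueCoder.of(
        KvCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())),
        windowCoder);
  }
}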
[beam] 29/37: Apply new Encoders to AggregatorCombiner
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 29f7e93c954cc26425a052c0f1c19ec6e6c9fe66 Author: Etienne Chauchot AuthorDate: Fri Sep 27 11:55:20 2019 +0200 Apply new Encoders to AggregatorCombiner --- .../translation/batch/AggregatorCombiner.java | 22 +- .../batch/CombinePerKeyTranslatorBatch.java| 20 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java index 0e3229e..d14569a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -52,13 +54,25 @@ class AggregatorCombiner private final Combine.CombineFn combineFn; private WindowingStrategy windowingStrategy; private TimestampCombiner timestampCombiner; + private IterableCoder> accumulatorCoder; + private IterableCoder> outputCoder; public AggregatorCombiner( Combine.CombineFn combineFn, - WindowingStrategy windowingStrategy) { + WindowingStrategy windowingStrategy, + Coder accumulatorCoder, + Coder outputCoder) { this.combineFn = combineFn; this.windowingStrategy = (WindowingStrategy) windowingStrategy; this.timestampCombiner = windowingStrategy.getTimestampCombiner(); +this.accumulatorCoder = +IterableCoder.of( +WindowedValue.FullWindowedValueCoder.of( +accumulatorCoder, windowingStrategy.getWindowFn().windowCoder())); +this.outputCoder = +IterableCoder.of( +WindowedValue.FullWindowedValueCoder.of( +outputCoder, windowingStrategy.getWindowFn().windowCoder())); } @Override @@ -142,14 +156,12 @@ class AggregatorCombiner @Override public Encoder>> bufferEncoder() { -// TODO replace with accumulatorCoder if possible -return EncoderHelpers.genericEncoder(); +return EncoderHelpers.fromBeamCoder(accumulatorCoder); } @Override public Encoder>> outputEncoder() { -// TODO replace with outputCoder if possible -return EncoderHelpers.genericEncoder(); +return EncoderHelpers.fromBeamCoder(outputCoder); } private Set collectAccumulatorsWindows(Iterable> accumulators) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java index 33b037a..be238b5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java @@ -23,6 +23,7 @@ import 
org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; @@ -58,20 +59,31 @@ class CombinePerKeyTranslatorBatch Dataset>> inputDataset = context.getDataset(input); -Coder keyCoder = (Coder) input.getCoder().getCoderArguments().get(0); -Coder outputTCoder = (Coder) output.getCoder().getCoderArguments().get(1); +KvCoder inputCoder = (KvCoder) input.getCoder(); +Coder keyCoder = inputCoder.getKeyCoder(); +KvCoder outputKVCoder = (KvCoder) output.getCoder(); +Coder outputCoder = outputKVCoder.getValueCoder(); KeyValueGroupedDataset>> groupedDataset = inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromB
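Two things happen above: the Aggregator's buffer and output encoders are now derived from real coders, and CombinePerKey obtains the accumulator coder from the CombineFn (hence the CannotProvideCoderException import). A sketch of both derivations:

import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;

final class CombineCoders {
  // The Aggregator buffers windowed accumulators, so bufferEncoder wraps the
  // accumulator coder exactly the way outputEncoder wraps the output coder.
  static <AccumT> Coder<Iterable<WindowedValue<AccumT>>> bufferCoder(
      Coder<AccumT> accumulatorCoder, Coder<? extends BoundedWindow> windowCoder) {
    return IterableCoder.of(
        WindowedValue.FullWindowedValueCoder.of(accumulatorCoder, windowCoder));
  }

  // The accumulator coder itself comes from the CombineFn; the checked exception
  // is rethrown unchecked, mirroring the import the diff adds.
  static <InputT, AccumT, OutputT> Coder<AccumT> accumulatorCoder(
      Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
      CoderRegistry registry,
      Coder<InputT> inputValueCoder) {
    try {
      return combineFn.getAccumulatorCoder(registry, inputValueCoder);
    } catch (CannotProvideCoderException e) {
      throw new RuntimeException(e);
    }
  }
}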
[beam] 23/37: Catch Exception instead of IOException because some coders do not throw exceptions at all (e.g. VoidCoder)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c33fddadcc3f38474e0aeb440c0d3fac718ee5a6 Author: Etienne Chauchot AuthorDate: Fri Sep 6 10:42:00 2019 +0200 Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) --- .../structuredstreaming/translation/helpers/EncoderHelpers.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index e7c5bb7..218dc0a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -149,7 +149,7 @@ public class EncoderHelpers { $beamCoder.encode(${input.value}, baos); ${ev.value} = baos.toByteArray(); } -} catch (java.io.IOException e) { +} catch (Exception e) { throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ @@ -162,7 +162,7 @@ public class EncoderHelpers { parts.add(".encode("); parts.add(", baos); "); parts.add( - " = baos.toByteArray();}} catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); + " = baos.toByteArray();}} catch (Exception e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); @@ -265,7 +265,7 @@ public class EncoderHelpers { ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); - } catch (java.io.IOException e) { + } catch (Exception e) { throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ @@ -280,7 +280,7 @@ public class EncoderHelpers { parts.add(") "); parts.add(".decode(new java.io.ByteArrayInputStream("); parts.add( - ")); } catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); + ")); } catch (Exception e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
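The narrower catch fails for a javac reason, not a stylistic one: the generated code invokes the concrete coder class, and when that class's decode declares no checked exceptions a catch of java.io.IOException is rejected as unreachable. Illustrated with VoidCoder, assuming its decode of that era declares no checked exceptions:

import java.io.ByteArrayInputStream;
import org.apache.beam.sdk.coders.VoidCoder;

class WhyCatchException {
  static Void decode(byte[] bytes) {
    try {
      // VoidCoder.decode declares no checked exceptions, so catching
      // java.io.IOException here would not compile ("exception is never thrown
      // in body of corresponding try statement"); catching Exception always does.
      return VoidCoder.of().decode(new ByteArrayInputStream(bytes));
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}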
[beam] 04/37: Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 5fa6331e0356953870e6ed614b0ce5e5c801fab1 Author: Etienne Chauchot AuthorDate: Mon Aug 26 15:22:12 2019 +0200 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part + Fix EncoderHelpers.fromBeamCoder() visibility --- .../translation/helpers/EncoderHelpers.java| 64 ++ 1 file changed, 52 insertions(+), 12 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index b072803..ab24e37 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; @@ -94,7 +96,7 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - private Encoder fromBeamCoder(Coder coder, Class claz){ + public static Encoder fromBeamCoder(Coder coder, Class claz){ List serialiserList = new ArrayList<>(); serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); @@ -103,7 +105,8 @@ public class EncoderHelpers { SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(classTag, coder), classTag); +new DecodeUsingBeamCoder<>(claz, coder), +classTag); /* ExpressionEncoder[T]( @@ -150,8 +153,8 @@ public class EncoderHelpers { List instructions = new ArrayList<>(); instructions.add(outside); - Seq parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq(); + StringContext stringContext = new StringContext(parts); Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext); List args = new ArrayList<>(); @@ -160,7 +163,7 @@ public class EncoderHelpers { args.add(new VariableValue("javaType", String.class)); args.add(new SimpleExprValue("input.isNull", Boolean.class)); args.add(new SimpleExprValue("CodeGenerator.defaultValue(dataType)", String.class)); - args.add(new VariableValue("$serialize", String.class)); + args.add(new VariableValue("serialize", String.class)); Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq()); return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", Array.class)); @@ -229,24 +232,61 @@ public class EncoderHelpers { private static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ -private ClassTag classTag; +private Class claz; private Coder beamCoder; +private Expression child; -private DecodeUsingBeamCoder(ClassTag classTag, Coder beamCoder) { - this.classTag = classTag; +private DecodeUsingBeamCoder(Class claz, Coder beamCoder) { + this.claz = claz; this.beamCoder = beamCoder; + this.child = new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType); } @Override public Expression child() { - return new Cast(new GetColumnByOrdinal(0, BinaryType), 
BinaryType); + return child; } @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { - return null; + // Code to deserialize. + ExprCode input = child.genCode(ctx); + String javaType = CodeGenerator.javaType(dataType()); + + String inputStream = "ByteArrayInputStream bais = new ByteArrayInputStream(${input.value});"; + String deserialize = inputStream + "($javaType) $beamCoder.decode(bais);"; + + String outside = "final $javaType output = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $deserialize;"; + + List instructions = new ArrayList<>(); + instructions.add(outside); + Seq parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq(); + + StringContext stringContext = new StringContext(parts); + Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext); + List args = new ArrayList<>(); + args.add(new SimpleExprValue("input.valu
[beam] branch spark-runner_structured-streaming updated (46ed555 -> 620a27a)
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a change to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git. discard 46ed555 Remove Encoders based on kryo now that we call Beam coders in the runner discard 25d0401 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders discard ebc53fd Remove unneeded cast discard ad29daf Use beam encoders also in the output of the source translation discard 507bbd8 Fix javadoc discard c980d4c Fix typo discard fb3aa34 Add missing windowedValue Encoder call in Pardo discard ee2c0e6 Apply spotless discard 31c91a9 Apply new Encoders to Pardo. Replace Tuple2Coder with MultiOutputCoder to deal with multiple output to use in Spark Encoder for DoFnRunner discard 868204f Apply new Encoders to GroupByKey discard 30c662a Create a Tuple2Coder to encode scale tuple2 discard d093ffe Apply spotless discard 6edcfa2 Apply new Encoders to AggregatorCombiner discard 5beb435 Apply new Encoders to Window assign translation discard ab7d24c Ignore long time failing test: SparkMetricsSinkTest discard 3ac3c71 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder discard bcbb697 Apply new Encoders to Read source discard aa25e85 Apply new Encoders to CombinePerKey discard f0f2078 Catch Exception instead of IOException because some coders to not throw Exceptions at all (e.g.VoidCoder) discard 3a333fb Put Encoders expressions serializable discard cfdf4a4 Improve exceptions catching discard b879123 Apply spotless and checkstyle and add javadocs discard 0fe6f9b Add an assert of equality in the encoders test discard d8b8b42 Fix generated code: uniform exceptions catching, fix parenthesis and variable declarations discard ef69410 Fix equal and hashcode discard 4351304 Remove example code discard c4a4464 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder discard 91e923c Cast coder instanciated by reflection discard 723c004 Add try catch around reflexion call in lazy init of beam coder discard 8bbf991 Fix beam coder lazy init using reflexion discard 959664f Fix getting the output value in code generation discard 668227b Fix ExpressionEncoder generated code: typos, try catch, fqcn discard cbd7c2b Fix warning in coder construction by reflexion discard 2c94eef Fix call to scala Fucntion1 in coder lazy init discard a758985 Lazy init coder because coder instance cannot be interpolated by catalyst discard b11e100 Fix code generation in Beam coder wrapper discard 2bf4cd9 Add a simple spark native test to test Beam coders wrapping into Spark Encoders discard e96af88 Fix visibility of serializer and deserializer discard 23735e4 Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder discard a5d49f5 Fix scala Product in Encoders to avoid StackEverflow discard 95fd272 type erasure: spark encoders require a Class, pass Object and cast to Class discard 84f2cbd9 Fix EncoderHelpers.fromBeamCoder() visibility discard d613d6b Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part discard 031754c Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part discard c350188 Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply discard a524036 Improve Pardo translation performance: avoid calling a filter transform when there is only 
one output tag discard 0cedc7a Add a TODO on perf improvement of Pardo translation new 22d6466 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag new 20d5bbd Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply new a5c7da3 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part new 5fa6331 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: deserialization part new c9e3534 type erasure: spark encoders require a Class, pass Object and cast to Class new fff5092 Fix scala Product in Encoders to avoid StackEverflow new 2aaf07a Conform to spark ExpressionEncoders: pass classTags, implement scala Product, pass children from within the ExpressionEncoder, fix visibilities new e4478ff Add a simple spark native test to test Beam coders wrapping into Spark Encoders new d5645ff Fix code generation in Beam coder wrapper new e6b68a8 Lazy init coder because coder instance cannot be interpolated by catalyst new fdba22d Fix warning in coder construction by reflexion new 8b07ec8 Fix ExpressionEncoder generated code: typos, try catch, fqcn new d7c9a4a Fix getting the output value in code generation
[beam] 26/37: Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 68d3d6798950888590fca915782d5288fe2d1e5a Author: Etienne Chauchot AuthorDate: Thu Sep 19 17:20:31 2019 +0200 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder --- .../translation/batch/ReadSourceTranslatorBatch.java | 9 ++--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 9 ++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index 2dcf66f..ceb87cf 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -70,13 +70,16 @@ class ReadSourceTranslatorBatch .load(); // extract windowedValue from Row -WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder -.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); +WindowedValue.FullWindowedValueCoder windowedValueCoder = +WindowedValue.FullWindowedValueCoder.of( +source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -EncoderHelpers.fromBeamCoder(windowedValueCoder)); +// using kryo bytes serialization because the mapper already calls +// windowedValueCoder.decode, no need to call it also in the Spark encoder +EncoderHelpers.windowedValueEncoder()); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index 9e03d96..9f1e34d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -71,12 +71,15 @@ class ReadSourceTranslatorStreaming .load(); // extract windowedValue from Row -WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder -.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); +WindowedValue.FullWindowedValueCoder windowedValueCoder = +WindowedValue.FullWindowedValueCoder.of( +source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); Dataset> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), -EncoderHelpers.fromBeamCoder(windowedValueCoder)); +// using kryo bytes serialization because the mapper already calls +// windowedValueCoder.decode, no need to call it also in the Spark encoder +EncoderHelpers.windowedValueEncoder()); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset);
[beam] 13/37: Fix getting the output value in code generation
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit d7c9a4a59768687ff051ab0f28462e6376648e43 Author: Etienne Chauchot AuthorDate: Wed Sep 4 16:50:17 2019 +0200 Fix getting the output value in code generation --- .../translation/helpers/EncoderHelpers.java| 37 +- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index dff308a..a452da0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -21,7 +21,6 @@ import static org.apache.spark.sql.types.DataTypes.BinaryType; import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; -import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -42,7 +41,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block; import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator; import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext; import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; -import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; import scala.StringContext; @@ -144,34 +142,42 @@ public class EncoderHelpers { /* CODE GENERATED +byte[] ${ev.value}; try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); -final byte[] output; if ({input.isNull}) -output = null; -else -output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); +${ev.value} = null; +else{ +$beamCoder.encode(${input.value}, baos); +${ev.value} = baos.toByteArray(); +} } catch (Exception e) { throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); } */ List parts = new ArrayList<>(); - parts.add("try {java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); final byte[] output; if ("); - parts.add(") output = null; else output ="); + parts.add("byte[] "); + parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); + parts.add(") "); + parts.add(" = null; else{"); parts.add(".encode("); - parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add(", baos); "); + parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); + + args.add(ev.value()); args.add(input.isNull()); + args.add(ev.value()); args.add(beamCoder); args.add(input.value()); + args.add(ev.value()); Block code = (new Block.BlockHelper(sc)) .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code), input.isNull(), - new VariableValue("output", Array.class)); + return ev.copy(input.code().$plus(code), input.isNull(),ev.value()); } @@ 
-263,7 +269,7 @@ public class EncoderHelpers { /* CODE GENERATED: try { - final $javaType output = + final $javaType ${ev.value} = ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); @@ -274,7 +280,8 @@ public class EncoderHelpers { List parts = new ArrayList<>(); parts.add("try { final "); - parts.add(" output ="); + parts.add(" "); + parts.add(" ="); parts.add("?"); parts.add(":"); parts.add("("); @@ -286,6 +293,7 @@ public class EncoderHelpers { List args = new ArrayList<>(); args.add(javaType); + args.add(ev.value()); args.add(input.isNull()); args.add(CodeGenerator.defaultValue(dataType(),
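As with the decoder, the encode template reads more clearly as plain Java. A hand-written rendering of the generated encode path shown in the comment above, with illustrative names standing in for the ${ev.value} and ${input.*} placeholders:

import java.io.ByteArrayOutputStream;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.UserCodeException;

class GeneratedEncodeShape {
  // Hand-written rendering of the fixed encode template; names are illustrative.
  static <T> byte[] encode(T input, boolean inputIsNull, Coder<T> beamCoder) {
    byte[] value; // ${ev.value} in the template
    try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      if (inputIsNull) {
        value = null;
      } else {
        beamCoder.encode(input, baos);
        value = baos.toByteArray();
      }
    } catch (Exception e) {
      throw new RuntimeException(UserCodeException.wrap(e));
    }
    return value;
  }
}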
[beam] 17/37: Fix equals and hashCode
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit f48067b87be26773de91d076c4ad249f54890db0 Author: Etienne Chauchot AuthorDate: Thu Sep 5 14:49:37 2019 +0200 Fix equal and hashcode --- .../structuredstreaming/translation/helpers/EncoderHelpers.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 83243b3..91aaaf9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -195,11 +195,11 @@ public class EncoderHelpers { return false; } EncodeUsingBeamCoder that = (EncodeUsingBeamCoder) o; - return beamCoder.equals(that.beamCoder); + return beamCoder.equals(that.beamCoder) && child.equals(that.child); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), beamCoder); + return Objects.hash(super.hashCode(), child, beamCoder); } } @@ -306,11 +306,11 @@ public class EncoderHelpers { return false; } DecodeUsingBeamCoder that = (DecodeUsingBeamCoder) o; - return classTag.equals(that.classTag) && beamCoder.equals(that.beamCoder); + return child.equals(that.child) && classTag.equals(that.classTag) && beamCoder.equals(that.beamCoder); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), classTag, beamCoder); + return Objects.hash(super.hashCode(), child, classTag, beamCoder); } } }
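The rule this fix restores is the ordinary equals/hashCode contract: every field that distinguishes two expressions participates in both methods; Catalyst compares expressions when canonicalizing plans, so leaving child out could conflate distinct expressions. A generic sketch of the shape:

import java.util.Objects;

// Sketch: both fields take part in equals and hashCode, matching the diff.
final class ExprIdentity {
  private final Object child;
  private final Object beamCoder;

  ExprIdentity(Object child, Object beamCoder) {
    this.child = child;
    this.beamCoder = beamCoder;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    ExprIdentity that = (ExprIdentity) o;
    return child.equals(that.child) && beamCoder.equals(that.beamCoder);
  }

  @Override
  public int hashCode() {
    return Objects.hash(child, beamCoder);
  }
}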
[beam] 20/37: Apply spotless and checkstyle and add javadocs
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c6f2ac9b21f7cfb9e1e81675cdf7f511b794559d Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:35:34 2019 +0200 Apply spotless and checkstyle and add javadocs --- .../translation/helpers/EncoderHelpers.java| 137 + .../structuredstreaming/utils/EncodersTest.java| 32 +++-- 2 files changed, 113 insertions(+), 56 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index c9ab435..f990121 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -89,21 +89,31 @@ public class EncoderHelpers { - Bridges from Beam Coders to Spark Encoders */ - /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Coder beamCoder/*, Class claz*/){ + /** + * Wrap a Beam coder into a Spark Encoder using Catalyst Expression Encoders (which uses java code + * generation). + */ + public static Encoder fromBeamCoder(Coder beamCoder) { List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); +serialiserList.add( +new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), +new DecodeUsingBeamCoder<>( +new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); } + /** + * Catalyst Expression that serializes elements using Beam {@link Coder}. + * + * @param : Type of elements ot be serialized. + */ public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { private Expression child; @@ -114,13 +124,16 @@ public class EncoderHelpers { this.beamCoder = beamCoder; } -@Override public Expression child() { +@Override +public Expression child() { return child; } -@Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { +@Override +public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. 
- String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); + String accessCode = + ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); ExprCode input = child.genCode(ctx); /* @@ -140,14 +153,17 @@ public class EncoderHelpers { */ List parts = new ArrayList<>(); parts.add("byte[] "); - parts.add(";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); + parts.add( + ";try { java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); if ("); parts.add(") "); parts.add(" = null; else{"); parts.add(".encode("); parts.add(", baos); "); - parts.add(" = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + parts.add( + " = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); - StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); + StringContext sc = + new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); @@ -157,18 +173,19 @@ public class EncoderHelpers { args.add(accessCode); args.add(input.value()); args.add(ev.value()); - Block code = (new Block.BlockHelper(sc)) - .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); + Block code = + (new Block.BlockHelper(sc)).code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - return ev.copy(input.code().$plus(code
[beam] 06/37: Fix scala Product in Encoders to avoid StackOverflow
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit fff509246b4ed9810c137ba2c9bd7811e3d95079 Author: Etienne Chauchot AuthorDate: Thu Aug 29 10:58:32 2019 +0200 Fix scala Product in Encoders to avoid StackEverflow --- .../translation/helpers/EncoderHelpers.java| 18 -- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 9cb8f29..8a4f1de 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -175,16 +175,11 @@ public class EncoderHelpers { } @Override public Object productElement(int n) { - if (n == 0) { -return this; - } else { -throw new IndexOutOfBoundsException(String.valueOf(n)); - } + return null; } @Override public int productArity() { - //TODO test with spark Encoders if the arity of 1 is ok - return 1; + return 0; } @Override public boolean canEqual(Object that) { @@ -291,16 +286,11 @@ public class EncoderHelpers { } @Override public Object productElement(int n) { - if (n == 0) { -return this; - } else { -throw new IndexOutOfBoundsException(String.valueOf(n)); - } + return null; } @Override public int productArity() { - //TODO test with spark Encoders if the arity of 1 is ok - return 1; + return 0; } @Override public boolean canEqual(Object that) {
[beam] 22/37: Make Encoder expressions Serializable
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c8bfcf367c6a4ac855fa2b9d549fa26c39b8be81 Author: Etienne Chauchot AuthorDate: Fri Sep 6 10:31:36 2019 +0200 Put Encoders expressions serializable --- .../structuredstreaming/translation/helpers/EncoderHelpers.java| 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index f4ea6fa..e7c5bb7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; import java.io.ByteArrayInputStream; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -114,7 +115,8 @@ public class EncoderHelpers { * * @param : Type of elements ot be serialized. */ - public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + public static class EncodeUsingBeamCoder extends UnaryExpression + implements NonSQLExpression, Serializable { private Expression child; private Coder beamCoder; @@ -229,7 +231,8 @@ public class EncoderHelpers { * * @param : Type of elements ot be serialized. */ - public static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + public static class DecodeUsingBeamCoder extends UnaryExpression + implements NonSQLExpression, Serializable { private Expression child; private ClassTag classTag;
[beam] 03/37: Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit a5c7da32d46d74ab4b79ebb34dcad4842f225c62 Author: Etienne Chauchot AuthorDate: Mon Aug 26 14:32:17 2019 +0200 Wrap Beam Coders into Spark Encoders using ExpressionEncoder: serialization part --- .../translation/helpers/EncoderHelpers.java| 245 + 1 file changed, 245 insertions(+) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index d44fe27..b072803 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -17,11 +17,40 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; +import static org.apache.spark.sql.types.DataTypes.BinaryType; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.expressions.BoundReference; +import org.apache.spark.sql.catalyst.expressions.Cast; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.NonSQLExpression; +import org.apache.spark.sql.catalyst.expressions.UnaryExpression; +import org.apache.spark.sql.catalyst.expressions.codegen.Block; +import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator; +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext; +import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; +import org.apache.spark.sql.catalyst.expressions.codegen.ExprValue; +import org.apache.spark.sql.catalyst.expressions.codegen.SimpleExprValue; +import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.ObjectType; +import scala.StringContext; import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.Seq; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; /** {@link Encoders} utility class. */ public class EncoderHelpers { @@ -64,4 +93,220 @@ public class EncoderHelpers { - Bridges from Beam Coders to Spark Encoders */ + /** A way to construct encoders using generic serializers. 
*/ + private Encoder fromBeamCoder(Coder coder, Class claz){ + +List serialiserList = new ArrayList<>(); +serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); +ClassTag classTag = ClassTag$.MODULE$.apply(claz); +return new ExpressionEncoder<>( +SchemaHelpers.binarySchema(), +false, +JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), +new DecodeUsingBeamCoder<>(classTag, coder), classTag); + +/* +ExpressionEncoder[T]( +schema = new StructType().add("value", BinaryType), +flat = true, +serializer = Seq( +EncodeUsingSerializer( +BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), +deserializer = +DecodeUsingSerializer[T]( +Cast(GetColumnByOrdinal(0, BinaryType), BinaryType), +classTag[T], +kryo = useKryo), +clsTag = classTag[T] +) +*/ + } + + private static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { + +private Class claz; +private Coder beamCoder; +private Expression child; + +private EncodeUsingBeamCoder( Class claz, Coder beamCoder) { + this.claz = claz; + this.beamCoder = beamCoder; + this.child = new BoundReference(0, new ObjectType(claz), true); +} + +@Override public Expression child() { + return child; +} + +@Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { + // Code to serialize. + ExprCode input = child.genCode(ctx); + String javaType = CodeGenerator.javaType(dataType()); + String outputStream = "ByteArrayOutputStream baos = new ByteArrayOutputStream();"; + + String serialize = outputStream + "$beamCoder.encode(${input.value}, baos); baos.toByteArra
[beam] 01/37: Improve ParDo translation performance: avoid calling a filter transform when there is only one output tag
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 22d6466cae94cf482f8151a5fe6e7dde68d28d58 Author: Etienne Chauchot AuthorDate: Thu Jul 18 10:58:35 2019 +0200 Improve Pardo translation performance: avoid calling a filter transform when there is only one output tag --- .../translation/batch/ParDoTranslatorBatch.java | 12 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 46808b7..742c1b0 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -133,10 +133,14 @@ class ParDoTranslatorBatch inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder()); if (outputs.entrySet().size() > 1) { allOutputs.persist(); -} - -for (Map.Entry, PValue> output : outputs.entrySet()) { - pruneOutputFilteredByTag(context, allOutputs, output); + for (Map.Entry, PValue> output : outputs.entrySet()) { +pruneOutputFilteredByTag(context, allOutputs, output); + } +} else { + Dataset> outputDataset = allOutputs.map( + (MapFunction, WindowedValue>, WindowedValue>) value -> value._2, + EncoderHelpers.windowedValueEncoder()); + context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset); } }
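The optimization is easiest to see in isolation. Below is a minimal, runnable sketch of the same branching logic against plain Spark; the tag/value pairs, the single consumed tag, and the class name are illustrative stand-ins, not the runner's actual types:

import java.util.Arrays;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class SingleOutputFastPath {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[2]").appName("demo").getOrCreate();
    // Stand-in for the (tag index, value) pairs emitted per output tag.
    Dataset<Tuple2<Integer, String>> tagged =
        spark.createDataset(
            Arrays.asList(new Tuple2<>(0, "a"), new Tuple2<>(0, "b")),
            Encoders.tuple(Encoders.INT(), Encoders.STRING()));
    int numOutputTags = 1;
    if (numOutputTags > 1) {
      // multiple tags: cache once, then run one filter per tag
      tagged.persist();
      tagged.filter((FilterFunction<Tuple2<Integer, String>>) t -> t._1 == 0).show();
    } else {
      // single tag: no persist, no filter -- just strip the tag column
      tagged
          .map((MapFunction<Tuple2<Integer, String>, String>) t -> t._2, Encoders.STRING())
          .show();
    }
    spark.stop();
  }
}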
[beam] 11/37: Fix warning in coder construction by reflection
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit fdba22d33205db9b039e82204e6e95f9c0e69d50 Author: Etienne Chauchot AuthorDate: Wed Sep 4 14:55:32 2019 +0200 Fix warning in coder construction by reflexion --- .../structuredstreaming/translation/helpers/EncoderHelpers.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 694bc24..1d89101 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -293,7 +293,7 @@ public class EncoderHelpers { @Override public Object nullSafeEval(Object input) { try { -Coder beamCoder = coderClass.newInstance(); +Coder beamCoder = coderClass.getDeclaredConstructor().newInstance(); return beamCoder.decode(new ByteArrayInputStream((byte[]) input)); } catch (Exception e) { throw new IllegalStateException("Error decoding bytes for coder: " + coderClass, e); @@ -373,13 +373,13 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> { /* CODE GENERATED -v = (coderClass) coderClass.newInstance(); +v = (coderClass) coderClass.getDeclaredConstructor().newInstance(); */ List parts = new ArrayList<>(); parts.add(""); parts.add(" = ("); parts.add(") "); -parts.add(".newInstance();"); +parts.add(".getDeclaredConstructor().newInstance();"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); args.add(v1);
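For reference, the pattern swapped in here is the standard replacement for the deprecated Class#newInstance(), which rethrows checked constructor exceptions unchecked. A small sketch of the helper shape (the class and error message are illustrative, not the commit's exact code):

import org.apache.beam.sdk.coders.Coder;

public class ReflectiveCoderFactory {
  static Coder<?> instantiate(Class<? extends Coder<?>> coderClass) {
    try {
      // constructor-based form instead of the deprecated coderClass.newInstance()
      return coderClass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Error instantiating coder: " + coderClass, e);
    }
  }
}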
[beam] 21/37: Wrap exceptions in UserCodeException
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 72c267cc91f75a446a949825a216d4101bbca37d Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:53:10 2019 +0200 Wrap exceptions in UserCoderExceptions --- .../translation/helpers/EncoderHelpers.java | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index f990121..f4ea6fa 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -147,8 +147,8 @@ public class EncoderHelpers { $beamCoder.encode(${input.value}, baos); ${ev.value} = baos.toByteArray(); } -} catch (Exception e) { - throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); +} catch (java.io.IOException e) { + throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ List parts = new ArrayList<>(); @@ -160,7 +160,7 @@ public class EncoderHelpers { parts.add(".encode("); parts.add(", baos); "); parts.add( - " = baos.toByteArray();}} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + " = baos.toByteArray();}} catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); @@ -262,8 +262,8 @@ public class EncoderHelpers { ${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); - } catch (Exception e) { -throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); + } catch (java.io.IOException e) { +throw org.apache.beam.sdk.util.UserCodeException.wrap(e); } */ @@ -277,7 +277,7 @@ public class EncoderHelpers { parts.add(") "); parts.add(".decode(new java.io.ByteArrayInputStream("); parts.add( - ")); } catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); + ")); } catch (java.io.IOException e) {throw org.apache.beam.sdk.util.UserCodeException.wrap(e);}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
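A compact sketch of the handling this commit generates, written as ordinary Java rather than generated code (the method name is illustrative): the IOException is wrapped exactly once in UserCodeException, which is itself a RuntimeException, instead of the earlier RuntimeException-around-UserCodeException nesting.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.UserCodeException;

public class CoderErrorHandling {
  static <T> byte[] encodeOrWrap(Coder<T> coder, T value) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
      coder.encode(value, baos);
      return baos.toByteArray();
    } catch (IOException e) {
      throw UserCodeException.wrap(e); // single, narrow wrap
    }
  }
}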
[beam] 36/37: Fix: Remove generic hack of using Object. Use actual Coder encodedType in Encoders
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 824b3445e99a0fc084b612b790c7d458689a4fd4 Author: Etienne Chauchot AuthorDate: Wed Oct 23 11:52:14 2019 +0200 Fix: Remove generic hack of using object. Use actual Coder encodedType in Encoders --- .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 2f3bced..c07c9dd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -101,7 +101,8 @@ public class EncoderHelpers { public static Encoder fromBeamCoder(Coder beamCoder) { List serialiserList = new ArrayList<>(); -Class claz = (Class) Object.class; +Class claz = beamCoder.getEncodedTypeDescriptor().getRawType(); + serialiserList.add( new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz);
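A short demonstration of why the cast becomes unnecessary: the coder can report its own encoded type (StringUtf8Coder chosen arbitrarily here).

import org.apache.beam.sdk.coders.StringUtf8Coder;

public class EncodedTypeDemo {
  public static void main(String[] args) {
    // The coder already knows its encoded type, so the Object.class hack
    // from commit 05/37 is no longer needed:
    Class<?> claz = StringUtf8Coder.of().getEncodedTypeDescriptor().getRawType();
    System.out.println(claz); // class java.lang.String
  }
}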
[beam] 05/37: type erasure: Spark encoders require a Class, pass Object and cast to Class
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c9e3534029811aabc00d09471ec78f943ba34028 Author: Etienne Chauchot AuthorDate: Thu Aug 29 10:57:53 2019 +0200 type erasure: spark encoders require a Class, pass Object and cast to Class --- .../spark/structuredstreaming/translation/helpers/EncoderHelpers.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index ab24e37..9cb8f29 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -96,9 +96,10 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Coder coder, Class claz){ + public static Encoder fromBeamCoder(Coder coder/*, Class claz*/){ List serialiserList = new ArrayList<>(); +Class claz = (Class) Object.class; serialiserList.add(new EncodeUsingBeamCoder<>(claz, coder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>(
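A compact illustration of the workaround (a sketch of the idea, not the full helper): without a Class<T> token at the call site, the code falls back to Object.class plus an unchecked cast, and derives the ClassTag from that.

import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

public class ErasureWorkaround {
  @SuppressWarnings("unchecked")
  static <T> ClassTag<T> objectClassTag() {
    Class<T> claz = (Class<T>) Object.class; // unchecked, as in the diff
    return ClassTag$.MODULE$.apply(claz);    // ClassTag<T> backed by Object.class
  }
}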
[beam] 10/37: Lazy init coder because coder instance cannot be interpolated by Catalyst
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit e6b68a8f21aba2adcb7543eae806d71e08c0bff3 Author: Etienne Chauchot AuthorDate: Mon Sep 2 17:55:24 2019 +0200 Lazy init coder because coder instance cannot be interpolated by catalyst --- runners/spark/build.gradle | 1 + .../translation/helpers/EncoderHelpers.java| 63 +++--- .../structuredstreaming/utils/EncodersTest.java| 3 +- 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle index 73a710b..a948ef1 100644 --- a/runners/spark/build.gradle +++ b/runners/spark/build.gradle @@ -77,6 +77,7 @@ dependencies { provided "com.esotericsoftware.kryo:kryo:2.21" runtimeOnly library.java.jackson_module_scala runtimeOnly "org.scala-lang:scala-library:2.11.8" + compile "org.scala-lang.modules:scala-java8-compat_2.11:0.9.0" testCompile project(":sdks:java:io:kafka") testCompile project(path: ":sdks:java:core", configuration: "shadowTest") // SparkStateInternalsTest extends abstract StateInternalsTest diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index cc862cd..694bc24 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; +import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.List; @@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; -import scala.Function1; import scala.StringContext; import scala.Tuple2; import scala.collection.JavaConversions; @@ -94,17 +93,17 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. 
*/ - public static Encoder fromBeamCoder(Coder coder/*, Class claz*/){ + public static Encoder fromBeamCoder(Class> coderClass/*, Class claz*/){ List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), coder)); +serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), (Class>)coderClass)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, coder), +new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, (Class>)coderClass), classTag); /* @@ -127,11 +126,11 @@ public class EncoderHelpers { public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { private Expression child; -private Coder beamCoder; +private Class> coderClass; -public EncodeUsingBeamCoder(Expression child, Coder beamCoder) { +public EncodeUsingBeamCoder(Expression child, Class> coderClass) { this.child = child; - this.beamCoder = beamCoder; + this.coderClass = coderClass; } @Override public Expression child() { @@ -140,6 +139,7 @@ public class EncoderHelpers { @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. + String beamCoder = lazyInitBeamCoder(ctx, coderClass); ExprCode input = child.genCode(ctx); /* @@ -170,6 +170,7 @@ public class EncoderHelpers { new VariableValue("output", Array.class)); } + @Override public DataType dataType() { return BinaryType; } @@ -179,7 +180,7 @@ public class EncoderHelpers { case 0: return child; case 1: - return beamCoder; + return coderClass; default: throw new ArrayIndexOutOfBoundsException("productElement out of bounds");
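The scala-java8-compat dependency added in this commit exists so that a Java lambda can be handed to Catalyst APIs that expect a scala.Function1, such as the init function passed to addImmutableStateIfNotExists above. A tiny standalone sketch (org.example.MyCoder is a hypothetical stand-in class name):

import static scala.compat.java8.JFunction.func;

import scala.Function1;

public class LazyInitSketch {
  public static void main(String[] args) {
    // func(...) adapts a Java lambda into a scala.Function1; Catalyst calls it
    // with the generated variable name and expects the init statement back.
    Function1<String, String> initCode =
        func(v1 -> v1 + " = new org.example.MyCoder();");
    System.out.println(initCode.apply("coder"));
    // prints: coder = new org.example.MyCoder();
  }
}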
[beam] 27/37: Ignore long-time failing test: SparkMetricsSinkTest
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit c48d03213e5848aec8217d9b136ecc153d1d1d3c Author: Etienne Chauchot AuthorDate: Fri Sep 27 10:41:55 2019 +0200 Ignore long time failing test: SparkMetricsSinkTest --- .../aggregators/metrics/sink/SparkMetricsSinkTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java index dd23c05..9d56f0c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExternalResource; @@ -40,6 +41,7 @@ import org.junit.rules.ExternalResource; * A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and * streaming modes. */ +@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a") public class SparkMetricsSinkTest { @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule(); @Rule public final TestPipeline pipeline = TestPipeline.create();
[beam] 25/37: Apply new Encoders to Read source
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 3cc256e5f81616d8b4126cef6ae8d049fb03460f Author: Etienne Chauchot AuthorDate: Fri Sep 6 17:49:10 2019 +0200 Apply new Encoders to Read source --- .../translation/batch/ReadSourceTranslatorBatch.java | 8 ++-- .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 7 +-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index 6ae6646..2dcf66f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -69,10 +70,13 @@ class ReadSourceTranslatorBatch .load(); // extract windowedValue from Row +WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder +.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); + Dataset> dataset = rowDataset.map( - RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()), -EncoderHelpers.windowedValueEncoder()); + RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), +EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection output = (PCollection) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java index 6ee0e07..ac74c29 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java @@ -43,13 +43,11 @@ public final class RowHelpers { * @return A {@link MapFunction} that accepts a {@link Row} and returns its {@link WindowedValue}. 
*/ public static MapFunction> extractWindowedValueFromRowMapFunction( - Coder coder) { + WindowedValue.WindowedValueCoder windowedValueCoder) { return (MapFunction>) value -> { // there is only one value put in each Row by the InputPartitionReader byte[] bytes = (byte[]) value.get(0); - WindowedValue.FullWindowedValueCoder windowedValueCoder = - WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE); return windowedValueCoder.decode(new ByteArrayInputStream(bytes)); }; } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index c3d07ff..9e03d96 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -70,10 +71,12 @@ class ReadSourceTranslatorStreaming .load(); // extract windowedValue from Row +WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder +.of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
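Outside the translator, the same coder construction and decode path can be exercised standalone. A minimal round trip (VarIntCoder as an arbitrary element coder):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;

public class WindowedValueRoundTrip {
  public static void main(String[] args) throws IOException {
    // Same coder the translator builds: element coder + global window coder.
    WindowedValue.FullWindowedValueCoder<Integer> coder =
        WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    coder.encode(WindowedValue.valueInGlobalWindow(42), baos);
    // ...and the decode path used by extractWindowedValueFromRowMapFunction:
    WindowedValue<Integer> value = coder.decode(new ByteArrayInputStream(baos.toByteArray()));
    System.out.println(value.getValue()); // 42
  }
}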
[beam] 02/37: Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 20d5bbd4e8d2d7d7b4dc9639d716b1e3403f91eb Author: Alexey Romanenko AuthorDate: Fri Jul 19 15:48:32 2019 +0200 Use "sparkMaster" in local mode to obtain number of shuffle partitions + spotless apply --- .../translation/TranslationContext.java | 15 +++ .../translation/batch/ParDoTranslatorBatch.java | 8 +--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index f1bafd33..75f3ddf 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -78,6 +78,21 @@ public class TranslationContext { sparkConf.setJars(options.getFilesToStage().toArray(new String[0])); } +// By default, Spark defines 200 as a number of sql partitions. This seems too much for local +// mode, so try to align with value of "sparkMaster" option in this case. +// We should not overwrite this value (or any user-defined spark configuration value) if the +// user has already configured it. +String sparkMaster = options.getSparkMaster(); +if (sparkMaster != null +&& sparkMaster.startsWith("local[") +&& System.getProperty("spark.sql.shuffle.partitions") == null) { + int numPartitions = + Integer.parseInt(sparkMaster.substring("local[".length(), sparkMaster.length() - 1)); + if (numPartitions > 0) { +sparkConf.set("spark.sql.shuffle.partitions", String.valueOf(numPartitions)); + } +} + this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); this.serializablePipelineOptions = new SerializablePipelineOptions(options); this.datasets = new HashMap<>(); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java index 742c1b0..255adc8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java @@ -137,9 +137,11 @@ class ParDoTranslatorBatch pruneOutputFilteredByTag(context, allOutputs, output); } } else { - Dataset> outputDataset = allOutputs.map( - (MapFunction, WindowedValue>, WindowedValue>) value -> value._2, - EncoderHelpers.windowedValueEncoder()); + Dataset> outputDataset = + allOutputs.map( + (MapFunction, WindowedValue>, WindowedValue>) + value -> value._2, + EncoderHelpers.windowedValueEncoder()); context.putDatasetWildcard(outputs.entrySet().iterator().next().getValue(), outputDataset); } }
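The heuristic can be sketched standalone as below (the hard-coded master string stands in for the pipeline option):

import org.apache.spark.SparkConf;

public class ShufflePartitionsFromMaster {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    String sparkMaster = "local[4]"; // would come from pipeline options
    if (sparkMaster.startsWith("local[")
        && System.getProperty("spark.sql.shuffle.partitions") == null) {
      // parse the N out of "local[N]" and use it instead of Spark's default 200
      int numPartitions =
          Integer.parseInt(sparkMaster.substring("local[".length(), sparkMaster.length() - 1));
      if (numPartitions > 0) {
        sparkConf.set("spark.sql.shuffle.partitions", String.valueOf(numPartitions));
      }
    }
    System.out.println(sparkConf.get("spark.sql.shuffle.partitions")); // 4
  }
}

Note the parse only handles an explicit thread count: a master like local[*] would make Integer.parseInt throw, so the guard is best-effort rather than general.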
[beam] 09/37: Fix code generation in Beam coder wrapper
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit d5645ff60aa99608a9ee3b8a5be6c58f9ac3903b Author: Etienne Chauchot AuthorDate: Mon Sep 2 15:45:24 2019 +0200 Fix code generation in Beam coder wrapper --- .../translation/helpers/EncoderHelpers.java| 93 -- .../structuredstreaming/utils/EncodersTest.java| 4 +- 2 files changed, 55 insertions(+), 42 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 0765c78..cc862cd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -42,15 +42,13 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block; import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator; import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext; import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode; -import org.apache.spark.sql.catalyst.expressions.codegen.ExprValue; -import org.apache.spark.sql.catalyst.expressions.codegen.SimpleExprValue; import org.apache.spark.sql.catalyst.expressions.codegen.VariableValue; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.ObjectType; +import scala.Function1; import scala.StringContext; import scala.Tuple2; import scala.collection.JavaConversions; -import scala.collection.Seq; import scala.reflect.ClassTag; import scala.reflect.ClassTag$; @@ -143,29 +141,33 @@ public class EncoderHelpers { @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. ExprCode input = child.genCode(ctx); - String javaType = CodeGenerator.javaType(dataType()); - String outputStream = "ByteArrayOutputStream baos = new ByteArrayOutputStream();"; - - String serialize = outputStream + "$beamCoder.encode(${input.value}, baos); baos.toByteArray();"; - - String outside = "final $javaType output = ${input.isNull} ? 
${CodeGenerator.defaultValue(dataType)} : $serialize;"; - List instructions = new ArrayList<>(); - instructions.add(outside); - Seq parts = JavaConversions.collectionAsScalaIterable(instructions).toSeq(); + /* +CODE GENERATED + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final bytes[] output; + if ({input.isNull}) + output = null; + else + output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); + */ + List parts = new ArrayList<>(); + parts.add("ByteArrayOutputStream baos = new ByteArrayOutputStream(); final bytes[] output; if ("); + parts.add(") output = null; else output ="); + parts.add(".encode("); + parts.add(", baos); baos.toByteArray();"); + + StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); - StringContext stringContext = new StringContext(parts); - Block.BlockHelper blockHelper = new Block.BlockHelper(stringContext); List args = new ArrayList<>(); - args.add(new VariableValue("beamCoder", Coder.class)); - args.add(new SimpleExprValue("input.value", ExprValue.class)); - args.add(new VariableValue("javaType", String.class)); - args.add(new SimpleExprValue("input.isNull", Boolean.class)); - args.add(new SimpleExprValue("CodeGenerator.defaultValue(dataType)", String.class)); - args.add(new VariableValue("serialize", String.class)); - Block code = blockHelper.code(JavaConversions.collectionAsScalaIterable(args).toSeq()); - - return ev.copy(input.code().$plus(code), input.isNull(), new VariableValue("output", Array.class)); + args.add(input.isNull()); + args.add(beamCoder); + args.add(input.value()); + Block code = (new Block.BlockHelper(sc)) + .code(JavaConversions.collectionAsScalaIterable(args).toSeq()); + + return ev.copy(input.code().$plus(code), input.isNull(), + new VariableValue("output", Array.class)); } @Override public DataType dataType() { @@ -252,27 +254,38 @@ public class EncoderHelpers { ExprCode input = child.genCode(ctx); String javaType = CodeGenerator.javaType(dataType()); - String inputStream = "ByteArrayInputStream bais = new ByteArrayInputStream(${input.value});"; - String deseri
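The StringContext pattern adopted here mirrors Scala's s"..." interpolation from Java: the parts list supplies the literal fragments, the args list the interpolated values, and the two interleave as parts[0] args[0] parts[1] args[1] parts[2]. A standalone sketch with simplified fragments:

import java.util.ArrayList;
import java.util.List;
import scala.StringContext;
import scala.collection.JavaConversions;

public class StringContextDemo {
  public static void main(String[] args) {
    List<String> parts = new ArrayList<>();
    parts.add("final byte[] output = ");
    parts.add(".encode(");
    parts.add(", baos);");
    List<Object> interpolated = new ArrayList<>();
    interpolated.add("beamCoder");
    interpolated.add("input");
    StringContext sc =
        new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
    // same call shape as the sc.s(...) invocations in the diff above
    System.out.println(sc.s(JavaConversions.collectionAsScalaIterable(interpolated).toSeq()));
    // prints: final byte[] output = beamCoder.encode(input, baos);
  }
}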
[beam] 14/37: Fix beam coder lazy init using reflection: use .class + try/catch + cast
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 0cf2c8759a64c81c1d4f83f74a759ae3dafd1f83 Author: Etienne Chauchot AuthorDate: Thu Sep 5 10:07:32 2019 +0200 Fix beam coder lazy init using reflexion: use .clas + try catch + cast --- .../translation/helpers/EncoderHelpers.java | 19 --- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index a452da0..0751c4c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Objects; import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.Encoder; @@ -388,18 +389,22 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B ctx.addImmutableStateIfNotExists(coderClass.getName(), beamCoderInstance, func(v1 -> { /* CODE GENERATED -v = (coderClass) coderClass.getDeclaredConstructor().newInstance(); +try { +v1 = coderClass.class.getDeclaredConstructor().newInstance(); +} catch (Exception e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); +} */ -List parts = new ArrayList<>(); -parts.add(""); + List parts = new ArrayList<>(); +parts.add("try {"); parts.add(" = ("); -parts.add(") "); -parts.add(".getDeclaredConstructor().newInstance();"); + parts.add(") "); + parts.add(".class.getDeclaredConstructor().newInstance();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); List args = new ArrayList<>(); args.add(v1); -args.add(coderClass.getName()); -args.add(coderClass.getName()); + args.add(coderClass.getName()); + args.add(coderClass.getName()); return sc.s(JavaConversions.collectionAsScalaIterable(args).toSeq()); })); return beamCoderInstance;
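Reconstructed shape of what Catalyst generates after this fix, with com.example.MyCoder as a hypothetical class name spliced in via coderClass.getName():

com.example.MyCoder coder;
try {
  coder = (com.example.MyCoder)
      com.example.MyCoder.class.getDeclaredConstructor().newInstance();
} catch (Exception e) {
  throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
}

This still assumes every coder has an accessible no-arg constructor, which Beam coders (typically built via of() factories) do not guarantee; commit 15/37 removes the reflective construction for exactly that reason.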
[beam] 30/37: Create a Tuple2Coder to encode Scala Tuple2
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 21accab89a4333b32003121269ab31b436e0dd2c Author: Etienne Chauchot AuthorDate: Mon Sep 30 11:25:04 2019 +0200 Create a Tuple2Coder to encode scala tuple2 --- .../translation/helpers/Tuple2Coder.java | 62 ++ 1 file changed, 62 insertions(+) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java new file mode 100644 index 000..1743a01 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/Tuple2Coder.java @@ -0,0 +1,62 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import scala.Tuple2; + +/** + * Beam coder to encode/decode Tuple2 scala types. + * @param first field type parameter + * @param second field type parameter + */ +public class Tuple2Coder extends StructuredCoder> { + private final Coder firstFieldCoder; + private final Coder secondFieldCoder; + + public static Tuple2Coder of(Coder firstFieldCoder, Coder secondFieldCoder) { +return new Tuple2Coder<>(firstFieldCoder, secondFieldCoder); + } + + private Tuple2Coder(Coder firstFieldCoder, Coder secondFieldCoder) { +this.firstFieldCoder = firstFieldCoder; +this.secondFieldCoder = secondFieldCoder; + } + + + @Override public void encode(Tuple2 value, OutputStream outStream) + throws IOException { +firstFieldCoder.encode(value._1(), outStream); +secondFieldCoder.encode(value._2(), outStream); + } + + @Override public Tuple2 decode(InputStream inStream) throws IOException { +T1 firstField = firstFieldCoder.decode(inStream); +T2 secondField = secondFieldCoder.decode(inStream); +return Tuple2.apply(firstField, secondField); + } + + @Override public List> getCoderArguments() { +return Arrays.asList(firstFieldCoder, secondFieldCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { +verifyDeterministic(this, "First field coder must be deterministic", firstFieldCoder); +verifyDeterministic(this, "Second field coder must be deterministic", secondFieldCoder); + } + + /** Returns the coder for first field. */ + public Coder getFirstFieldCoder() { +return firstFieldCoder; + } + + /** Returns the coder for second field. */ + public Coder getSecondFieldCoder() { +return secondFieldCoder; + } +}
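Usage is the same as any other StructuredCoder; a minimal round trip through the new class (element coders chosen arbitrarily, Tuple2Coder imported from the helpers package this commit adds it to):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Tuple2Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import scala.Tuple2;

public class Tuple2CoderDemo {
  public static void main(String[] args) throws IOException {
    Tuple2Coder<String, Integer> coder =
        Tuple2Coder.of(StringUtf8Coder.of(), VarIntCoder.of());
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    coder.encode(Tuple2.apply("a", 1), baos);
    Tuple2<String, Integer> copy = coder.decode(new ByteArrayInputStream(baos.toByteArray()));
    System.out.println(copy); // (a,1)
  }
}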
[beam] 16/37: Remove example code
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit ca01777b5bd593c7caa5a6be6136abe662b8a4e5 Author: Etienne Chauchot AuthorDate: Thu Sep 5 14:33:23 2019 +0200 Remove example code --- .../translation/helpers/EncoderHelpers.java| 69 -- 1 file changed, 69 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 3f7c102..83243b3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -102,22 +102,6 @@ public class EncoderHelpers { JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); - -/* -ExpressionEncoder[T]( -schema = new StructType().add("value", BinaryType), -flat = true, -serializer = Seq( -EncodeUsingSerializer( -BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)), -deserializer = -DecodeUsingSerializer[T]( -Cast(GetColumnByOrdinal(0, BinaryType), BinaryType), -classTag[T], -kryo = useKryo), -clsTag = classTag[T] -) -*/ } public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { @@ -219,30 +203,6 @@ public class EncoderHelpers { } } - /*case class EncodeUsingSerializer(child: Expression, kryo: Boolean) - extends UnaryExpression with NonSQLExpression with SerializerSupport { - -override def nullSafeEval(input: Any): Any = { -serializerInstance.serialize(input).array() -} - -override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -val serializer = addImmutableScodererializerIfNeeded(ctx) -// Code to serialize. -val input = child.genCode(ctx) -val javaType = CodeGenerator.javaType(dataType) -val serialize = s"$serializer.serialize(${input.value}, null).array()" - -val code = input.code + code""" -final $javaType ${ev.value} = -${input.isNull} ? ${CodeGenerator.defaultValue(dataType)} : $serialize; -""" -ev.copy(code = code, isNull = input.isNull) - } - -override def dataType: DataType = BinaryType - }*/ - public static class DecodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression{ private Expression child; @@ -353,33 +313,4 @@ public class EncoderHelpers { return Objects.hash(super.hashCode(), classTag, beamCoder); } } -/* -case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: Boolean) - extends UnaryExpression with NonSQLExpression with SerializerSupport { - -override def nullSafeEval(input: Any): Any = { -val inputBytes = java.nio.ByteBuffer.wrap(input.asInstanceOf[Array[Byte]]) -serializerInstance.deserialize(inputBytes) -} - -override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -val serializer = addImmutableSerializerIfNeeded(ctx) -// Code to deserialize. -val input = child.genCode(ctx) -val javaType = CodeGenerator.javaType(dataType) -val deserialize = -s"($javaType) $serializer.deserialize(java.nio.ByteBuffer.wrap(${input.value}), null)" - -val code = input.code + code""" -final $javaType ${ev.value} = -${input.isNull} ? 
${CodeGenerator.defaultValue(dataType)} : $deserialize; -""" -ev.copy(code = code, isNull = input.isNull) - } - -override def dataType: DataType = ObjectType(tag.runtimeClass) - } -*/ - - }
[beam] 28/37: Apply new Encoders to Window assign translation
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 7f1060aa189a625400a1fbcfc2503d3e721ade8f Author: Etienne Chauchot AuthorDate: Fri Sep 27 11:22:15 2019 +0200 Apply new Encoders to Window assign translation --- .../translation/batch/WindowAssignTranslatorBatch.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java index fb37f97..576b914 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.spark.sql.Dataset; @@ -44,10 +45,13 @@ class WindowAssignTranslatorBatch if (WindowingHelpers.skipAssignWindows(assignTransform, context)) { context.putDataset(output, inputDataset); } else { + WindowFn windowFn = assignTransform.getWindowFn(); + WindowedValue.FullWindowedValueCoder windowedValueCoder = WindowedValue.FullWindowedValueCoder + .of(input.getCoder(), windowFn.windowCoder()); Dataset> outputDataset = inputDataset.map( - WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()), - EncoderHelpers.windowedValueEncoder()); + WindowingHelpers.assignWindowsMapFunction(windowFn), + EncoderHelpers.fromBeamCoder(windowedValueCoder)); context.putDataset(output, outputDataset); } }
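Compared to the Read translation, which hard-codes the global window coder, the window-assign path derives the window coder from the WindowFn itself. A small sketch (FixedWindows chosen arbitrarily):

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Duration;

public class WindowCoderDemo {
  public static void main(String[] args) {
    // The WindowFn knows the coder of the window type it produces
    // (IntervalWindow here), so no global-window assumption is needed:
    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1));
    WindowedValue.FullWindowedValueCoder<Integer> wvCoder =
        WindowedValue.FullWindowedValueCoder.of(VarIntCoder.of(), windowFn.windowCoder());
    System.out.println(wvCoder);
  }
}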
[beam] 15/37: Remove lazy init of beam coder because there is no generic way of instantiating a beam coder
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 50060a804d95ed1006db98d1fd2c4243ba1fc532 Author: Etienne Chauchot AuthorDate: Thu Sep 5 14:20:30 2019 +0200 Remove lazy init of beam coder because there is no generic way on instanciating a beam coder --- .../translation/helpers/EncoderHelpers.java| 68 +++--- .../structuredstreaming/utils/EncodersTest.java| 2 +- 2 files changed, 21 insertions(+), 49 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 0751c4c..3f7c102 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; import static org.apache.spark.sql.types.DataTypes.BinaryType; -import static scala.compat.java8.JFunction.func; import java.io.ByteArrayInputStream; import java.util.ArrayList; @@ -26,7 +25,6 @@ import java.util.List; import java.util.Objects; import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.spark.sql.Encoder; @@ -92,17 +90,17 @@ public class EncoderHelpers { */ /** A way to construct encoders using generic serializers. */ - public static Encoder fromBeamCoder(Class> coderClass/*, Class claz*/){ + public static Encoder fromBeamCoder(Coder beamCoder/*, Class claz*/){ List serialiserList = new ArrayList<>(); Class claz = (Class) Object.class; -serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), (Class>)coderClass)); +serialiserList.add(new EncodeUsingBeamCoder<>(new BoundReference(0, new ObjectType(claz), true), beamCoder)); ClassTag classTag = ClassTag$.MODULE$.apply(claz); return new ExpressionEncoder<>( SchemaHelpers.binarySchema(), false, JavaConversions.collectionAsScalaIterable(serialiserList).toSeq(), -new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, (Class>)coderClass), +new DecodeUsingBeamCoder<>(new Cast(new GetColumnByOrdinal(0, BinaryType), BinaryType), classTag, beamCoder), classTag); /* @@ -125,11 +123,11 @@ public class EncoderHelpers { public static class EncodeUsingBeamCoder extends UnaryExpression implements NonSQLExpression { private Expression child; -private Class> coderClass; +private Coder beamCoder; -public EncodeUsingBeamCoder(Expression child, Class> coderClass) { +public EncodeUsingBeamCoder(Expression child, Coder beamCoder) { this.child = child; - this.coderClass = coderClass; + this.beamCoder = beamCoder; } @Override public Expression child() { @@ -138,7 +136,7 @@ public class EncoderHelpers { @Override public ExprCode doGenCode(CodegenContext ctx, ExprCode ev) { // Code to serialize. 
- String beamCoder = lazyInitBeamCoder(ctx, coderClass); + String accessCode = ctx.addReferenceObj("beamCoder", beamCoder, beamCoder.getClass().getName()); ExprCode input = child.genCode(ctx); /* @@ -172,7 +170,7 @@ public class EncoderHelpers { args.add(ev.value()); args.add(input.isNull()); args.add(ev.value()); - args.add(beamCoder); + args.add(accessCode); args.add(input.value()); args.add(ev.value()); Block code = (new Block.BlockHelper(sc)) @@ -191,7 +189,7 @@ public class EncoderHelpers { case 0: return child; case 1: - return coderClass; + return beamCoder; default: throw new ArrayIndexOutOfBoundsException("productElement out of bounds"); } @@ -213,11 +211,11 @@ public class EncoderHelpers { return false; } EncodeUsingBeamCoder that = (EncodeUsingBeamCoder) o; - return coderClass.equals(that.coderClass); + return beamCoder.equals(that.beamCoder); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), coderClass); + return Objects.hash(super.hashCode(), beamCoder); } } @@ -249,12 +247,12 @@ public class EncoderHelpers { private Expression child; privat
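addReferenceObj sidesteps both earlier approaches: the live coder is stored in the generated class's references array and the returned string is the Java expression that reads it back, so no reflective re-instantiation is needed. A minimal sketch, assuming Spark's CodegenContext can be driven directly like this from a test:

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext;

public class ReferenceObjDemo {
  public static void main(String[] args) {
    CodegenContext ctx = new CodegenContext();
    // Registers the live coder and returns the access expression to splice
    // into generated code, as doGenCode does above.
    String accessCode =
        ctx.addReferenceObj("beamCoder", VarIntCoder.of(), VarIntCoder.class.getName());
    System.out.println(accessCode);
    // e.g. ((org.apache.beam.sdk.coders.VarIntCoder) references[0] /* beamCoder */)
  }
}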
[beam] 19/37: Add an assert of equality in the encoders test
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 78b2d2243f0732dd802d9e6f855607d2c2f06e59 Author: Etienne Chauchot AuthorDate: Thu Sep 5 15:28:05 2019 +0200 Add an assert of equality in the encoders test --- .../runners/spark/structuredstreaming/utils/EncodersTest.java | 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java index b3a6273..c6b8631 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/utils/EncodersTest.java @@ -1,10 +1,12 @@ package org.apache.beam.runners.spark.structuredstreaming.utils; +import static org.junit.Assert.assertEquals; + import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.SparkSession; import org.junit.Test; import org.junit.runner.RunWith; @@ -24,7 +26,9 @@ public class EncodersTest { data.add(1); data.add(2); data.add(3); -sparkSession.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); -//sparkSession.createDataset(data, EncoderHelpers.genericEncoder()); +Dataset dataset = sparkSession +.createDataset(data, EncoderHelpers.fromBeamCoder(VarIntCoder.of())); +List results = dataset.collectAsList(); +assertEquals(data, results); } }
[beam] 12/37: Fix ExpressionEncoder generated code: typos, try catch, fqcn
This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git commit 8b07ec8ad0a22732aa6096c24135d942c3928787 Author: Etienne Chauchot AuthorDate: Wed Sep 4 15:38:41 2019 +0200 Fix ExpressionEncoder generated code: typos, try catch, fqcn --- .../translation/helpers/EncoderHelpers.java| 38 +- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java index 1d89101..dff308a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpers.java @@ -144,18 +144,22 @@ public class EncoderHelpers { /* CODE GENERATED - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final bytes[] output; - if ({input.isNull}) - output = null; - else - output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); + try { +java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); +final byte[] output; +if ({input.isNull}) +output = null; +else +output = $beamCoder.encode(${input.value}, baos); baos.toByteArray(); +} catch (Exception e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); +} */ List parts = new ArrayList<>(); - parts.add("ByteArrayOutputStream baos = new ByteArrayOutputStream(); final bytes[] output; if ("); + parts.add("try {java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); final byte[] output; if ("); parts.add(") output = null; else output ="); parts.add(".encode("); - parts.add(", baos); baos.toByteArray();"); + parts.add(", baos); baos.toByteArray();} catch (Exception e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq()); @@ -258,21 +262,25 @@ public class EncoderHelpers { /* CODE GENERATED: - final $javaType output = - ${input.isNull} ? - ${CodeGenerator.defaultValue(dataType)} : - ($javaType) $beamCoder.decode(new ByteArrayInputStream(${input.value})); + try { + final $javaType output = + ${input.isNull} ? + ${CodeGenerator.defaultValue(dataType)} : + ($javaType) $beamCoder.decode(new java.io.ByteArrayInputStream(${input.value})); + } catch (IOException e) { + throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e)); + } */ List parts = new ArrayList<>(); - parts.add("final "); + parts.add("try { final "); parts.add(" output ="); parts.add("?"); parts.add(":"); parts.add("("); parts.add(") "); - parts.add(".decode(new ByteArrayInputStream("); - parts.add("));"); + parts.add(".decode(new java.io.ByteArrayInputStream("); + parts.add(")); } catch (IOException e) {throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));}"); StringContext sc = new StringContext(JavaConversions.collectionAsScalaIterable(parts).toSeq());
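Assembled from the parts and args above, the serializer block this commit aims for has roughly the following shape (inputIsNull, inputValue, and beamCoder stand in for the interpolated ExprCode values):

try {
  java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
  final byte[] output;
  if (inputIsNull) {
    output = null;
  } else {
    beamCoder.encode(inputValue, baos); // encode first...
    output = baos.toByteArray();        // ...then capture the bytes
  }
} catch (Exception e) {
  throw new RuntimeException(org.apache.beam.sdk.util.UserCodeException.wrap(e));
}

Note that the output = $beamCoder.encode(...) form sketched in the commit's comment would not compile as-is, since Coder#encode returns void; commit 21/37 settles on encoding first and assigning baos.toByteArray() to the result afterwards, as reconstructed above.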