Repository: incubator-beam Updated Branches: refs/heads/master c5329f9b4 -> a2c342cfd
Build in eclipse/eclipse-jdt Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/957c545e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/957c545e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/957c545e Branch: refs/heads/master Commit: 957c545eaa33c861b561418b1c7dadf4c31f92f3 Parents: c5329f9 Author: Daniel Kulp <dk...@apache.org> Authored: Thu Oct 13 12:41:32 2016 -0400 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Tue Oct 18 16:53:10 2016 +0200 ---------------------------------------------------------------------- .travis.yml | 2 ++ .../beam/runners/direct/BoundedReadEvaluatorFactory.java | 3 ++- .../java/org/apache/beam/runners/direct/DirectMetrics.java | 7 +++++++ .../beam/runners/direct/TestStreamEvaluatorFactory.java | 2 +- .../beam/runners/direct/UnboundedReadEvaluatorFactory.java | 5 +++-- .../main/java/org/apache/beam/runners/spark/io/SourceRDD.java | 6 ++++-- 6 files changed, 19 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/957c545e/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 786b370..5133a43 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,6 +39,8 @@ matrix: - os: linux env: CUSTOM_JDK="oraclejdk7" MAVEN_OVERRIDE="-DbeamSurefireArgline='-Xmx512m'" - os: linux + env: CUSTOM_JDK="oraclejdk7" MAVEN_OVERRIDE="-DbeamSurefireArgline='-Xmx512m' -Peclipse-jdt" + - os: linux env: CUSTOM_JDK="openjdk7" MAVEN_OVERRIDE="-DbeamSurefireArgline='-Xmx512m'" before_install: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/957c545e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 843dcd6..add1e8a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,7 +138,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { private <OutputT> Collection<CommittedBundle<BoundedSourceShard<OutputT>>> createInitialSplits( - AppliedPTransform<?, ?, Bounded<OutputT>> transform, int targetParallelism) + AppliedPTransform<PBegin, ?, Bounded<OutputT>> transform, int targetParallelism) throws Exception { BoundedSource<OutputT> source = transform.getTransform().getSource(); PipelineOptions options = evaluationContext.getPipelineOptions(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/957c545e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index a749a76..145326f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -224,6 +224,13 @@ class DirectMetrics extends MetricResults { @AutoValue abstract static class DirectMetricResult<T> implements MetricResult<T> { + // need to define these here so they appear in the correct order + // and the generated constructor is usable and consistent + public abstract MetricName name(); + public abstract String step(); + public abstract T committed(); + public abstract T attempted(); + public static <T> MetricResult<T> create(MetricName name, String scope, T committed, T attempted) { return new AutoValue_DirectMetrics_DirectMetricResult<T>( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/957c545e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 065adc1..97e6b4d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -205,7 +205,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { } private <T> Collection<CommittedBundle<?>> createInputBundle( - AppliedPTransform<?, ?, TestStream<T>> transform) { + AppliedPTransform<PBegin, ?, TestStream<T>> transform) { CommittedBundle<TestStreamIndex<T>> initialBundle = evaluationContext .<TestStreamIndex<T>>createRootBundle() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/957c545e/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 18d3d0a..28f88b3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Instant; import org.slf4j.Logger; @@ -74,7 +75,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { } private <OutputT> TransformEvaluator<?> createEvaluator( - AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> application) { + AppliedPTransform<PBegin, PCollection<OutputT>, Read.Unbounded<OutputT>> application) { return new UnboundedReadEvaluator<>( application, evaluationContext, readerReuseChance); } @@ -263,7 +264,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { } private <OutputT> Collection<CommittedBundle<?>> createInitialSplits( - AppliedPTransform<?, ?, Unbounded<OutputT>> transform, int targetParallelism) + AppliedPTransform<PBegin, ?, Unbounded<OutputT>> transform, int targetParallelism) throws Exception { UnboundedSource<OutputT, ?> source = transform.getTransform().getSource(); List<? extends UnboundedSource<OutputT, ?>> splits = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/957c545e/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index ec2d2cf..679b8b1 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.spark.io; import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; @@ -56,8 +57,9 @@ public class SourceRDD { private final int numPartitions; // to satisfy Scala API. - private static final scala.collection.immutable.List<Dependency<?>> NIL = - scala.collection.immutable.List.empty(); + private static final scala.collection.immutable.Seq<Dependency<?>> NIL = + scala.collection.JavaConversions + .asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList(); public Bounded(SparkContext sc, BoundedSource<T> source,