Fix Apex driver and update execution matrix
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ce9bf07 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ce9bf07 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ce9bf07 Branch: refs/heads/master Commit: 9ce9bf076032e1c9aeb3a6dce806ad4b96127157 Parents: 1bd5735 Author: Ismaël Mejía <ieme...@apache.org> Authored: Tue Mar 21 18:29:20 2017 +0100 Committer: Ismaël Mejía <ieme...@gmail.com> Committed: Wed Aug 23 19:07:27 2017 +0200 ---------------------------------------------------------------------- integration/java/nexmark/README.md | 109 +++++++++++-------- integration/java/nexmark/pom.xml | 27 ++++- .../integration/nexmark/NexmarkApexRunner.java | 5 - .../nexmark/NexmarkDirectRunner.java | 5 - .../integration/nexmark/NexmarkFlinkRunner.java | 5 - .../nexmark/NexmarkGoogleRunner.java | 5 - .../beam/integration/nexmark/NexmarkRunner.java | 9 -- .../integration/nexmark/NexmarkSparkRunner.java | 5 - .../apache/beam/integration/nexmark/Query5.java | 1 - .../nexmark/src/main/resources/log4j.properties | 9 ++ .../nexmark/UnboundedEventSourceTest.java | 4 +- 11 files changed, 100 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/README.md ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md index 4c08c28..7a91ab2 100644 --- a/integration/java/nexmark/README.md +++ b/integration/java/nexmark/README.md @@ -122,63 +122,80 @@ Number of events generators --numEventGenerators=4 -## Flink specific configuration +## Apex specific configuration ---suite=SMOKE --manageResources=false --monitorJobs=false \ ---flinkMaster=local +--suite=SMOKE --manageResources=false --monitorJobs=true ## Direct specific configuration ---suite=SMOKE --manageResources=false 
--monitorJobs=false \ +--suite=SMOKE --manageResources=false --monitorJobs=true \ --enforceEncodability=false --enforceImmutability=false +## Flink specific configuration + +--suite=SMOKE --manageResources=false --monitorJobs=true \ +--flinkMaster=local + ## Spark specific configuration ---suite=SMOKE ---manageResources=false --monitorJobs=false --sparkMaster=local --Dspark.ui.enabled=false --DSPARK_LOCAL_IP=localhost --Dsun.io.serialization.extendedDebugInfo=true +--suite=SMOKE --manageResources=false --monitorJobs=true --sparkMaster=local \ +-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true # Current Status -Open issues are currently opened on [github](https://github.com/iemejia/beam/issues): - -## Batch Mode / Synthetic / Local - -| Query | Direct | Spark | Flink | Apex | -| ----: | ------ | ------ | ------ | ------ | -| 0 | Ok | #1 | Ok | | -| 1 | Ok | #1 | Ok | | -| 2 | Ok | NEX-01 | Ok | | -| 3 | NEX-07 | NEX-07 | NEX-07 | | -| 4 | Ok | Ok | NEX-02 | | -| 5 | Ok | NEX-03 | Ok | | -| 6 | Ok | OK | NEX-02 | | -| 7 | Ok | NEX-01 | Ok | | -| 8 | Ok | NEX-01 | Ok | | -| 9 | Ok | OK | NEX-02 | | -| 10 | NEX-05 | NEX-04 | Ok | | -| 11 | Ok | NEX-01 | Ok | | -| 12 | Ok | NEX-01 | Ok | | - -## Streaming Mode / Synthetic / Local - -| Query | Direct | Spark | Flink | Apex | -| ----: | ------ | ------ | ------ | ------ | -| 0 | Ok | | | | -| 1 | Ok | | | | -| 2 | Ok | | | | -| 3 | NEX-07 | | | | -| 4 | Ok | | | | -| 5 | Ok | | | | -| 6 | Ok | | | | -| 7 | Ok | | | | -| 8 | Ok | | | | -| 9 | Ok | | | | -| 10 | NEX-05 | | | | -| 11 | Ok | | | | -| 12 | Ok | | | | +Open issues are tracked [here](https://github.com../../../../../issues): + +## Batch / Synthetic / Local + +| Query | Direct | Spark | Flink | Apex | +| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------- | +| 0 | ok | [#1](../../../../../issues/1) | ok | ok | +| 1 | ok | 
[#1](../../../../../issues/1) | ok | ok | +| 2 | ok | [#1](../../../../../issues/1) | ok | ok | +| 3 | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | +| 4 | ok | ok | [#2](../../../../../issues/2) | ok | +| 5 | ok | [#3](../../../../../issues/3) | ok | ok | +| 6 | ok | ok | [#2](../../../../../issues/2) | ok | +| 7 | ok | [#1](../../../../../issues/1) | ok | [#24](../../../../../issues/24) | +| 8 | ok | [#1](../../../../../issues/1) | ok | ok | +| 9 | ok | ok | [#2](../../../../../issues/2) | ok | +| 10 | [#5](../../../../../issues/5) | [#4](../../../../../issues/4) | ok | ok | +| 11 | ok | [#1](../../../../../issues/1) | ok | ok | +| 12 | ok | [#1](../../../../../issues/1) | ok | ok | + +## Streaming / Synthetic / Local + +| Query | Direct | Spark | Flink | Apex | +| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ | +| 0 | ok | | | ok | +| 1 | ok | | | ok | +| 2 | ok | | | ok | +| 3 | [#7](../../../../../issues/7) | | | [#7](../../../../../issues/7) | +| 4 | ok | | | ok | +| 5 | ok | | | ok | +| 6 | ok | | | ok | +| 7 | ok | | | ? | +| 8 | ok | | | ok | +| 9 | ok | | | ok | +| 10 | [#5](../../../../../issues/5) | | | ? 
| +| 11 | ok | | | Ok | +| 12 | ok | | | Ok | + +## Batch / Synthetic / Cluster + +TODO + +| Query | Dataflow | Spark | Flink | Apex | +| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ | +| 0 | | | | | + +## Streaming / Synthetic / Cluster + +TODO + +| Query | Dataflow | Spark | Flink | Apex | +| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ | +| 0 | | | | | # Running Nexmark http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml index 27abb0e..0ecc298 100644 --- a/integration/java/nexmark/pom.xml +++ b/integration/java/nexmark/pom.xml @@ -28,7 +28,7 @@ </parent> <artifactId>beam-integration-java</artifactId> - <name>Apache Beam :: Integration Tests :: Java All</name> + <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name> <packaging>jar</packaging> @@ -37,6 +37,7 @@ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <flink.version>1.2.0</flink.version> <spark.version>1.6.3</spark.version> + <apex.codehaus.jackson.version>1.9.3</apex.codehaus.jackson.version> <skipITs>true</skipITs> </properties> @@ -207,6 +208,30 @@ <groupId>org.apache.beam</groupId> <artifactId>beam-runners-apex</artifactId> </dependency> + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>${apex.kryo.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> 
+ <version>${apex.codehaus.jackson.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + <version>${apex.codehaus.jackson.version}</version> + <scope>runtime</scope> + </dependency> <!-- IOs --> <dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java index ea46082..3b8993a 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java @@ -39,11 +39,6 @@ public class NexmarkApexRunner extends NexmarkRunner<NexmarkApexDriver.NexmarkAp } @Override - protected boolean canMonitor() { - return false; - } - - @Override protected void invokeBuilderForPublishOnlyPipeline( PipelineBuilder builder) { builder.build(options); http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java index c70e41e..0119bbc 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java @@ -41,11 +41,6 @@ class 
NexmarkDirectRunner extends NexmarkRunner<NexmarkDirectDriver.NexmarkDirec } @Override - protected boolean canMonitor() { - return true; - } - - @Override protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) { throw new UnsupportedOperationException( "Cannot use --pubSubMode=COMBINED with DirectRunner"); http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java index 8e22917..95ab1ad 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java @@ -37,11 +37,6 @@ public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.Nexmark } @Override - protected boolean canMonitor() { - return true; - } - - @Override protected void invokeBuilderForPublishOnlyPipeline( PipelineBuilder builder) { builder.build(options); http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java index 135d428..f4bfb1e 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java @@ -60,11 +60,6 @@ 
class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogl } @Override - protected boolean canMonitor() { - return true; - } - - @Override protected String getJobId(PipelineResult job) { return ((DataflowPipelineJob) job).getJobId(); } http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index 8d4c1f1..d311dc4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -158,11 +158,6 @@ public abstract class NexmarkRunner<OptionT extends Options> { protected abstract int maxNumWorkers(); /** - * Return true if runner can monitor running jobs. - */ - protected abstract boolean canMonitor(); - - /** * Return the current value for a long counter, or -1 if can't be retrieved. 
*/ protected long getLong(PipelineResult job, Aggregator<Long, Long> aggregator) { @@ -1089,10 +1084,6 @@ public abstract class NexmarkRunner<OptionT extends Options> { */ @Nullable public NexmarkPerf run(NexmarkConfiguration runConfiguration) { - if (options.getMonitorJobs() && !canMonitor()) { - throw new RuntimeException("Cannot use --monitorJobs with this runner since it does not " - + "support monitoring."); - } if (options.getManageResources() && !options.getMonitorJobs()) { throw new RuntimeException("If using --manageResources then must also use --monitorJobs."); } http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java index 32fee30..30ae9ca 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java @@ -37,11 +37,6 @@ public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.Nexmark } @Override - protected boolean canMonitor() { - return true; - } - - @Override protected void invokeBuilderForPublishOnlyPipeline( PipelineBuilder builder) { builder.build(options); http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java index 9020494..2c9fb9b 100644 --- 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java @@ -67,7 +67,6 @@ class Query5 extends NexmarkQuery { // Count the number of bids per auction id. .apply(Count.<Long>perElement()) - //TODO replace by simple key // We'll want to keep all auctions with the maximal number of bids. // Start by lifting each into a singleton list. .apply(name + ".ToSingletons", http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/resources/log4j.properties b/integration/java/nexmark/src/main/resources/log4j.properties index 9d20aea..bc09794 100644 --- a/integration/java/nexmark/src/main/resources/log4j.properties +++ b/integration/java/nexmark/src/main/resources/log4j.properties @@ -35,8 +35,17 @@ log4j.logger.org.apache.spark=WARN log4j.logger.org.spark-project=WARN log4j.logger.io.netty=INFO +# Settings to quiet flink logs log4j.logger.org.apache.flink=WARN +# Settings to quiet apex logs +log4j.logger.org.apache.beam.runners.apex=INFO +log4j.logger.com.datatorrent=ERROR +log4j.logger.org.apache.hadoop.metrics2=WARN +log4j.logger.org.apache.commons=WARN +log4j.logger.org.apache.hadoop.security=WARN +log4j.logger.org.apache.hadoop.util=WARN + # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java ---------------------------------------------------------------------- diff --git 
a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java index 02761d6..35b3aed 100644 --- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java +++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java @@ -87,8 +87,8 @@ public class UnboundedEventSourceTest { Generator modelGenerator = new Generator(config); EventIdChecker checker = new EventIdChecker(); - Pipeline p = TestPipeline.create(); - PipelineOptions options = p.getOptions(); + PipelineOptions options = TestPipeline.testingPipelineOptions(); + Pipeline p = TestPipeline.create(options); UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false); UnboundedReader<Event> reader = source.createReader(options, null);