Change benchmark workload settings

Update configuration of events generation to add some variation
Update execution matrix (issue #45)
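As a rough illustration of the "variation" in event generation, the sketch below estimates how many events a given delay probability would touch. It assumes each event is delayed independently with probability `probDelayedEvent` and by `occasionalDelaySec` seconds; the class `DelaySketch` and the method `countDelayed` are hypothetical names, not part of Beam, and the actual generator may choose the delay length differently.

```java
import java.util.Random;

/** Hypothetical helper (not part of Beam): estimates how many events a delay setting affects. */
public class DelaySketch {

  /**
   * Counts how many of numEvents are marked as delayed when each event is
   * delayed independently with probability probDelayedEvent (an assumption).
   */
  public static long countDelayed(long numEvents, double probDelayedEvent, long seed) {
    Random random = new Random(seed);
    long delayed = 0;
    for (long i = 0; i < numEvents; i++) {
      // nextDouble() is in [0.0, 1.0), so 0.0 never delays and 1.0 always delays.
      if (random.nextDouble() < probDelayedEvent) {
        delayed++;
      }
    }
    return delayed;
  }

  public static void main(String[] args) {
    long numEvents = 100_000;
    double probDelayedEvent = 0.1; // new default in this commit
    long occasionalDelaySec = 3;   // new default in this commit
    long delayed = countDelayed(numEvents, probDelayedEvent, 42L);
    System.out.printf(
        "%d of %d events would be delayed (assumed delay: %d s)%n",
        delayed, numEvents, occasionalDelaySec);
  }
}
```

With the previous default of `probDelayedEvent = 0.0`, this sketch marks no events as delayed, which is consistent with the old configuration producing no late data.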
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dbd1b155
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dbd1b155
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dbd1b155

Branch: refs/heads/master
Commit: dbd1b155c32c19ce7a6d0c0f0dffb318c9ccdde7
Parents: 683680b
Author: Etienne Chauchot <echauc...@gmail.com>
Authored: Tue May 9 11:48:00 2017 +0200
Committer: Ismaël Mejía <ieme...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200

----------------------------------------------------------------------
 integration/java/nexmark/README.md              | 207 +++++++++++--------
 .../nexmark/NexmarkConfiguration.java           |  10 +-
 2 files changed, 128 insertions(+), 89 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/dbd1b155/integration/java/nexmark/README.md
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md
index a3549f4..a9acd63 100644
--- a/integration/java/nexmark/README.md
+++ b/integration/java/nexmark/README.md
@@ -30,14 +30,14 @@ These are multiple queries over a three entities model representing on online au
 - **Auction** represents an item under auction.
 - **Bid** represents a bid for an item under auction.

-The queries exercise many aspects of dataflow model on Beam:
+The queries exercise many aspects of Beam model:

 * **Query1**: What are the bid values in Euro's?
   Illustrates a simple map.
 * **Query2**: What are the auctions with particular auction numbers?
   Illustrates a simple filter.
 * **Query3**: Who is selling in particular US states?
-  Illustrates an incremental join (using per-key state) and filter.
+  Illustrates an incremental join (using per-key state and timer) and filter.
 * **Query4**: What is the average selling price for each auction
   category?
   Illustrates complex join (using custom window functions) and
@@ -71,19 +71,17 @@ We have augmented the original queries with five more:
   compared with event time in non-Global windows for all the other
   queries.

-The queries can be executed using a 'Driver' for a given backend.
-Currently the supported drivers are:
+We can specify the Beam runner to use with maven profiles, available profiles are:

-* **NexmarkApexDriver** for running via the Apex runner.
-* **NexmarkDirectDriver** for running locally on a single machine.
-* **NexmarkGoogleDriver** for running on the Google Cloud Dataflow service.
-  Requires a Google Cloud account.
-* **NexmarkFlinkDriver** for running on a Flink cluster. Requires the
-  cluster to be established and the Nexmark jar to be distributed to
-  each worker.
-* **NexmarkSparkDriver** for running on a Spark cluster.
+* direct-runner
+* spark-runner
+* flink-runner
+* apex-runner
+
+The runner must also be specified like in any other Beam pipeline using
+
+    --runner

-Other drivers are straightforward.

 Test data is deterministically synthesized on demand. The test
 data may be synthesized in the same pipeline as the query itself,
@@ -97,11 +95,6 @@ The query results may be:
 * Send to BigQuery.
 * Discarded.

-Options are provided for measuring progress, measuring overall
-pipeline performance, and comparing that performance against a known
-baseline. However that machinery has only been implemented against
-the Google Cloud Dataflow driver.
-
 # Configuration

 ## Common configuration parameters
@@ -119,45 +112,48 @@ Run query N
     --query=N

 ## Available Suites
+The suite to run can be chosen using this configuration parameter:

-- DEFAULT: Test default configuration with query 0.
-- SMOKE: Run the 12 default configurations.
-- STRESS: Like smoke but for 1m events.
-- FULL_THROTTLE: Like SMOKE but 100m events.
+    --suite=SUITE

-    --suite=SMOKE
+Available suites are:
+* DEFAULT: Test default configuration with query 0.
+* SMOKE: Run the 12 default configurations.
+* STRESS: Like smoke but for 1m events.
+* FULL_THROTTLE: Like SMOKE but 100m events.

-### Apex specific configuration
+

-    --suite=SMOKE --manageResources=false --monitorJobs=true
+## Apex specific configuration

-### Dataflow specific configuration
+    --manageResources=false --monitorJobs=false

-    --query=0 --suite=SMOKE --manageResources=false --monitorJobs=true \
+## Dataflow specific configuration
+
+    --manageResources=false --monitorJobs=true \
     --enforceEncodability=false --enforceImmutability=false
     --project=<your project> \
     --zone=<your zone> \
     --workerMachineType=n1-highmem-8 \
-    --stagingLocation=<a gs path for staging>
-
-    --runner=BlockingDataflowRunner \
+    --stagingLocation=<a gs path for staging> \
+    --runner=DataflowRunner \
     --tempLocation=gs://talend-imejia/nexmark/temp/ \
     --stagingLocation=gs://talend-imejia/nexmark/temp/staging/ \
     --filesToStage=target/beam-integration-java-0.7.0-SNAPSHOT.jar

-### Direct specific configuration
+## Direct specific configuration

-    --suite=SMOKE --manageResources=false --monitorJobs=true \
+    --manageResources=false --monitorJobs=true \
     --enforceEncodability=false --enforceImmutability=false

-### Flink specific configuration
+## Flink specific configuration

-    --suite=SMOKE --manageResources=false --monitorJobs=true \
-    --flinkMaster=[local] --parallelism=#numcores
+    --manageResources=false --monitorJobs=true \
+    --flinkMaster=local --parallelism=#numcores

-### Spark specific configuration
+## Spark specific configuration

-    --suite=SMOKE --manageResources=false --monitorJobs=true \
+    --manageResources=false --monitorJobs=true \
     --sparkMaster=local \
     -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
@@ -167,39 +163,39 @@ Open issues are tracked [here](https://github.com../../../../../issues):

 ## Batch / Synthetic / Local

-| Query | Direct | Spark | Flink | Apex |
-| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------- |
-| 0 | ok | ok | ok | ok |
-| 1 | ok | ok | ok | ok |
-| 2 | ok | ok | ok | ok |
-| 3 | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) |
-| 4 | ok | ok | [#2](../../../../../issues/2) | ok |
-| 5 | ok | ok | ok | ok |
-| 6 | ok | ok | [#2](../../../../../issues/2) | ok |
-| 7 | ok | ok | ok | [#24](../../../../../issues/24) |
-| 8 | ok | ok | ok | ok |
-| 9 | ok | ok | [#2](../../../../../issues/2) | ok |
-| 10 | [#5](../../../../../issues/5) | ok | ok | ok |
-| 11 | ok | ok | ok | ok |
-| 12 | ok | ok | ok | ok |
+| Query | Direct | Spark | Flink | Apex |
+| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
+| 0 | ok | ok | ok | ok |
+| 1 | ok | ok | ok | ok |
+| 2 | ok | ok | ok | ok |
+| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | ok | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
+| 4 | ok | ok | ok | ok |
+| 5 | ok | ok | ok | ok |
+| 6 | ok | ok | ok | ok |
+| 7 | ok | ok | ok | ok |
+| 8 | ok | ok | ok | ok |
+| 9 | ok | ok | ok | ok |
+| 10 | ok | ok | ok | ok |
+| 11 | ok | ok | ok | ok |
+| 12 | ok | ok | ok | ok |

 ## Streaming / Synthetic / Local

-| Query | Direct | Spark | Flink | Apex |
-| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
-| 0 | ok | | | ok |
-| 1 | ok | | | ok |
-| 2 | ok | | | ok |
-| 3 | [#7](../../../../../issues/7) | | | [#7](../../../../../issues/7) |
-| 4 | ok | | | ok |
-| 5 | ok | | | ok |
-| 6 | ok | | | ok |
-| 7 | ok | | | ? |
-| 8 | ok | | | ok |
-| 9 | ok | | | ok |
-| 10 | [#5](../../../../../issues/5) | | | ? |
-| 11 | ok | | | Ok |
-| 12 | ok | | | Ok |
+| Query | Direct | Spark | Flink | Apex |
+| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
+| 0 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 1 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 2 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
+| 4 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 5 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 6 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 7 | ok | [BEAM-2112](https://issues.apache.org/jira/browse/BEAM-2112) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 8 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 9 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 10 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 11 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 12 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |

 ## Batch / Synthetic / Cluster

@@ -219,26 +215,63 @@ TODO

 # Running Nexmark

-## Running on the DirectRunner (local)
+## Running SMOKE suite on the DirectRunner (local)

 Batch Mode

--Dexec.classpathScope="test"
+    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
+
+Streaming Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=true --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
+

-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.drivers.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=false --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
+## Running SMOKE suite on the SparkRunner (local)
+
+Batch Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true"

 Streaming Mode

-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.drivers.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=true --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
+    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true"
+
+
+## Running SMOKE suite on the FlinkRunner (local)

-## Running on Google Cloud Dataflow
+Batch Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true --flinkMaster=local"
+
+Streaming Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true --flinkMaster=local"
+
+
+## Running SMOKE suite on the ApexRunner (local)
+
+Batch Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=false"
+
+Streaming Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=false"
+
+
+## Running SMOKE suite on Google Cloud Dataflow
+
+Building package
+
+    mvn clean package -Pdataflow-runner
+
+Submit to Google Dataflow service

-An example invocation for **Query10** on the Google Cloud Dataflow
-service.

 ```
-java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
-  org.apache.beam.integration.nexmark.drivers.NexmarkGoogleDriver \
+java -cp integration/java/target/beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
+  org.apache.beam.integration.nexmark.Main \
+  --runner=DataflowRunner
   --project=<your project> \
   --zone=<your zone> \
   --workerMachineType=n1-highmem-8 \
@@ -253,7 +286,7 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S
   --numEventGenerators=64 \
   --numWorkers=16 \
   --maxNumWorkers=16 \
-  --query=10 \
+  --suite=SMOKE \
   --firstEventRate=100000 \
   --nextEventRate=100000 \
   --ratePeriodSec=3600 \
@@ -270,8 +303,9 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S
 ```

 ```
-java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
-  org.apache.beam.integration.nexmark.drivers.NexmarkGoogleDriver \
+java -cp integration/java/target/beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
+  org.apache.beam.integration.nexmark.Main \
+  --runner=DataflowRunner
   --project=<your project> \
   --zone=<your zone> \
   --workerMachineType=n1-highmem-8 \
@@ -285,7 +319,7 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S
   --monitorJobs=false \
   --numWorkers=64 \
   --maxNumWorkers=64 \
-  --query=10 \
+  --suite=SMOKE \
   --usePubsubPublishTime=true \
   --outputPath=<a gs path under which log files will be written> \
   --windowSizeSec=600 \
@@ -294,8 +328,13 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S
   --experiments=enable_custom_pubsub_source
 ```

-## Running on Flink
+## Running query 0 on a Spark cluster with yarn
+
+Building package
+
+    mvn clean package -Pspark-runner
+
+Submit to the cluster
+
+    spark-submit --master yarn-client --class org.apache.beam.integration.nexmark.Main --driver-memory 512m --executor-memory 512m --executor-cores 1 beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar --runner=SparkRunner --query=0 --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true

-See [BEAM_ON_FLINK_ON_GCP](./BEAM_ON_FLINK_ON_GCP.md) for instructions
-on running a NexMark pipeline using Flink hosted on a Google Compute
-Platform cluster.

http://git-wip-us.apache.org/repos/asf/beam/blob/dbd1b155/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
index 1da08b4..5a8cb71 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -140,15 +140,15 @@ public class NexmarkConfiguration implements Serializable {

   /** Ratio of bids to 'hot' auctions compared to all other auctions. */
   @JsonProperty
-  public int hotAuctionRatio = 1;
+  public int hotAuctionRatio = 2;

   /** Ratio of auctions for 'hot' sellers compared to all other people. */
   @JsonProperty
-  public int hotSellersRatio = 1;
+  public int hotSellersRatio = 4;

   /** Ratio of bids for 'hot' bidders compared to all other people. */
   @JsonProperty
-  public int hotBiddersRatio = 1;
+  public int hotBiddersRatio = 4;

   /** Window size, in seconds, for queries 3, 5, 7 and 8. */
   @JsonProperty
@@ -211,13 +211,13 @@ public class NexmarkConfiguration implements Serializable {
    * Length of occasional delay to impose on events (in seconds).
    */
   @JsonProperty
-  public long occasionalDelaySec = 0;
+  public long occasionalDelaySec = 3;

   /**
    * Probability that an event will be delayed by delayS.
    */
   @JsonProperty
-  public double probDelayedEvent = 0.0;
+  public double probDelayedEvent = 0.1;

   /**
    * Maximum size of each log file (in events). For Query10 only.