Move module beam-integration-java-nexmark to beam-sdks-java-nexmark
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4333df7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4333df7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4333df7 Branch: refs/heads/master Commit: f4333df77267d5207f0f23ae62e79b171a00e8a7 Parents: 2f9b494 Author: Etienne Chauchot <echauc...@gmail.com> Authored: Thu Jun 15 11:55:26 2017 +0200 Committer: Ismaël Mejía <ieme...@gmail.com> Committed: Wed Aug 23 19:07:29 2017 +0200 ---------------------------------------------------------------------- integration/java/nexmark/README.md | 340 ----- integration/java/nexmark/pom.xml | 292 ----- .../apache/beam/integration/nexmark/Main.java | 304 ----- .../beam/integration/nexmark/Monitor.java | 79 -- .../nexmark/NexmarkConfiguration.java | 721 ----------- .../integration/nexmark/NexmarkLauncher.java | 1158 ------------------ .../integration/nexmark/NexmarkOptions.java | 403 ------ .../beam/integration/nexmark/NexmarkPerf.java | 208 ---- .../beam/integration/nexmark/NexmarkSuite.java | 112 -- .../beam/integration/nexmark/NexmarkUtils.java | 672 ---------- .../beam/integration/nexmark/model/Auction.java | 187 --- .../integration/nexmark/model/AuctionBid.java | 84 -- .../integration/nexmark/model/AuctionCount.java | 84 -- .../integration/nexmark/model/AuctionPrice.java | 88 -- .../beam/integration/nexmark/model/Bid.java | 177 --- .../nexmark/model/BidsPerSession.java | 87 -- .../nexmark/model/CategoryPrice.java | 97 -- .../beam/integration/nexmark/model/Done.java | 80 -- .../beam/integration/nexmark/model/Event.java | 171 --- .../nexmark/model/IdNameReserve.java | 98 -- .../integration/nexmark/model/KnownSize.java | 26 - .../nexmark/model/NameCityStateId.java | 103 -- .../beam/integration/nexmark/model/Person.java | 163 --- .../integration/nexmark/model/SellerPrice.java | 89 -- .../integration/nexmark/model/package-info.java | 22 - .../beam/integration/nexmark/package-info.java | 21 - 
.../nexmark/queries/AbstractSimulator.java | 211 ---- .../nexmark/queries/NexmarkQuery.java | 270 ---- .../nexmark/queries/NexmarkQueryModel.java | 118 -- .../integration/nexmark/queries/Query0.java | 71 -- .../nexmark/queries/Query0Model.java | 64 - .../integration/nexmark/queries/Query1.java | 67 - .../integration/nexmark/queries/Query10.java | 367 ------ .../integration/nexmark/queries/Query11.java | 79 -- .../integration/nexmark/queries/Query12.java | 80 -- .../nexmark/queries/Query1Model.java | 76 -- .../integration/nexmark/queries/Query2.java | 79 -- .../nexmark/queries/Query2Model.java | 80 -- .../integration/nexmark/queries/Query3.java | 301 ----- .../nexmark/queries/Query3Model.java | 124 -- .../integration/nexmark/queries/Query4.java | 116 -- .../nexmark/queries/Query4Model.java | 186 --- .../integration/nexmark/queries/Query5.java | 138 --- .../nexmark/queries/Query5Model.java | 176 --- .../integration/nexmark/queries/Query6.java | 155 --- .../nexmark/queries/Query6Model.java | 133 -- .../integration/nexmark/queries/Query7.java | 90 -- .../nexmark/queries/Query7Model.java | 130 -- .../integration/nexmark/queries/Query8.java | 97 -- .../nexmark/queries/Query8Model.java | 148 --- .../integration/nexmark/queries/Query9.java | 44 - .../nexmark/queries/Query9Model.java | 44 - .../nexmark/queries/WinningBids.java | 412 ------- .../nexmark/queries/WinningBidsSimulator.java | 206 ---- .../nexmark/queries/package-info.java | 22 - .../nexmark/sources/BoundedEventSource.java | 190 --- .../integration/nexmark/sources/Generator.java | 609 --------- .../nexmark/sources/GeneratorConfig.java | 301 ----- .../nexmark/sources/UnboundedEventSource.java | 330 ----- .../nexmark/sources/package-info.java | 22 - .../nexmark/src/main/resources/log4j.properties | 55 - .../integration/nexmark/queries/QueryTest.java | 185 --- .../nexmark/sources/BoundedEventSourceTest.java | 71 -- .../nexmark/sources/GeneratorTest.java | 111 -- .../sources/UnboundedEventSourceTest.java | 107 -- 
integration/java/pom.xml | 37 - integration/pom.xml | 51 - pom.xml | 1 - sdks/java/nexmark/README.md | 340 +++++ sdks/java/nexmark/pom.xml | 292 +++++ .../java/org/apache/beam/sdk/nexmark/Main.java | 303 +++++ .../org/apache/beam/sdk/nexmark/Monitor.java | 78 ++ .../beam/sdk/nexmark/NexmarkConfiguration.java | 721 +++++++++++ .../beam/sdk/nexmark/NexmarkLauncher.java | 1157 +++++++++++++++++ .../apache/beam/sdk/nexmark/NexmarkOptions.java | 403 ++++++ .../apache/beam/sdk/nexmark/NexmarkPerf.java | 207 ++++ .../apache/beam/sdk/nexmark/NexmarkSuite.java | 112 ++ .../apache/beam/sdk/nexmark/NexmarkUtils.java | 674 ++++++++++ .../apache/beam/sdk/nexmark/model/Auction.java | 187 +++ .../beam/sdk/nexmark/model/AuctionBid.java | 85 ++ .../beam/sdk/nexmark/model/AuctionCount.java | 84 ++ .../beam/sdk/nexmark/model/AuctionPrice.java | 88 ++ .../org/apache/beam/sdk/nexmark/model/Bid.java | 177 +++ .../beam/sdk/nexmark/model/BidsPerSession.java | 87 ++ .../beam/sdk/nexmark/model/CategoryPrice.java | 97 ++ .../org/apache/beam/sdk/nexmark/model/Done.java | 80 ++ .../apache/beam/sdk/nexmark/model/Event.java | 171 +++ .../beam/sdk/nexmark/model/IdNameReserve.java | 98 ++ .../beam/sdk/nexmark/model/KnownSize.java | 26 + .../beam/sdk/nexmark/model/NameCityStateId.java | 103 ++ .../apache/beam/sdk/nexmark/model/Person.java | 163 +++ .../beam/sdk/nexmark/model/SellerPrice.java | 89 ++ .../beam/sdk/nexmark/model/package-info.java | 22 + .../apache/beam/sdk/nexmark/package-info.java | 21 + .../sdk/nexmark/queries/AbstractSimulator.java | 211 ++++ .../beam/sdk/nexmark/queries/NexmarkQuery.java | 270 ++++ .../sdk/nexmark/queries/NexmarkQueryModel.java | 118 ++ .../apache/beam/sdk/nexmark/queries/Query0.java | 70 ++ .../beam/sdk/nexmark/queries/Query0Model.java | 64 + .../apache/beam/sdk/nexmark/queries/Query1.java | 67 + .../beam/sdk/nexmark/queries/Query10.java | 367 ++++++ .../beam/sdk/nexmark/queries/Query11.java | 79 ++ .../beam/sdk/nexmark/queries/Query12.java | 80 ++ 
.../beam/sdk/nexmark/queries/Query1Model.java | 76 ++ .../apache/beam/sdk/nexmark/queries/Query2.java | 79 ++ .../beam/sdk/nexmark/queries/Query2Model.java | 80 ++ .../apache/beam/sdk/nexmark/queries/Query3.java | 301 +++++ .../beam/sdk/nexmark/queries/Query3Model.java | 124 ++ .../apache/beam/sdk/nexmark/queries/Query4.java | 116 ++ .../beam/sdk/nexmark/queries/Query4Model.java | 186 +++ .../apache/beam/sdk/nexmark/queries/Query5.java | 138 +++ .../beam/sdk/nexmark/queries/Query5Model.java | 176 +++ .../apache/beam/sdk/nexmark/queries/Query6.java | 155 +++ .../beam/sdk/nexmark/queries/Query6Model.java | 133 ++ .../apache/beam/sdk/nexmark/queries/Query7.java | 90 ++ .../beam/sdk/nexmark/queries/Query7Model.java | 130 ++ .../apache/beam/sdk/nexmark/queries/Query8.java | 97 ++ .../beam/sdk/nexmark/queries/Query8Model.java | 148 +++ .../apache/beam/sdk/nexmark/queries/Query9.java | 44 + .../beam/sdk/nexmark/queries/Query9Model.java | 44 + .../beam/sdk/nexmark/queries/WinningBids.java | 412 +++++++ .../nexmark/queries/WinningBidsSimulator.java | 206 ++++ .../beam/sdk/nexmark/queries/package-info.java | 22 + .../sdk/nexmark/sources/BoundedEventSource.java | 190 +++ .../beam/sdk/nexmark/sources/Generator.java | 609 +++++++++ .../sdk/nexmark/sources/GeneratorConfig.java | 298 +++++ .../nexmark/sources/UnboundedEventSource.java | 329 +++++ .../beam/sdk/nexmark/sources/package-info.java | 22 + .../nexmark/src/main/resources/log4j.properties | 55 + .../beam/sdk/nexmark/queries/QueryTest.java | 185 +++ .../nexmark/sources/BoundedEventSourceTest.java | 70 ++ .../beam/sdk/nexmark/sources/GeneratorTest.java | 110 ++ .../sources/UnboundedEventSourceTest.java | 105 ++ sdks/java/pom.xml | 1 + 134 files changed, 11922 insertions(+), 12020 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/README.md ---------------------------------------------------------------------- 
diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md deleted file mode 100644 index a9acd63..0000000 --- a/integration/java/nexmark/README.md +++ /dev/null @@ -1,340 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> - -# NEXMark integration suite - -This is a suite of pipelines inspired by the 'continuous data stream' -queries in [http://datalab.cs.pdx.edu/niagaraST/NEXMark/] -(http://datalab.cs.pdx.edu/niagaraST/NEXMark/). - -These are multiple queries over a three entities model representing on online auction system: - - - **Person** represents a person submitting an item for auction and/or making a bid - on an auction. - - **Auction** represents an item under auction. - - **Bid** represents a bid for an item under auction. - -The queries exercise many aspects of Beam model: - -* **Query1**: What are the bid values in Euro's? - Illustrates a simple map. -* **Query2**: What are the auctions with particular auction numbers? - Illustrates a simple filter. -* **Query3**: Who is selling in particular US states? - Illustrates an incremental join (using per-key state and timer) and filter. -* **Query4**: What is the average selling price for each auction - category? 
- Illustrates complex join (using custom window functions) and - aggregation. -* **Query5**: Which auctions have seen the most bids in the last period? - Illustrates sliding windows and combiners. -* **Query6**: What is the average selling price per seller for their - last 10 closed auctions. - Shares the same 'winning bids' core as for **Query4**, and - illustrates a specialized combiner. -* **Query7**: What are the highest bids per period? - Deliberately implemented using a side input to illustrate fanout. -* **Query8**: Who has entered the system and created an auction in - the last period? - Illustrates a simple join. - -We have augmented the original queries with five more: - -* **Query0**: Pass-through. - Allows us to measure the monitoring overhead. -* **Query9**: Winning-bids. - A common sub-query shared by **Query4** and **Query6**. -* **Query10**: Log all events to GCS files. - Illustrates windows with large side effects on firing. -* **Query11**: How many bids did a user make in each session they - were active? - Illustrates session windows. -* **Query12**: How many bids does a user make within a fixed - processing time limit? - Illustrates working in processing time in the Global window, as - compared with event time in non-Global windows for all the other - queries. - -We can specify the Beam runner to use with maven profiles, available profiles are: - -* direct-runner -* spark-runner -* flink-runner -* apex-runner - -The runner must also be specified like in any other Beam pipeline using - - --runner - - -Test data is deterministically synthesized on demand. The test -data may be synthesized in the same pipeline as the query itself, -or may be published to Pubsub. - -The query results may be: - -* Published to Pubsub. -* Written to text files as plain text. -* Written to text files using an Avro encoding. -* Send to BigQuery. -* Discarded. 
- -# Configuration - -## Common configuration parameters - -Decide if batch or streaming: - - --streaming=true - -Number of events generators - - --numEventGenerators=4 - -Run query N - - --query=N - -## Available Suites -The suite to run can be chosen using this configuration parameter: - - --suite=SUITE - -Available suites are: -* DEFAULT: Test default configuration with query 0. -* SMOKE: Run the 12 default configurations. -* STRESS: Like smoke but for 1m events. -* FULL_THROTTLE: Like SMOKE but 100m events. - - - -## Apex specific configuration - - --manageResources=false --monitorJobs=false - -## Dataflow specific configuration - - --manageResources=false --monitorJobs=true \ - --enforceEncodability=false --enforceImmutability=false - --project=<your project> \ - --zone=<your zone> \ - --workerMachineType=n1-highmem-8 \ - --stagingLocation=<a gs path for staging> \ - --runner=DataflowRunner \ - --tempLocation=gs://talend-imejia/nexmark/temp/ \ - --stagingLocation=gs://talend-imejia/nexmark/temp/staging/ \ - --filesToStage=target/beam-integration-java-0.7.0-SNAPSHOT.jar - -## Direct specific configuration - - --manageResources=false --monitorJobs=true \ - --enforceEncodability=false --enforceImmutability=false - -## Flink specific configuration - - --manageResources=false --monitorJobs=true \ - --flinkMaster=local --parallelism=#numcores - -## Spark specific configuration - - --manageResources=false --monitorJobs=true \ - --sparkMaster=local \ - -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true - -# Current Status - -Open issues are tracked [here](https://github.com../../../../../issues): - -## Batch / Synthetic / Local - -| Query | Direct | Spark | Flink | Apex | -| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ | -| 0 | ok | ok | ok | ok | -| 1 | ok | ok | ok 
| ok | -| 2 | ok | ok | ok | ok | -| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | ok | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) | -| 4 | ok | ok | ok | ok | -| 5 | ok | ok | ok | ok | -| 6 | ok | ok | ok | ok | -| 7 | ok | ok | ok | ok | -| 8 | ok | ok | ok | ok | -| 9 | ok | ok | ok | ok | -| 10 | ok | ok | ok | ok | -| 11 | ok | ok | ok | ok | -| 12 | ok | ok | ok | ok | - -## Streaming / Synthetic / Local - -| Query | Direct | Spark | Flink | Apex | -| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ | -| 0 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 1 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 2 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) | -| 4 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 5 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 6 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 7 | ok | [BEAM-2112](https://issues.apache.org/jira/browse/BEAM-2112) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 8 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 9 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 10 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 11 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | -| 12 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok | - -## Batch / Synthetic / 
Cluster - -TODO - -| Query | Dataflow | Spark | Flink | Apex | -| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ | -| 0 | | | | | - -## Streaming / Synthetic / Cluster - -TODO - -| Query | Dataflow | Spark | Flink | Apex | -| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ | -| 0 | | | | | - -# Running Nexmark - -## Running SMOKE suite on the DirectRunner (local) - -Batch Mode - - mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true" - -Streaming Mode - - mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=true --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true" - - -## Running SMOKE suite on the SparkRunner (local) - -Batch Mode - - mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true" - -Streaming Mode - - mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true" - - -## Running SMOKE suite on the FlinkRunner (local) - -Batch Mode - - mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true --flinkMaster=local" - -Streaming Mode - - mvn exec:java 
-Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true --flinkMaster=local" - - -## Running SMOKE suite on the ApexRunner (local) - -Batch Mode - - mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=false" - -Streaming Mode - - mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=false" - - -## Running SMOKE suite on Google Cloud Dataflow - -Building package - - mvn clean package -Pdataflow-runner - -Submit to Google Dataflow service - - -``` -java -cp integration/java/target/beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \ - org.apache.beam.integration.nexmark.Main \ - --runner=DataflowRunner - --project=<your project> \ - --zone=<your zone> \ - --workerMachineType=n1-highmem-8 \ - --stagingLocation=<a gs path for staging> \ - --streaming=true \ - --sourceType=PUBSUB \ - --pubSubMode=PUBLISH_ONLY \ - --pubsubTopic=<an existing Pubsub topic> \ - --resourceNameMode=VERBATIM \ - --manageResources=false \ - --monitorJobs=false \ - --numEventGenerators=64 \ - --numWorkers=16 \ - --maxNumWorkers=16 \ - --suite=SMOKE \ - --firstEventRate=100000 \ - --nextEventRate=100000 \ - --ratePeriodSec=3600 \ - --isRateLimited=true \ - --avgPersonByteSize=500 \ - --avgAuctionByteSize=500 \ - --avgBidByteSize=500 \ - --probDelayedEvent=0.000001 \ - --occasionalDelaySec=3600 \ - --numEvents=0 \ - --useWallclockEventTime=true \ - --usePubsubPublishTime=true \ - --experiments=enable_custom_pubsub_sink -``` - -``` -java -cp integration/java/target/beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \ - 
org.apache.beam.integration.nexmark.Main \ - --runner=DataflowRunner - --project=<your project> \ - --zone=<your zone> \ - --workerMachineType=n1-highmem-8 \ - --stagingLocation=<a gs path for staging> \ - --streaming=true \ - --sourceType=PUBSUB \ - --pubSubMode=SUBSCRIBE_ONLY \ - --pubsubSubscription=<an existing Pubsub subscription to above topic> \ - --resourceNameMode=VERBATIM \ - --manageResources=false \ - --monitorJobs=false \ - --numWorkers=64 \ - --maxNumWorkers=64 \ - --suite=SMOKE \ - --usePubsubPublishTime=true \ - --outputPath=<a gs path under which log files will be written> \ - --windowSizeSec=600 \ - --occasionalDelaySec=3600 \ - --maxLogEvents=10000 \ - --experiments=enable_custom_pubsub_source -``` - -## Running query 0 on a Spark cluster with yarn - -Building package - - mvn clean package -Pspark-runner - -Submit to the cluster - - spark-submit --master yarn-client --class org.apache.beam.integration.nexmark.Main --driver-memory 512m --executor-memory 512m --executor-cores 1 beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar --runner=SparkRunner --query=0 --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true - http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml deleted file mode 100644 index 664a410..0000000 --- a/integration/java/nexmark/pom.xml +++ /dev/null @@ -1,292 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.beam</groupId> - <artifactId>beam-integration-java-parent</artifactId> - <version>2.1.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>beam-integration-java-nexmark</artifactId> - <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name> - <packaging>jar</packaging> - - <profiles> - - <!-- - The direct runner is available by default. - You can also include it on the classpath explicitly with -P direct-runner - --> - <profile> - <id>direct-runner</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-direct-java</artifactId> - <scope>runtime</scope> - </dependency> - </dependencies> - </profile> - - <!-- Include the Apache Apex runner with -P apex-runner --> - <profile> - <id>apex-runner</id> - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-apex</artifactId> - <scope>runtime</scope> - </dependency> - <!-- - Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from - google-http-client. Apex dependency version being specified explicitly so that it gets picked up. 
This - can be removed when the project no longer has a dependency on a different httpclient version. - --> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <version>4.3.5</version> - <scope>runtime</scope> - <exclusions> - <exclusion> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - </profile> - - <!-- Include the Apache Flink runner with -P flink-runner --> - <profile> - <id>flink-runner</id> - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-flink_2.10</artifactId> - <scope>runtime</scope> - </dependency> - </dependencies> - </profile> - - <!-- Include the Apache Spark runner -P spark-runner --> - <profile> - <id>spark-runner</id> - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-spark</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.10</artifactId> - <version>${spark.version}</version> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.10</artifactId> - <version>${spark.version}</version> - <scope>runtime</scope> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>jul-to-slf4j</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - </profile> - - <!-- Include the Google Cloud Dataflow runner -P dataflow-runner --> - <profile> - <id>dataflow-runner</id> - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> - <scope>runtime</scope> - </dependency> - </dependencies> - </profile> - </profiles> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - 
<phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-bundled-${project.version}</finalName> - <artifactSet> - <includes> - <include>*:*</include> - </includes> - </artifactSet> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - </excludes> - </filter> - </filters> - </configuration> - </execution> - </executions> - </plugin> - - <!-- Avro plugin for automatic code generation --> - <plugin> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <executions> - <execution> - <id>schemas</id> - <phase>generate-sources</phase> - <goals> - <goal>schema</goal> - </goals> - <configuration> - <sourceDirectory>${project.basedir}/src/main/</sourceDirectory> - <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory> - </configuration> - </execution> - </executions> - </plugin> - - <!-- Coverage analysis for unit tests. 
--> - <plugin> - <groupId>org.jacoco</groupId> - <artifactId>jacoco-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - - <dependencies> - <!-- Java SDK --> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-core</artifactId> - </dependency> - - <!-- IOs --> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId> - </dependency> - - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-bigquery</artifactId> - </dependency> - - <!-- Extra libraries --> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </dependency> - - <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - <scope>runtime</scope> - </dependency> - - <dependency> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-core</artifactId> - <scope>compile</scope> - 
</dependency> - - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - </dependency> - - <!-- Test --> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-direct-java</artifactId> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java deleted file mode 100644 index 4c23651..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.integration.nexmark; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; - -import org.apache.beam.integration.nexmark.model.Auction; -import org.apache.beam.integration.nexmark.model.Bid; -import org.apache.beam.integration.nexmark.model.Person; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * An implementation of the 'NEXMark queries' for Beam. - * These are multiple queries over a three table schema representing an online auction system: - * <ul> - * <li>{@link Person} represents a person submitting an item for auction and/or making a bid - * on an auction. - * <li>{@link Auction} represents an item under auction. - * <li>{@link Bid} represents a bid for an item under auction. - * </ul> - * The queries exercise many aspects of the Beam model. - * - * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not - * particularly sensible. - * - * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/"> - * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a> - */ -public class Main<OptionT extends NexmarkOptions> { - - /** - * Entry point. - */ - void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) { - Instant start = Instant.now(); - Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename()); - Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>(); - Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options); - - boolean successful = true; - try { - // Run all the configurations. 
- for (NexmarkConfiguration configuration : configurations) { - NexmarkPerf perf = nexmarkLauncher.run(configuration); - if (perf != null) { - if (perf.errors == null || perf.errors.size() > 0) { - successful = false; - } - appendPerf(options.getPerfFilename(), configuration, perf); - actual.put(configuration, perf); - // Summarize what we've run so far. - saveSummary(null, configurations, actual, baseline, start); - } - } - } finally { - if (options.getMonitorJobs()) { - // Report overall performance. - saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start); - saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start); - } - } - - if (!successful) { - throw new RuntimeException("Execution was not successful"); - } - } - - /** - * Append the pair of {@code configuration} and {@code perf} to perf file. - */ - private void appendPerf( - @Nullable String perfFilename, NexmarkConfiguration configuration, - NexmarkPerf perf) { - if (perfFilename == null) { - return; - } - List<String> lines = new ArrayList<>(); - lines.add(""); - lines.add(String.format("# %s", Instant.now())); - lines.add(String.format("# %s", configuration.toShortString())); - lines.add(configuration.toString()); - lines.add(perf.toString()); - try { - Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE, - StandardOpenOption.APPEND); - } catch (IOException e) { - throw new RuntimeException("Unable to write perf file: ", e); - } - NexmarkUtils.console("appended results to perf file %s.", perfFilename); - } - - /** - * Load the baseline perf. 
- */ - @Nullable - private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline( - @Nullable String baselineFilename) { - if (baselineFilename == null) { - return null; - } - Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>(); - List<String> lines; - try { - lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException("Unable to read baseline perf file: ", e); - } - for (int i = 0; i < lines.size(); i++) { - if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) { - continue; - } - NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++)); - NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i)); - baseline.put(configuration, perf); - } - NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(), - baselineFilename); - return baseline; - } - - private static final String LINE = - "=========================================================================================="; - - /** - * Print summary of {@code actual} vs (if non-null) {@code baseline}. 
- */ - private static void saveSummary( - @Nullable String summaryFilename, - Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual, - @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) { - List<String> lines = new ArrayList<>(); - - lines.add(""); - lines.add(LINE); - - lines.add( - String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now()))); - lines.add(""); - - lines.add("Default configuration:"); - lines.add(NexmarkConfiguration.DEFAULT.toString()); - lines.add(""); - - lines.add("Configurations:"); - lines.add(" Conf Description"); - int conf = 0; - for (NexmarkConfiguration configuration : configurations) { - lines.add(String.format(" %04d %s", conf++, configuration.toShortString())); - NexmarkPerf actualPerf = actual.get(configuration); - if (actualPerf != null && actualPerf.jobId != null) { - lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId)); - } - } - - lines.add(""); - lines.add("Performance:"); - lines.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)", - "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)")); - conf = 0; - for (NexmarkConfiguration configuration : configurations) { - String line = String.format(" %04d ", conf++); - NexmarkPerf actualPerf = actual.get(configuration); - if (actualPerf == null) { - line += "*** not run ***"; - } else { - NexmarkPerf baselinePerf = baseline == null ? 
null : baseline.get(configuration); - double runtimeSec = actualPerf.runtimeSec; - line += String.format("%12.1f ", runtimeSec); - if (baselinePerf == null) { - line += String.format("%12s ", ""); - } else { - double baselineRuntimeSec = baselinePerf.runtimeSec; - double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0; - line += String.format("%+11.2f%% ", diff); - } - - double eventsPerSec = actualPerf.eventsPerSec; - line += String.format("%12.1f ", eventsPerSec); - if (baselinePerf == null) { - line += String.format("%12s ", ""); - } else { - double baselineEventsPerSec = baselinePerf.eventsPerSec; - double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0; - line += String.format("%+11.2f%% ", diff); - } - - long numResults = actualPerf.numResults; - line += String.format("%12d ", numResults); - if (baselinePerf == null) { - line += String.format("%12s", ""); - } else { - long baselineNumResults = baselinePerf.numResults; - long diff = numResults - baselineNumResults; - line += String.format("%+12d", diff); - } - } - lines.add(line); - - if (actualPerf != null) { - List<String> errors = actualPerf.errors; - if (errors == null) { - errors = new ArrayList<>(); - errors.add("NexmarkGoogleRunner returned null errors list"); - } - for (String error : errors) { - lines.add(String.format(" %4s *** %s ***", "", error)); - } - } - } - - lines.add(LINE); - lines.add(""); - - for (String line : lines) { - System.out.println(line); - } - - if (summaryFilename != null) { - try { - Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8, - StandardOpenOption.CREATE, StandardOpenOption.APPEND); - } catch (IOException e) { - throw new RuntimeException("Unable to save summary file: ", e); - } - NexmarkUtils.console("appended summary to summary file %s.", summaryFilename); - } - } - - /** - * Write all perf data and any baselines to a javascript file which can be used by - * graphing page etc. 
- */ - private static void saveJavascript( - @Nullable String javascriptFilename, - Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual, - @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) { - if (javascriptFilename == null) { - return; - } - - List<String> lines = new ArrayList<>(); - lines.add(String.format( - "// Run started %s and ran for %s", start, new Duration(start, Instant.now()))); - lines.add("var all = ["); - - for (NexmarkConfiguration configuration : configurations) { - lines.add(" {"); - lines.add(String.format(" config: %s", configuration)); - NexmarkPerf actualPerf = actual.get(configuration); - if (actualPerf != null) { - lines.add(String.format(" ,perf: %s", actualPerf)); - } - NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration); - if (baselinePerf != null) { - lines.add(String.format(" ,baseline: %s", baselinePerf)); - } - lines.add(" },"); - } - - lines.add("];"); - - try { - Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8, - StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); - } catch (IOException e) { - throw new RuntimeException("Unable to save javascript file: ", e); - } - NexmarkUtils.console("saved javascript to file %s.", javascriptFilename); - } - - public static void main(String[] args) { - NexmarkOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(NexmarkOptions.class); - NexmarkLauncher<NexmarkOptions> nexmarkLauncher = new NexmarkLauncher<>(options); - new Main<>().runAll(options, nexmarkLauncher); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java 
b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java deleted file mode 100644 index 2f0c56a..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import java.io.Serializable; - -import org.apache.beam.integration.nexmark.model.KnownSize; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Distribution; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; - -/** - * A monitor of elements with support for later retrieving their metrics. - * - * @param <T> Type of element we are monitoring. 
- */ -public class Monitor<T extends KnownSize> implements Serializable { - private class MonitorDoFn extends DoFn<T, T> { - final Counter elementCounter = - Metrics.counter(name , prefix + ".elements"); - final Counter bytesCounter = - Metrics.counter(name , prefix + ".bytes"); - final Distribution startTime = - Metrics.distribution(name , prefix + ".startTime"); - final Distribution endTime = - Metrics.distribution(name , prefix + ".endTime"); - final Distribution startTimestamp = - Metrics.distribution(name , prefix + ".startTimestamp"); - final Distribution endTimestamp = - Metrics.distribution(name , prefix + ".endTimestamp"); - - @ProcessElement - public void processElement(ProcessContext c) { - elementCounter.inc(); - bytesCounter.inc(c.element().sizeInBytes()); - long now = System.currentTimeMillis(); - startTime.update(now); - endTime.update(now); - startTimestamp.update(c.timestamp().getMillis()); - endTimestamp.update(c.timestamp().getMillis()); - c.output(c.element()); - } - } - - public final String name; - public final String prefix; - private final MonitorDoFn doFn; - private final PTransform<PCollection<? extends T>, PCollection<T>> transform; - - public Monitor(String name, String prefix) { - this.name = name; - this.prefix = prefix; - doFn = new MonitorDoFn(); - transform = ParDo.of(doFn); - } - - public PTransform<PCollection<? 
extends T>, PCollection<T>> getTransform() { - return transform; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java deleted file mode 100644 index 2faf3f5..0000000 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java +++ /dev/null @@ -1,721 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.integration.nexmark; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; -import java.io.Serializable; -import java.util.Objects; - -/** - * Configuration controlling how a query is run. May be supplied by command line or - * programmatically. 
We only capture properties which may influence the resulting - * pipeline performance, as captured by {@link NexmarkPerf}. - */ -public class NexmarkConfiguration implements Serializable { - public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration(); - - /** If {@literal true}, include additional debugging and monitoring stats. */ - @JsonProperty - public boolean debug = true; - - /** Which query to run, in [0,9]. */ - @JsonProperty - public int query = 0; - - /** Where events come from. */ - @JsonProperty - public NexmarkUtils.SourceType sourceType = NexmarkUtils.SourceType.DIRECT; - - /** Where results go to. */ - @JsonProperty - public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL; - - /** - * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated - * into the overall query pipeline. - */ - @JsonProperty - public NexmarkUtils.PubSubMode pubSubMode = NexmarkUtils.PubSubMode.COMBINED; - - /** - * Number of events to generate. If zero, generate as many as possible without overflowing - * internal counters etc. - */ - @JsonProperty - public long numEvents = 100000; - - /** - * Number of event generators to use. Each generates events in its own timeline. - */ - @JsonProperty - public int numEventGenerators = 100; - - /** - * Shape of event rate curve. - */ - @JsonProperty - public NexmarkUtils.RateShape rateShape = NexmarkUtils.RateShape.SINE; - - /** - * Initial overall event rate (in {@link #rateUnit}). - */ - @JsonProperty - public int firstEventRate = 10000; - - /** - * Next overall event rate (in {@link #rateUnit}). - */ - @JsonProperty - public int nextEventRate = 10000; - - /** - * Unit for rates. - */ - @JsonProperty - public NexmarkUtils.RateUnit rateUnit = NexmarkUtils.RateUnit.PER_SECOND; - - /** - * Overall period of rate shape, in seconds. 
- */ - @JsonProperty - public int ratePeriodSec = 600; - - /** - * Time in seconds to preload the subscription with data, at the initial input rate of the - * pipeline. - */ - @JsonProperty - public int preloadSeconds = 0; - - /** - * Timeout for stream pipelines to stop in seconds. - */ - @JsonProperty - public int streamTimeout = 240; - - /** - * If true, and in streaming mode, generate events only when they are due according to their - * timestamp. - */ - @JsonProperty - public boolean isRateLimited = false; - - /** - * If true, use wallclock time as event time. Otherwise, use a deterministic - * time in the past so that multiple runs will see exactly the same event streams - * and should thus have exactly the same results. - */ - @JsonProperty - public boolean useWallclockEventTime = false; - - /** Average idealized size of a 'new person' event, in bytes. */ - @JsonProperty - public int avgPersonByteSize = 200; - - /** Average idealized size of a 'new auction' event, in bytes. */ - @JsonProperty - public int avgAuctionByteSize = 500; - - /** Average idealized size of a 'bid' event, in bytes. */ - @JsonProperty - public int avgBidByteSize = 100; - - /** Ratio of bids to 'hot' auctions compared to all other auctions. */ - @JsonProperty - public int hotAuctionRatio = 2; - - /** Ratio of auctions for 'hot' sellers compared to all other people. */ - @JsonProperty - public int hotSellersRatio = 4; - - /** Ratio of bids for 'hot' bidders compared to all other people. */ - @JsonProperty - public int hotBiddersRatio = 4; - - /** Window size, in seconds, for queries 3, 5, 7 and 8. */ - @JsonProperty - public long windowSizeSec = 10; - - /** Sliding window period, in seconds, for query 5. */ - @JsonProperty - public long windowPeriodSec = 5; - - /** Number of seconds to hold back events according to their reported timestamp. */ - @JsonProperty - public long watermarkHoldbackSec = 0; - - /** Average number of auction which should be inflight at any time, per generator. 
*/ - @JsonProperty - public int numInFlightAuctions = 100; - - /** Maximum number of people to consider as active for placing auctions or bids. */ - @JsonProperty - public int numActivePeople = 1000; - - /** Coder strategy to follow. */ - @JsonProperty - public NexmarkUtils.CoderStrategy coderStrategy = NexmarkUtils.CoderStrategy.HAND; - - /** - * Delay, in milliseconds, for each event. This will peg one core for this number - * of milliseconds to simulate CPU-bound computation. - */ - @JsonProperty - public long cpuDelayMs = 0; - - /** - * Extra data, in bytes, to save to persistent state for each event. This will force - * i/o all the way to durable storage to simulate an I/O-bound computation. - */ - @JsonProperty - public long diskBusyBytes = 0; - - /** - * Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction. - */ - @JsonProperty - public int auctionSkip = 123; - - /** - * Fanout for queries 4 (groups by category id), 5 and 7 (find a global maximum). - */ - @JsonProperty - public int fanout = 5; - - /** - * Maximum waiting time to clean personState in query3 - * (ie maximum waiting of the auctions related to person in state in seconds in event time). - */ - @JsonProperty - public int maxAuctionsWaitingTime = 600; - - /** - * Length of occasional delay to impose on events (in seconds). - */ - @JsonProperty - public long occasionalDelaySec = 3; - - /** - * Probability that an event will be delayed by delayS. - */ - @JsonProperty - public double probDelayedEvent = 0.1; - - /** - * Maximum size of each log file (in events). For Query10 only. - */ - @JsonProperty - public int maxLogEvents = 100_000; - - /** - * If true, use pub/sub publish time instead of event time. - */ - @JsonProperty - public boolean usePubsubPublishTime = false; - - /** - * Number of events in out-of-order groups. 1 implies no out-of-order events. 1000 implies - * every 1000 events per generator are emitted in pseudo-random order. 
- */ - @JsonProperty - public long outOfOrderGroupSize = 1; - - /** - * Replace any properties of this configuration which have been supplied by the command line. - */ - public void overrideFromOptions(NexmarkOptions options) { - if (options.getDebug() != null) { - debug = options.getDebug(); - } - if (options.getQuery() != null) { - query = options.getQuery(); - } - if (options.getSourceType() != null) { - sourceType = options.getSourceType(); - } - if (options.getSinkType() != null) { - sinkType = options.getSinkType(); - } - if (options.getPubSubMode() != null) { - pubSubMode = options.getPubSubMode(); - } - if (options.getNumEvents() != null) { - numEvents = options.getNumEvents(); - } - if (options.getNumEventGenerators() != null) { - numEventGenerators = options.getNumEventGenerators(); - } - if (options.getRateShape() != null) { - rateShape = options.getRateShape(); - } - if (options.getFirstEventRate() != null) { - firstEventRate = options.getFirstEventRate(); - } - if (options.getNextEventRate() != null) { - nextEventRate = options.getNextEventRate(); - } - if (options.getRateUnit() != null) { - rateUnit = options.getRateUnit(); - } - if (options.getRatePeriodSec() != null) { - ratePeriodSec = options.getRatePeriodSec(); - } - if (options.getPreloadSeconds() != null) { - preloadSeconds = options.getPreloadSeconds(); - } - if (options.getStreamTimeout() != null) { - streamTimeout = options.getStreamTimeout(); - } - if (options.getIsRateLimited() != null) { - isRateLimited = options.getIsRateLimited(); - } - if (options.getUseWallclockEventTime() != null) { - useWallclockEventTime = options.getUseWallclockEventTime(); - } - if (options.getAvgPersonByteSize() != null) { - avgPersonByteSize = options.getAvgPersonByteSize(); - } - if (options.getAvgAuctionByteSize() != null) { - avgAuctionByteSize = options.getAvgAuctionByteSize(); - } - if (options.getAvgBidByteSize() != null) { - avgBidByteSize = options.getAvgBidByteSize(); - } - if 
(options.getHotAuctionRatio() != null) { - hotAuctionRatio = options.getHotAuctionRatio(); - } - if (options.getHotSellersRatio() != null) { - hotSellersRatio = options.getHotSellersRatio(); - } - if (options.getHotBiddersRatio() != null) { - hotBiddersRatio = options.getHotBiddersRatio(); - } - if (options.getWindowSizeSec() != null) { - windowSizeSec = options.getWindowSizeSec(); - } - if (options.getWindowPeriodSec() != null) { - windowPeriodSec = options.getWindowPeriodSec(); - } - if (options.getWatermarkHoldbackSec() != null) { - watermarkHoldbackSec = options.getWatermarkHoldbackSec(); - } - if (options.getNumInFlightAuctions() != null) { - numInFlightAuctions = options.getNumInFlightAuctions(); - } - if (options.getNumActivePeople() != null) { - numActivePeople = options.getNumActivePeople(); - } - if (options.getCoderStrategy() != null) { - coderStrategy = options.getCoderStrategy(); - } - if (options.getCpuDelayMs() != null) { - cpuDelayMs = options.getCpuDelayMs(); - } - if (options.getDiskBusyBytes() != null) { - diskBusyBytes = options.getDiskBusyBytes(); - } - if (options.getAuctionSkip() != null) { - auctionSkip = options.getAuctionSkip(); - } - if (options.getFanout() != null) { - fanout = options.getFanout(); - } - if (options.getMaxAuctionsWaitingTime() != null) { - fanout = options.getMaxAuctionsWaitingTime(); - } - if (options.getOccasionalDelaySec() != null) { - occasionalDelaySec = options.getOccasionalDelaySec(); - } - if (options.getProbDelayedEvent() != null) { - probDelayedEvent = options.getProbDelayedEvent(); - } - if (options.getMaxLogEvents() != null) { - maxLogEvents = options.getMaxLogEvents(); - } - if (options.getUsePubsubPublishTime() != null) { - usePubsubPublishTime = options.getUsePubsubPublishTime(); - } - if (options.getOutOfOrderGroupSize() != null) { - outOfOrderGroupSize = options.getOutOfOrderGroupSize(); - } - } - - /** - * Return copy of configuration with given label. 
- */ - public NexmarkConfiguration copy() { - NexmarkConfiguration result; - result = new NexmarkConfiguration(); - result.debug = debug; - result.query = query; - result.sourceType = sourceType; - result.sinkType = sinkType; - result.pubSubMode = pubSubMode; - result.numEvents = numEvents; - result.numEventGenerators = numEventGenerators; - result.rateShape = rateShape; - result.firstEventRate = firstEventRate; - result.nextEventRate = nextEventRate; - result.rateUnit = rateUnit; - result.ratePeriodSec = ratePeriodSec; - result.preloadSeconds = preloadSeconds; - result.streamTimeout = streamTimeout; - result.isRateLimited = isRateLimited; - result.useWallclockEventTime = useWallclockEventTime; - result.avgPersonByteSize = avgPersonByteSize; - result.avgAuctionByteSize = avgAuctionByteSize; - result.avgBidByteSize = avgBidByteSize; - result.hotAuctionRatio = hotAuctionRatio; - result.hotSellersRatio = hotSellersRatio; - result.hotBiddersRatio = hotBiddersRatio; - result.windowSizeSec = windowSizeSec; - result.windowPeriodSec = windowPeriodSec; - result.watermarkHoldbackSec = watermarkHoldbackSec; - result.numInFlightAuctions = numInFlightAuctions; - result.numActivePeople = numActivePeople; - result.coderStrategy = coderStrategy; - result.cpuDelayMs = cpuDelayMs; - result.diskBusyBytes = diskBusyBytes; - result.auctionSkip = auctionSkip; - result.fanout = fanout; - result.maxAuctionsWaitingTime = maxAuctionsWaitingTime; - result.occasionalDelaySec = occasionalDelaySec; - result.probDelayedEvent = probDelayedEvent; - result.maxLogEvents = maxLogEvents; - result.usePubsubPublishTime = usePubsubPublishTime; - result.outOfOrderGroupSize = outOfOrderGroupSize; - return result; - } - - /** - * Return short description of configuration (suitable for use in logging). We only render - * the core fields plus those which do not have default values. 
- */ - public String toShortString() { - StringBuilder sb = new StringBuilder(); - sb.append(String.format("query:%d", query)); - if (debug != DEFAULT.debug) { - sb.append(String.format("; debug:%s", debug)); - } - if (sourceType != DEFAULT.sourceType) { - sb.append(String.format("; sourceType:%s", sourceType)); - } - if (sinkType != DEFAULT.sinkType) { - sb.append(String.format("; sinkType:%s", sinkType)); - } - if (pubSubMode != DEFAULT.pubSubMode) { - sb.append(String.format("; pubSubMode:%s", pubSubMode)); - } - if (numEvents != DEFAULT.numEvents) { - sb.append(String.format("; numEvents:%d", numEvents)); - } - if (numEventGenerators != DEFAULT.numEventGenerators) { - sb.append(String.format("; numEventGenerators:%d", numEventGenerators)); - } - if (rateShape != DEFAULT.rateShape) { - sb.append(String.format("; rateShape:%s", rateShape)); - } - if (firstEventRate != DEFAULT.firstEventRate || nextEventRate != DEFAULT.nextEventRate) { - sb.append(String.format("; firstEventRate:%d", firstEventRate)); - sb.append(String.format("; nextEventRate:%d", nextEventRate)); - } - if (rateUnit != DEFAULT.rateUnit) { - sb.append(String.format("; rateUnit:%s", rateUnit)); - } - if (ratePeriodSec != DEFAULT.ratePeriodSec) { - sb.append(String.format("; ratePeriodSec:%d", ratePeriodSec)); - } - if (preloadSeconds != DEFAULT.preloadSeconds) { - sb.append(String.format("; preloadSeconds:%d", preloadSeconds)); - } - if (streamTimeout != DEFAULT.streamTimeout) { - sb.append(String.format("; streamTimeout:%d", streamTimeout)); - } - if (isRateLimited != DEFAULT.isRateLimited) { - sb.append(String.format("; isRateLimited:%s", isRateLimited)); - } - if (useWallclockEventTime != DEFAULT.useWallclockEventTime) { - sb.append(String.format("; useWallclockEventTime:%s", useWallclockEventTime)); - } - if (avgPersonByteSize != DEFAULT.avgPersonByteSize) { - sb.append(String.format("; avgPersonByteSize:%d", avgPersonByteSize)); - } - if (avgAuctionByteSize != DEFAULT.avgAuctionByteSize) { - 
sb.append(String.format("; avgAuctionByteSize:%d", avgAuctionByteSize)); - } - if (avgBidByteSize != DEFAULT.avgBidByteSize) { - sb.append(String.format("; avgBidByteSize:%d", avgBidByteSize)); - } - if (hotAuctionRatio != DEFAULT.hotAuctionRatio) { - sb.append(String.format("; hotAuctionRatio:%d", hotAuctionRatio)); - } - if (hotSellersRatio != DEFAULT.hotSellersRatio) { - sb.append(String.format("; hotSellersRatio:%d", hotSellersRatio)); - } - if (hotBiddersRatio != DEFAULT.hotBiddersRatio) { - sb.append(String.format("; hotBiddersRatio:%d", hotBiddersRatio)); - } - if (windowSizeSec != DEFAULT.windowSizeSec) { - sb.append(String.format("; windowSizeSec:%d", windowSizeSec)); - } - if (windowPeriodSec != DEFAULT.windowPeriodSec) { - sb.append(String.format("; windowPeriodSec:%d", windowPeriodSec)); - } - if (watermarkHoldbackSec != DEFAULT.watermarkHoldbackSec) { - sb.append(String.format("; watermarkHoldbackSec:%d", watermarkHoldbackSec)); - } - if (numInFlightAuctions != DEFAULT.numInFlightAuctions) { - sb.append(String.format("; numInFlightAuctions:%d", numInFlightAuctions)); - } - if (numActivePeople != DEFAULT.numActivePeople) { - sb.append(String.format("; numActivePeople:%d", numActivePeople)); - } - if (coderStrategy != DEFAULT.coderStrategy) { - sb.append(String.format("; coderStrategy:%s", coderStrategy)); - } - if (cpuDelayMs != DEFAULT.cpuDelayMs) { - sb.append(String.format("; cpuSlowdownMs:%d", cpuDelayMs)); - } - if (diskBusyBytes != DEFAULT.diskBusyBytes) { - sb.append(String.format("; diskBuysBytes:%d", diskBusyBytes)); - } - if (auctionSkip != DEFAULT.auctionSkip) { - sb.append(String.format("; auctionSkip:%d", auctionSkip)); - } - if (fanout != DEFAULT.fanout) { - sb.append(String.format("; fanout:%d", fanout)); - } - if (maxAuctionsWaitingTime != DEFAULT.maxAuctionsWaitingTime) { - sb.append(String.format("; maxAuctionsWaitingTime:%d", fanout)); - } - if (occasionalDelaySec != DEFAULT.occasionalDelaySec) { - sb.append(String.format("; 
occasionalDelaySec:%d", occasionalDelaySec)); - } - if (probDelayedEvent != DEFAULT.probDelayedEvent) { - sb.append(String.format("; probDelayedEvent:%f", probDelayedEvent)); - } - if (maxLogEvents != DEFAULT.maxLogEvents) { - sb.append(String.format("; maxLogEvents:%d", maxLogEvents)); - } - if (usePubsubPublishTime != DEFAULT.usePubsubPublishTime) { - sb.append(String.format("; usePubsubPublishTime:%s", usePubsubPublishTime)); - } - if (outOfOrderGroupSize != DEFAULT.outOfOrderGroupSize) { - sb.append(String.format("; outOfOrderGroupSize:%d", outOfOrderGroupSize)); - } - return sb.toString(); - } - - /** - * Return full description as a string. - */ - @Override - public String toString() { - try { - return NexmarkUtils.MAPPER.writeValueAsString(this); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - /** - * Parse an object from {@code string}. - */ - public static NexmarkConfiguration fromString(String string) { - try { - return NexmarkUtils.MAPPER.readValue(string, NexmarkConfiguration.class); - } catch (IOException e) { - throw new RuntimeException("Unable to parse nexmark configuration: ", e); - } - } - - @Override - public int hashCode() { - return Objects.hash( - debug, - query, - sourceType, - sinkType, - pubSubMode, - numEvents, - numEventGenerators, - rateShape, - firstEventRate, - nextEventRate, - rateUnit, - ratePeriodSec, - preloadSeconds, - streamTimeout, - isRateLimited, - useWallclockEventTime, - avgPersonByteSize, - avgAuctionByteSize, - avgBidByteSize, - hotAuctionRatio, - hotSellersRatio, - hotBiddersRatio, - windowSizeSec, - windowPeriodSec, - watermarkHoldbackSec, - numInFlightAuctions, - numActivePeople, - coderStrategy, - cpuDelayMs, - diskBusyBytes, - auctionSkip, - fanout, - maxAuctionsWaitingTime, - occasionalDelaySec, - probDelayedEvent, - maxLogEvents, - usePubsubPublishTime, - outOfOrderGroupSize); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if 
(obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - NexmarkConfiguration other = (NexmarkConfiguration) obj; - if (debug != other.debug) { - return false; - } - if (auctionSkip != other.auctionSkip) { - return false; - } - if (avgAuctionByteSize != other.avgAuctionByteSize) { - return false; - } - if (avgBidByteSize != other.avgBidByteSize) { - return false; - } - if (avgPersonByteSize != other.avgPersonByteSize) { - return false; - } - if (coderStrategy != other.coderStrategy) { - return false; - } - if (cpuDelayMs != other.cpuDelayMs) { - return false; - } - if (diskBusyBytes != other.diskBusyBytes) { - return false; - } - if (fanout != other.fanout) { - return false; - } - if (maxAuctionsWaitingTime != other.maxAuctionsWaitingTime) { - return false; - } - if (firstEventRate != other.firstEventRate) { - return false; - } - if (hotAuctionRatio != other.hotAuctionRatio) { - return false; - } - if (hotBiddersRatio != other.hotBiddersRatio) { - return false; - } - if (hotSellersRatio != other.hotSellersRatio) { - return false; - } - if (isRateLimited != other.isRateLimited) { - return false; - } - if (maxLogEvents != other.maxLogEvents) { - return false; - } - if (nextEventRate != other.nextEventRate) { - return false; - } - if (rateUnit != other.rateUnit) { - return false; - } - if (numEventGenerators != other.numEventGenerators) { - return false; - } - if (numEvents != other.numEvents) { - return false; - } - if (numInFlightAuctions != other.numInFlightAuctions) { - return false; - } - if (numActivePeople != other.numActivePeople) { - return false; - } - if (occasionalDelaySec != other.occasionalDelaySec) { - return false; - } - if (preloadSeconds != other.preloadSeconds) { - return false; - } - if (streamTimeout != other.streamTimeout) { - return false; - } - if (Double.doubleToLongBits(probDelayedEvent) - != Double.doubleToLongBits(other.probDelayedEvent)) { - return false; - } - if (pubSubMode != other.pubSubMode) { - 
return false; - } - if (ratePeriodSec != other.ratePeriodSec) { - return false; - } - if (rateShape != other.rateShape) { - return false; - } - if (query != other.query) { - return false; - } - if (sinkType != other.sinkType) { - return false; - } - if (sourceType != other.sourceType) { - return false; - } - if (useWallclockEventTime != other.useWallclockEventTime) { - return false; - } - if (watermarkHoldbackSec != other.watermarkHoldbackSec) { - return false; - } - if (windowPeriodSec != other.windowPeriodSec) { - return false; - } - if (windowSizeSec != other.windowSizeSec) { - return false; - } - if (usePubsubPublishTime != other.usePubsubPublishTime) { - return false; - } - if (outOfOrderGroupSize != other.outOfOrderGroupSize) { - return false; - } - return true; - } -}