This is an automated email from the ASF dual-hosted git repository. chamikara pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ecf9ee6 [BEAM-3213] MongodbIO performance test (#4859) ecf9ee6 is described below commit ecf9ee6ac336398530954456fcbd6b9baa0239a1 Author: Łukasz Gajowy <lukasz.gaj...@gmail.com> AuthorDate: Mon Apr 2 07:57:00 2018 +0200 [BEAM-3213] MongodbIO performance test (#4859) * [BEAM-3213] Add MongoDBIOIT * [BEAM-3213] Add perfkit support for mongodb. Enable dataflow runner * [BEAM-3213] Add hashes for larger datasets * [BEAM-3213] Add jenkins job for MongoDBIOIT * [BEAM-3213] Refactor: remove test code duplication --- .../job_beam_PerformanceTests_MongoDBIO_IT.groovy | 66 +++++++ .../kubernetes/mongodb/load-balancer/mongo.yml | 49 +++++ .../mongodb/load-balancer/pkb-config.yml | 32 ++++ .test-infra/kubernetes/mongodb/node-port/mongo.yml | 50 +++++ .../kubernetes/mongodb/node-port/pkb-config.yml | 30 +++ .../org/apache/beam/sdk/io/common/IOITHelper.java | 39 ++++ .../beam/sdk/io/common/IOTestPipelineOptions.java | 19 ++ .../org/apache/beam/sdk/io/common/TestRow.java | 8 +- .../beam/sdk/io/common/FileBasedIOITHelper.java | 13 +- .../java/org/apache/beam/sdk/io/xml/XmlIOIT.java | 2 +- sdks/java/io/mongodb/build.gradle | 11 ++ sdks/java/io/mongodb/pom.xml | 206 ++++++++++++++++++++- .../apache/beam/sdk/io/mongodb/MongoDBIOIT.java | 143 ++++++++++++++ 13 files changed, 651 insertions(+), 17 deletions(-) diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy new file mode 100644 index 0000000..1cd469c --- /dev/null +++ b/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import common_job_properties + +job('beam_PerformanceTests_MongoDBIO_IT') { + // Set default Beam job properties. + common_job_properties.setTopLevelMainJobProperties(delegate) + + // Run job in postcommit every 6 hours, don't trigger every push, and + // don't email individual committers. + common_job_properties.setPostCommit( + delegate, + '0 */6 * * *', + false, + 'commits@beam.apache.org', + false) + + common_job_properties.enablePhraseTriggeringFromPullRequest( + delegate, + 'Java MongoDBIO Performance Test', + 'Run Java MongoDBIO Performance Test') + + def pipelineOptions = [ + tempRoot : 'gs://temp-storage-for-perf-tests', + project : 'apache-beam-testing', + numberOfRecords: '10000000' + ] + + String namespace = common_job_properties.getKubernetesNamespace('mongodbioit') + String kubeconfig = common_job_properties.getKubeconfigLocationForNamespace(namespace) + + def testArgs = [ + kubeconfig : kubeconfig, + beam_it_timeout : '1800', + benchmarks : 'beam_integration_benchmark', + beam_it_profile : 'io-it', + beam_prebuilt : 'true', + beam_sdk : 'java', + beam_it_module : 'sdks/java/io/mongodb', + beam_it_class : 'org.apache.beam.sdk.io.mongodb.MongoDBIOIT', + beam_it_options : common_job_properties.joinPipelineOptions(pipelineOptions), + beam_kubernetes_scripts : common_job_properties.makePathAbsolute('src/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml'), + 
beam_options_config_file: common_job_properties.makePathAbsolute('src/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml'), + bigquery_table : 'beam_performance.mongodbioit_pkb_results' + ] + + common_job_properties.setupKubernetes(delegate, namespace, kubeconfig) + common_job_properties.buildPerformanceTest(delegate, testArgs) + common_job_properties.cleanupKubernetes(delegate, namespace, kubeconfig) +} diff --git a/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml b/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml new file mode 100644 index 0000000..70e1965 --- /dev/null +++ b/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: v1 +kind: Service +metadata: + name: mongo-load-balancer-service + labels: + name: mongo +spec: + ports: + - port: 27017 + selector: + name: mongo + type: LoadBalancer + +--- + +apiVersion: v1 +kind: ReplicationController +metadata: + name: mongo +spec: + replicas: 1 + selector: + name: mongo + template: + metadata: + name: mongo + labels: + name: mongo + spec: + containers: + - name: mongo + image: mongo + ports: + - containerPort: 27017 diff --git a/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml b/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml new file mode 100644 index 0000000..299de0d --- /dev/null +++ b/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file is a pkb benchmark configuration file, used when running the IO ITs +# that use this data store. It allows users to run tests when they are on a +# separate network from the kubernetes cluster by reading the mongo IP +# address from the LoadBalancer service. +# +# This file defines pipeline options to pass to beam, as well as how to derive +# the values for those pipeline options from kubernetes (where appropriate.) 
+ +static_pipeline_options: + - mongoDBDatabaseName: beam + - mongoDBPort: 27017 +dynamic_pipeline_options: + - name: mongoDBHostName + type: LoadBalancerIp + serviceName: mongo-load-balancer-service diff --git a/.test-infra/kubernetes/mongodb/node-port/mongo.yml b/.test-infra/kubernetes/mongodb/node-port/mongo.yml new file mode 100644 index 0000000..d640687 --- /dev/null +++ b/.test-infra/kubernetes/mongodb/node-port/mongo.yml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: v1 +kind: Service +metadata: + name: mongo-node-port-service + labels: + name: mongo +spec: + ports: + - port: 27017 + nodePort: 31235 + selector: + name: mongo + type: NodePort + +--- + +apiVersion: v1 +kind: ReplicationController +metadata: + name: mongo +spec: + replicas: 1 + selector: + name: mongo + template: + metadata: + name: mongo + labels: + name: mongo + spec: + containers: + - name: mongo + image: mongo + ports: + - containerPort: 27017 diff --git a/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml b/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml new file mode 100644 index 0000000..1436b75 --- /dev/null +++ b/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file is a pkb benchmark configuration file, used when running the IO ITs +# that use this data store. +# +# This file defines pipeline options to pass to beam, as well as how to derive +# the values for those pipeline options from kubernetes (where appropriate.) 
+ +static_pipeline_options: + - mongoDBDatabaseName: beam + - mongoDBPort: 27017 +dynamic_pipeline_options: + - name: mongoDBHostName + type: NodePortIp + podLabel: name=mongo diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java new file mode 100644 index 0000000..819bc9a --- /dev/null +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.common; + +import java.util.Map; + +/** + * Methods common to all types of IOITs. 
+ */ +public class IOITHelper { + + private IOITHelper() { + } + + public static String getHashForRecordCount(int recordCount, Map<Integer, String> hashes) { + String hash = hashes.get(recordCount); + if (hash == null) { + throw new UnsupportedOperationException( + String.format("No hash for that record count: %s", recordCount) + ); + } + return hash; + } +} diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index 89b7ae8..1260167 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -114,4 +114,23 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { String getCharset(); void setCharset(String charset); + + /* MongoDB */ + @Description("MongoDB host (host name/ip address)") + @Default.String("mongodb-host") + String getMongoDBHostName(); + + void setMongoDBHostName(String host); + + @Description("Port for MongoDB") + @Default.Integer(27017) + Integer getMongoDBPort(); + + void setMongoDBPort(Integer port); + + @Description("Mongo database name") + @Default.String("beam") + String getMongoDBDatabaseName(); + + void setMongoDBDatabaseName(String name); } diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java index e6bc7e8..4465456 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.common; +import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount; + import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableMap; import 
java.io.Serializable; @@ -108,10 +110,6 @@ public abstract class TestRow implements Serializable, Comparable<TestRow> { */ public static String getExpectedHashForRowCount(int rowCount) throws UnsupportedOperationException { - String hash = EXPECTED_HASHES.get(rowCount); - if (hash == null) { - throw new UnsupportedOperationException("No hash for that row count"); - } - return hash; + return getHashForRecordCount(rowCount, EXPECTED_HASHES); } } diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java index bbf707e..5b7ff38 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.common; +import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount; + import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import java.io.IOException; @@ -33,6 +35,7 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; + /** * Contains helper methods for file based IO Integration tests. */ @@ -64,16 +67,6 @@ public class FileBasedIOITHelper { return getHashForRecordCount(lineCount, expectedHashes); } - public static String getHashForRecordCount(int recordCount, Map<Integer, String> hashes) { - String hash = hashes.get(recordCount); - if (hash == null) { - throw new UnsupportedOperationException( - String.format("No hash for that record count: %s", recordCount) - ); - } - return hash; - } - /** * Constructs text lines in files used for testing. 
*/ diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java index 7176d7f..3ea27aa 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.io.xml; import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix; -import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.getHashForRecordCount; +import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount; import com.google.common.collect.ImmutableMap; import java.io.Serializable; diff --git a/sdks/java/io/mongodb/build.gradle b/sdks/java/io/mongodb/build.gradle index 9223780..6e82f35 100644 --- a/sdks/java/io/mongodb/build.gradle +++ b/sdks/java/io/mongodb/build.gradle @@ -21,6 +21,15 @@ applyJavaNature(artifactId: "beam-sdks-java-io-mongodb") description = "Apache Beam :: SDKs :: Java :: IO :: MongoDB" +/* + * We need to rely on manually specifying these evaluationDependsOn to ensure that + * the following projects are evaluated before we evaluate this project. This is because + * we are attempting to reference the "sourceSets.test.output" directly. + * TODO: Swap to generating test artifacts which we can then rely on instead of + * the test outputs directly. 
+ */ +evaluationDependsOn(":sdks:java:io:common") + dependencies { compile library.java.guava shadow project(path: ":sdks:java:core", configuration: "shadow") @@ -32,6 +41,8 @@ dependencies { testCompile library.java.junit testCompile library.java.slf4j_jdk14 testCompile library.java.hamcrest_core + testCompile project(path: ":sdks:java:io:common", configuration: "shadow") + testCompile project(":sdks:java:io:common").sourceSets.test.output testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:1.50.1" testCompile "de.flapdoodle.embed:de.flapdoodle.embed.process:1.50.1" } diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml index ad811b5..2f237cb 100644 --- a/sdks/java/io/mongodb/pom.xml +++ b/sdks/java/io/mongodb/pom.xml @@ -34,6 +34,199 @@ <mongo-java-driver.version>3.2.2</mongo-java-driver.version> </properties> + + <profiles> + <!-- + This profile invokes PerfKitBenchmarker, which does benchmarking of + the IO ITs. The arguments passed to it allow it to invoke mvn again + with the desired benchmark. 
+ + To invoke this, run: + + mvn verify -Dio-it-suite -pl sdks/java/io/mongodb + -DpkbLocation="path-to-pkb.py" \ + -DintegrationTestPipelineOptions='["-numberOfRecords=1000"]' + --> + <profile> + <id>io-it-suite</id> + <activation> + <property><name>io-it-suite</name></property> + </activation> + <properties> + <!-- This is based on the location of the current pom relative to the root + See discussion in BEAM-2460 --> + <beamRootProjectDir>${project.parent.parent.parent.parent.basedir}</beamRootProjectDir> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.gmaven</groupId> + <artifactId>groovy-maven-plugin</artifactId> + <version>${groovy-maven-plugin.version}</version> + <executions> + <execution> + <id>find-supported-python-for-compile</id> + <phase>initialize</phase> + <goals> + <goal>execute</goal> + </goals> + <configuration> + <source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>${maven-exec-plugin.version}</version> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>${python.interpreter.bin}</executable> + <arguments> + <argument>${pkbLocation}</argument> + <argument>-beam_it_timeout=1800</argument> + <argument>-benchmarks=beam_integration_benchmark</argument> + <argument>-beam_it_profile=io-it</argument> + <argument>-beam_location=${beamRootProjectDir}</argument> + <argument>-beam_prebuilt=true</argument> + <argument>-beam_sdk=java</argument> + <argument>-kubeconfig=${kubeconfig}</argument> + <argument>-kubectl=${kubectl}</argument> + <!-- runner overrides, controlled via forceDirectRunner --> + <argument>${pkbBeamRunnerProfile}</argument> + <argument>${pkbBeamRunnerOption}</argument> + <!-- specific to this IO --> + 
<argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml</argument> + <argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/node-port/mongo.yml</argument> + <argument>-beam_it_module=sdks/java/io/mongodb</argument> + <argument>-beam_it_class=org.apache.beam.sdk.io.mongodb.MongoDBIOIT</argument> + <!-- arguments typically defined by user --> + <argument>-beam_it_options=${integrationTestPipelineOptions}</argument> + </arguments> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + </plugins> + </build> + </profile> + + <!-- + io-it-suite-local overrides part of io-it-suite, allowing users to run tests + when they are on a separate network from the kubernetes cluster by + creating a LoadBalancer service. + --> + <profile> + <id>io-it-suite-local</id> + <activation><property><name>io-it-suite-local</name></property></activation> + <properties> + <!-- This is based on the location of the current pom relative to the root + See discussion in BEAM-2460 --> + <beamRootProjectDir>${project.parent.parent.parent.parent.basedir}</beamRootProjectDir> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.gmaven</groupId> + <artifactId>groovy-maven-plugin</artifactId> + <version>${groovy-maven-plugin.version}</version> + <executions> + <execution> + <id>find-supported-python-for-compile</id> + <phase>initialize</phase> + <goals> + <goal>execute</goal> + </goals> + <configuration> + <source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>${maven-exec-plugin.version}</version> + <executions> + <execution> + 
<phase>verify</phase> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>${python.interpreter.bin}</executable> + <arguments> + <argument>${pkbLocation}</argument> + <argument>-beam_it_timeout=1800</argument> + <argument>-benchmarks=beam_integration_benchmark</argument> + <argument>-beam_it_profile=io-it</argument> + <argument>-beam_location=${beamRootProjectDir}</argument> + <argument>-beam_prebuilt=true</argument> + <argument>-beam_sdk=java</argument> + <argument>-kubeconfig=${kubeconfig}</argument> + <argument>-kubectl=${kubectl}</argument> + <!-- runner overrides, controlled via forceDirectRunner --> + <argument>${pkbBeamRunnerProfile}</argument> + <argument>${pkbBeamRunnerOption}</argument> + <!-- specific to this IO --> + <argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml</argument> + <argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml</argument> + <argument>-beam_it_module=sdks/java/io/mongodb</argument> + <argument>-beam_it_class=org.apache.beam.sdk.io.mongodb.MongoDBIOIT</argument> + <!-- arguments typically defined by user --> + <argument>-beam_it_options=${integrationTestPipelineOptions}</argument> + </arguments> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + </plugins> + </build> + </profile> + + <!-- Include the Google Cloud Dataflow runner activated by -DintegrationTestRunner=dataflow --> + <profile> + <id>dataflow-runner</id> + <activation> + <property> + <name>integrationTestRunner</name> + <value>dataflow</value> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> + <scope>runtime</scope> + 
</dependency> + </dependencies> + </profile> + </profiles> + <dependencies> <dependency> <groupId>org.apache.beam</groupId> @@ -108,7 +301,18 @@ <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> <scope>test</scope> - </dependency> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-common</artifactId> + <scope>test</scope> + <classifier>tests</classifier> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-common</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java new file mode 100644 index 0000000..4f425a3 --- /dev/null +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.mongodb; + +import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount; + +import com.google.common.collect.ImmutableMap; +import com.mongodb.MongoClient; +import java.util.Date; +import java.util.Map; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.HashingFn; +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PCollection; +import org.bson.Document; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + + +/** + * A test of {@link org.apache.beam.sdk.io.mongodb.MongoDbIO} on an independent Mongo instance. + * + * <p>This test requires a running instance of MongoDB.
Pass in connection information using + * PipelineOptions: + * <pre> + * mvn -e -Pio-it verify -pl sdks/java/io/mongodb -DintegrationTestPipelineOptions='[ + * "--mongoDBHostName=1.2.3.4", + * "--mongoDBPort=27017", + * "--mongoDBDatabaseName=beam", + * "--numberOfRecords=1000" ]' + * </pre> + * + */ +@RunWith(JUnit4.class) +public class MongoDBIOIT { + + private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of( + 1000, "75a0d5803418444e76ae5b421662764c", + 100_000, "3bc762dc1c291904e3c7f577774c6276", + 10_000_000, "e5e0503902018c83e8c8977ef437feba" + ); + + private static int numberOfRecords; + + private static String host; + + private static Integer port; + + private static String database; + + private static String collection; + + @Rule + public final TestPipeline writePipeline = TestPipeline.create(); + + @Rule + public final TestPipeline readPipeline = TestPipeline.create(); + + /** + * Reads the MongoDB connection settings and record count from the pipeline options. The + * collection name is timestamped so that every run writes into a fresh collection. + */ + @BeforeClass + public static void setUp() { + PipelineOptionsFactory.register(IOTestPipelineOptions.class); + IOTestPipelineOptions options = TestPipeline.testingPipelineOptions() + .as(IOTestPipelineOptions.class); + + numberOfRecords = options.getNumberOfRecords(); + host = options.getMongoDBHostName(); + port = options.getMongoDBPort(); + database = options.getMongoDBDatabaseName(); + collection = String.format("test_%s", new Date().getTime()); + } + + /** + * Drops the configured database after each test. Connects on the configured port (the same + * one the pipelines use) rather than the driver's default, and always closes the client so + * its connection pool is not leaked. 
+ */ + @After + public void tearDown() { + MongoClient client = new MongoClient(host, port); + try { + client.getDatabase(database).drop(); + } finally { + client.close(); + } + } + + /** + * Writes {@code numberOfRecords} generated documents to MongoDB, reads them back, and + * compares a hash of their contents with a precomputed expected value. + */ + @Test + public void testWriteAndRead() { + String mongoUrl = String.format("mongodb://%s:%s", host, port); + + writePipeline + .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRecords)) + .apply("Produce documents", MapElements.via(new LongToDocumentFn())) + .apply("Write documents to MongoDB", MongoDbIO.write() + .withUri(mongoUrl) + .withDatabase(database) + .withCollection(collection)); + + writePipeline.run().waitUntilFinish(); + + PCollection<String> consolidatedHashcode = readPipeline + .apply("Read
all documents", MongoDbIO.read() + .withUri(mongoUrl) + .withDatabase(database) + .withCollection(collection)) + .apply("Map documents to Strings", MapElements.via(new DocumentToStringFn())) + .apply("Calculate hashcode", Combine.globally(new HashingFn())); + + String expectedHash = getHashForRecordCount(numberOfRecords, EXPECTED_HASHES); + PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash); + + readPipeline.run().waitUntilFinish(); + } + + private static class LongToDocumentFn extends SimpleFunction<Long, Document> { + @Override + public Document apply(Long input) { + return Document.parse(String.format("{\"scientist\":\"Test %s\"}", input)); + } + } + + private static class DocumentToStringFn extends SimpleFunction<Document, String> { + @Override + public String apply(Document input) { + return input.getString("scientist"); + } + } +} -- To stop receiving notification emails like this one, please contact chamik...@apache.org.