[ 
https://issues.apache.org/jira/browse/BEAM-3213?focusedWorklogId=86483&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86483
 ]

ASF GitHub Bot logged work on BEAM-3213:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Apr/18 05:57
            Start Date: 02/Apr/18 05:57
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #4859: [BEAM-3213] 
MongodbIO performance test
URL: https://github.com/apache/beam/pull/4859
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy 
b/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy
new file mode 100644
index 00000000000..1cd469c3320
--- /dev/null
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import common_job_properties
+
+// Jenkins job that runs the MongoDBIOIT integration/performance test
+// (org.apache.beam.sdk.io.mongodb.MongoDBIOIT) against a MongoDB instance that
+// is deployed into a per-job Kubernetes namespace, and publishes the results
+// to the BigQuery table named in testArgs below.
+job('beam_PerformanceTests_MongoDBIO_IT') {
+    // Set default Beam job properties.
+    common_job_properties.setTopLevelMainJobProperties(delegate)
+
+    // Run job in postcommit every 6 hours, don't trigger every push, and
+    // don't email individual committers.
+    common_job_properties.setPostCommit(
+            delegate,
+            '0 */6 * * *',
+            false,
+            'commits@beam.apache.org',
+            false)
+
+    // Also allow the job to be started from a pull request by commenting
+    // 'Run Java MongoDBIO Performance Test'.
+    common_job_properties.enablePhraseTriggeringFromPullRequest(
+            delegate,
+            'Java MongoDBIO Performance Test',
+            'Run Java MongoDBIO Performance Test')
+
+    // Pipeline options handed to the test (joined into beam_it_options below).
+    // NOTE: numberOfRecords must be a record count that MongoDBIOIT has a
+    // precomputed hash for (1000, 100000 or 10000000); any other value makes the
+    // test fail with UnsupportedOperationException.
+    def pipelineOptions = [
+            tempRoot       : 'gs://temp-storage-for-perf-tests',
+            project        : 'apache-beam-testing',
+            numberOfRecords: '10000000'
+    ]
+
+    // Dedicated Kubernetes namespace (and matching kubeconfig) for this run, so
+    // the MongoDB deployment can be set up and cleaned up independently.
+    String namespace = 
common_job_properties.getKubernetesNamespace('mongodbioit')
+    String kubeconfig = 
common_job_properties.getKubeconfigLocationForNamespace(namespace)
+
+    // Benchmark arguments. The load-balancer variants of the Kubernetes script
+    // and pkb config are used, so the MongoDB host name is taken from the IP of
+    // the mongo-load-balancer-service LoadBalancer service.
+    def testArgs = [
+            kubeconfig              : kubeconfig,
+            beam_it_timeout         : '1800',
+            benchmarks              : 'beam_integration_benchmark',
+            beam_it_profile         : 'io-it',
+            beam_prebuilt           : 'true',
+            beam_sdk                : 'java',
+            beam_it_module          : 'sdks/java/io/mongodb',
+            beam_it_class           : 
'org.apache.beam.sdk.io.mongodb.MongoDBIOIT',
+            beam_it_options         : 
common_job_properties.joinPipelineOptions(pipelineOptions),
+            beam_kubernetes_scripts : 
common_job_properties.makePathAbsolute('src/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml'),
+            beam_options_config_file: 
common_job_properties.makePathAbsolute('src/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml'),
+            bigquery_table          : 
'beam_performance.mongodbioit_pkb_results'
+    ]
+
+    // Set up the Kubernetes namespace, run the performance test, then clean up
+    // the namespace again.
+    common_job_properties.setupKubernetes(delegate, namespace, kubeconfig)
+    common_job_properties.buildPerformanceTest(delegate, testArgs)
+    common_job_properties.cleanupKubernetes(delegate, namespace, kubeconfig)
+}
diff --git a/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml 
b/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml
new file mode 100644
index 00000000000..70e19651161
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml
@@ -0,0 +1,49 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+apiVersion: v1
+kind: Service
+metadata:
+  name: mongo-load-balancer-service
+  labels:
+    name: mongo
+spec:
+  ports:
+    - port: 27017
+  selector:
+    name: mongo
+  type: LoadBalancer
+
+---
+
+apiVersion: v1
+kind: ReplicationController
+metadata:
+  name: mongo
+spec:
+  replicas: 1
+  selector:
+    name: mongo
+  template:
+    metadata:
+      name: mongo
+      labels:
+        name: mongo
+    spec:
+      containers:
+        - name: mongo
+          image: mongo
+          ports:
+            - containerPort: 27017
diff --git a/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml 
b/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml
new file mode 100644
index 00000000000..299de0d5199
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is a pkb benchmark configuration file, used when running the IO ITs
+# that use this data store. It allows users to run tests when they are on a
+# separate network from the kubernetes cluster by reading the mongo IP
+# address from the LoadBalancer service.
+#
+# This file defines pipeline options to pass to beam, as well as how to derive
+# the values for those pipeline options from kubernetes (where appropriate.)
+
+static_pipeline_options:
+  - mongoDBDatabaseName: beam
+  - mongoDBPort: 27017
+dynamic_pipeline_options:
+  - name: mongoDBHostName
+    type: LoadBalancerIp
+    serviceName: mongo-load-balancer-service
diff --git a/.test-infra/kubernetes/mongodb/node-port/mongo.yml 
b/.test-infra/kubernetes/mongodb/node-port/mongo.yml
new file mode 100644
index 00000000000..d6406873d12
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/node-port/mongo.yml
@@ -0,0 +1,50 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+apiVersion: v1
+kind: Service
+metadata:
+  name: mongo-node-port-service
+  labels:
+    name: mongo
+spec:
+  ports:
+    - port: 27017
+      nodePort: 31235
+  selector:
+    name: mongo
+  type: NodePort
+
+---
+
+apiVersion: v1
+kind: ReplicationController
+metadata:
+  name: mongo
+spec:
+  replicas: 1
+  selector:
+    name: mongo
+  template:
+    metadata:
+      name: mongo
+      labels:
+        name: mongo
+    spec:
+      containers:
+        - name: mongo
+          image: mongo
+          ports:
+            - containerPort: 27017
diff --git a/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml 
b/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml
new file mode 100644
index 00000000000..1436b752ed9
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is a pkb benchmark configuration file, used when running the IO ITs
+# that use this data store.
+#
+# This file defines pipeline options to pass to beam, as well as how to derive
+# the values for those pipeline options from kubernetes (where appropriate.)
+
+static_pipeline_options:
+  - mongoDBDatabaseName: beam
+  - mongoDBPort: 27017
+dynamic_pipeline_options:
+  - name: mongoDBHostName
+    type: NodePortIp
+    podLabel: name=mongo
diff --git 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
new file mode 100644
index 00000000000..819bc9a171c
--- /dev/null
+++ 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import java.util.Map;
+
+/**
+ * Helpers shared by all IO integration tests (IOITs), independent of the IO under test.
+ */
+public class IOITHelper {
+
+  /** Static utility holder; never instantiated. */
+  private IOITHelper() {
+  }
+
+  /**
+   * Returns the precomputed hash that a test run of the given size is expected to produce.
+   *
+   * @param recordCount number of records the test wrote
+   * @param hashes known hashes, keyed by record count
+   * @return the hash registered in {@code hashes} for {@code recordCount}
+   * @throws UnsupportedOperationException if {@code hashes} has no non-null hash for
+   *     {@code recordCount}
+   */
+  public static String getHashForRecordCount(int recordCount, Map<Integer, String> hashes) {
+    String expectedHash = hashes.get(recordCount);
+    if (expectedHash != null) {
+      return expectedHash;
+    }
+    String message = String.format("No hash for that record count: %s", recordCount);
+    throw new UnsupportedOperationException(message);
+  }
+}
diff --git 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 89b7ae81bc5..1260167eca1 100644
--- 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -114,4 +114,23 @@
   String getCharset();
 
   void setCharset(String charset);
+
+  /* MongoDB */
+  @Description("MongoDB host (host name/ip address)")
+  @Default.String("mongodb-host")
+  String getMongoDBHostName();
+
+  void setMongoDBHostName(String host);
+
+  @Description("Port for MongoDB")
+  @Default.Integer(27017)
+  Integer getMongoDBPort();
+
+  void setMongoDBPort(Integer port);
+
+  @Description("Mongo database name")
+  @Default.String("beam")
+  String getMongoDBDatabaseName();
+
+  void setMongoDBDatabaseName(String name);
 }
diff --git 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
index e6bc7e8c4f7..44654567783 100644
--- 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
+++ 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.common;
 
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
@@ -108,10 +110,6 @@ public void processElement(ProcessContext c) {
    */
   public static String getExpectedHashForRowCount(int rowCount)
       throws UnsupportedOperationException {
-    String hash = EXPECTED_HASHES.get(rowCount);
-    if (hash == null) {
-      throw new UnsupportedOperationException("No hash for that row count");
-    }
-    return hash;
+    return getHashForRecordCount(rowCount, EXPECTED_HASHES);
   }
 }
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index bbf707e26dd..5b7ff38ee34 100644
--- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.common;
 
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
@@ -33,6 +35,7 @@
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 
+
 /**
  * Contains helper methods for file based IO Integration tests.
  */
@@ -64,16 +67,6 @@ public static String getExpectedHashForLineCount(int 
lineCount) {
     return getHashForRecordCount(lineCount, expectedHashes);
   }
 
-  public static String getHashForRecordCount(int recordCount, Map<Integer, 
String> hashes) {
-    String hash = hashes.get(recordCount);
-    if (hash == null) {
-      throw new UnsupportedOperationException(
-        String.format("No hash for that record count: %s", recordCount)
-      );
-    }
-    return hash;
-  }
-
   /**
    * Constructs text lines in files used for testing.
    */
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
index 7176d7f42da..3ea27aa88c9 100644
--- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.io.xml;
 
 import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
-import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getHashForRecordCount;
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
 
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
diff --git a/sdks/java/io/mongodb/build.gradle 
b/sdks/java/io/mongodb/build.gradle
index 22cb5994c62..65c49dde684 100644
--- a/sdks/java/io/mongodb/build.gradle
+++ b/sdks/java/io/mongodb/build.gradle
@@ -21,6 +21,15 @@ applyJavaNature()
 
 description = "Apache Beam :: SDKs :: Java :: IO :: MongoDB"
 
+/*
+ * We need to rely on manually specifying these evaluationDependsOn to ensure 
that
+ * the following projects are evaluated before we evaluate this project. This 
is because
+ * we are attempting to reference the "sourceSets.test.output" directly.
+ * TODO: Swap to generating test artifacts which we can then rely on instead of
+ * the test outputs directly.
+ */
+evaluationDependsOn(":sdks:java:io:common")
+
 dependencies {
   compile library.java.guava
   shadow project(path: ":sdks:java:core", configuration: "shadow")
@@ -32,6 +41,8 @@ dependencies {
   testCompile library.java.junit
   testCompile library.java.slf4j_jdk14
   testCompile library.java.hamcrest_core
+  testCompile project(path: ":sdks:java:io:common", configuration: "shadow")
+  testCompile project(":sdks:java:io:common").sourceSets.test.output
   testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:1.50.1"
   testCompile "de.flapdoodle.embed:de.flapdoodle.embed.process:1.50.1"
 }
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index ad811b56633..2f237cb18a3 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -34,6 +34,199 @@
     <mongo-java-driver.version>3.2.2</mongo-java-driver.version>
   </properties>
 
+
+  <profiles>
+    <!--
+      This profile invokes PerfKitBenchmarker, which does benchmarking of
+      the IO ITs. The arguments passed to it allow it to invoke mvn again
+      with the desired benchmark.
+
+      To invoke this, run:
+
+      mvn verify -Dio-it-suite -pl sdks/java/io/mongodb \
+          -DpkbLocation="path-to-pkb.py" \
+          -DintegrationTestPipelineOptions='["--numberOfRecords=1000"]'
+  -->
+    <profile>
+      <id>io-it-suite</id>
+      <activation>
+        <property><name>io-it-suite</name></property>
+      </activation>
+      <properties>
+        <!-- This is based on the location of the current pom relative to the 
root
+             See discussion in BEAM-2460 -->
+        
<beamRootProjectDir>${project.parent.parent.parent.parent.basedir}</beamRootProjectDir>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.gmaven</groupId>
+            <artifactId>groovy-maven-plugin</artifactId>
+            <version>${groovy-maven-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>find-supported-python-for-compile</id>
+                <phase>initialize</phase>
+                <goals>
+                  <goal>execute</goal>
+                </goals>
+                <configuration>
+                  
<source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <version>${maven-exec-plugin.version}</version>
+            <executions>
+              <execution>
+                <phase>verify</phase>
+                <goals>
+                  <goal>exec</goal>
+                </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <executable>${python.interpreter.bin}</executable>
+              <arguments>
+                <argument>${pkbLocation}</argument>
+                <argument>-beam_it_timeout=1800</argument>
+                <argument>-benchmarks=beam_integration_benchmark</argument>
+                <argument>-beam_it_profile=io-it</argument>
+                <argument>-beam_location=${beamRootProjectDir}</argument>
+                <argument>-beam_prebuilt=true</argument>
+                <argument>-beam_sdk=java</argument>
+                <argument>-kubeconfig=${kubeconfig}</argument>
+                <argument>-kubectl=${kubectl}</argument>
+                <!-- runner overrides, controlled via forceDirectRunner -->
+                <argument>${pkbBeamRunnerProfile}</argument>
+                <argument>${pkbBeamRunnerOption}</argument>
+                <!-- specific to this IO -->
+                
<argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml</argument>
+                
<argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/node-port/mongo.yml</argument>
+                <argument>-beam_it_module=sdks/java/io/mongodb</argument>
+                
<argument>-beam_it_class=org.apache.beam.sdk.io.mongodb.MongoDBIOIT</argument>
+                <!-- arguments typically defined by user -->
+                
<argument>-beam_it_options=${integrationTestPipelineOptions}</argument>
+              </arguments>
+            </configuration>
+          </plugin>
+
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <!--
+      io-it-suite-local overrides part of io-it-suite, allowing users to run 
tests
+      when they are on a separate network from the kubernetes cluster by
+      creating a LoadBalancer service.
+    -->
+    <profile>
+      <id>io-it-suite-local</id>
+      
<activation><property><name>io-it-suite-local</name></property></activation>
+      <properties>
+        <!-- This is based on the location of the current pom relative to the 
root
+             See discussion in BEAM-2460 -->
+        
<beamRootProjectDir>${project.parent.parent.parent.parent.basedir}</beamRootProjectDir>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.gmaven</groupId>
+            <artifactId>groovy-maven-plugin</artifactId>
+            <version>${groovy-maven-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>find-supported-python-for-compile</id>
+                <phase>initialize</phase>
+                <goals>
+                  <goal>execute</goal>
+                </goals>
+                <configuration>
+                  
<source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <version>${maven-exec-plugin.version}</version>
+            <executions>
+              <execution>
+                <phase>verify</phase>
+                <goals>
+                  <goal>exec</goal>
+                </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <executable>${python.interpreter.bin}</executable>
+              <arguments>
+                <argument>${pkbLocation}</argument>
+                <argument>-beam_it_timeout=1800</argument>
+                <argument>-benchmarks=beam_integration_benchmark</argument>
+                <argument>-beam_it_profile=io-it</argument>
+                <argument>-beam_location=${beamRootProjectDir}</argument>
+                <argument>-beam_prebuilt=true</argument>
+                <argument>-beam_sdk=java</argument>
+                <argument>-kubeconfig=${kubeconfig}</argument>
+                <argument>-kubectl=${kubectl}</argument>
+                <!-- runner overrides, controlled via forceDirectRunner -->
+                <argument>${pkbBeamRunnerProfile}</argument>
+                <argument>${pkbBeamRunnerOption}</argument>
+                <!-- specific to this IO -->
+                
<argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml</argument>
+                
<argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml</argument>
+                <argument>-beam_it_module=sdks/java/io/mongodb</argument>
+                
<argument>-beam_it_class=org.apache.beam.sdk.io.mongodb.MongoDBIOIT</argument>
+                <!-- arguments typically defined by user -->
+                
<argument>-beam_it_options=${integrationTestPipelineOptions}</argument>
+              </arguments>
+            </configuration>
+          </plugin>
+
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <!-- Include the Google Cloud Dataflow runner activated by 
-DintegrationTestRunner=dataflow -->
+    <profile>
+      <id>dataflow-runner</id>
+      <activation>
+        <property>
+          <name>integrationTestRunner</name>
+          <value>dataflow</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -108,7 +301,18 @@
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
       <scope>test</scope>
-    </dependency>   
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
new file mode 100644
index 00000000000..4f425a38a4d
--- /dev/null
+++ 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+
+import com.google.common.collect.ImmutableMap;
+import com.mongodb.MongoClient;
+import java.util.Date;
+import java.util.Map;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.mongodb.MongoDbIO} on an independent Mongo instance.
+ *
+ * <p>This test requires a running instance of MongoDB. Pass in connection information using
+ * PipelineOptions:
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/mongodb -DintegrationTestPipelineOptions='[
+ *  "--mongoDBHostName=1.2.3.4",
+ *  "--mongoDBPort=27017",
+ *  "--mongoDBDatabaseName=beam",
+ *  "--numberOfRecords=1000" ]'
+ * </pre>
+ *
+ * <p>The test writes {@code numberOfRecords} generated documents into a fresh, timestamped
+ * collection, reads them back and compares a hash of the read values with a precomputed
+ * expectation. Only record counts that have an entry in {@code EXPECTED_HASHES} are supported.
+ */
+@RunWith(JUnit4.class)
+public class MongoDBIOIT {
+
+  /** Expected hash of all read "scientist" values, keyed by the number of records written. */
+  private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
+    1000, "75a0d5803418444e76ae5b421662764c",
+    100_000, "3bc762dc1c291904e3c7f577774c6276",
+    10_000_000, "e5e0503902018c83e8c8977ef437feba"
+  );
+
+  private static int numberOfRecords;
+
+  private static String host;
+
+  private static Integer port;
+
+  private static String database;
+
+  /** Per-run collection name, so concurrent or repeated runs do not see each other's data. */
+  private static String collection;
+
+  @Rule
+  public final TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule
+  public final TestPipeline readPipeline = TestPipeline.create();
+
+  /** Reads the connection settings and test size from the test pipeline options. */
+  @BeforeClass
+  public static void setUp() {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
+      .as(IOTestPipelineOptions.class);
+
+    numberOfRecords = options.getNumberOfRecords();
+    host = options.getMongoDBHostName();
+    port = options.getMongoDBPort();
+    database = options.getMongoDBDatabaseName();
+    collection = String.format("test_%s", new Date().getTime());
+  }
+
+  /**
+   * Removes the data written by this run.
+   *
+   * <p>Connects to the same host <em>and port</em> the pipelines used. The single-argument
+   * {@code MongoClient(host)} constructor would fall back to the default port. Only this run's
+   * collection is dropped, so other data in the (possibly shared) database is left intact. The
+   * client is always closed to avoid leaking its connection pool.
+   */
+  @After
+  public void tearDown() {
+    MongoClient client = new MongoClient(host, port);
+    try {
+      client.getDatabase(database).getCollection(collection).drop();
+    } finally {
+      client.close();
+    }
+  }
+
+  @Test
+  public void testWriteAndRead() {
+    String mongoUrl = String.format("mongodb://%s:%s", host, port);
+
+    writePipeline
+      .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRecords))
+      .apply("Produce documents", MapElements.via(new LongToDocumentFn()))
+      .apply("Write documents to MongoDB", MongoDbIO.write()
+        .withUri(mongoUrl)
+        .withDatabase(database)
+        .withCollection(collection));
+
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<String> consolidatedHashcode = readPipeline
+      .apply("Read all documents", MongoDbIO.read()
+        .withUri(mongoUrl)
+        .withDatabase(database)
+        .withCollection(collection))
+      .apply("Map documents to Strings", MapElements.via(new DocumentToStringFn()))
+      .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+    // Throws UnsupportedOperationException when numberOfRecords has no precomputed hash.
+    String expectedHash = getHashForRecordCount(numberOfRecords, EXPECTED_HASHES);
+    PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+    readPipeline.run().waitUntilFinish();
+  }
+
+  /** Turns a sequence number into a document of the form {"scientist": "Test n"}. */
+  private static class LongToDocumentFn extends SimpleFunction<Long, Document> {
+    @Override
+    public Document apply(Long input) {
+      return Document.parse(String.format("{\"scientist\":\"Test %s\"}", input));
+    }
+  }
+
+  /** Extracts the "scientist" field written by {@link LongToDocumentFn}. */
+  private static class DocumentToStringFn extends SimpleFunction<Document, String> {
+    @Override
+    public String apply(Document input) {
+      return input.getString("scientist");
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 86483)
    Time Spent: 1.5h  (was: 1h 20m)

> Add a performance test for MongoDBIO
> ------------------------------------
>
>                 Key: BEAM-3213
>                 URL: https://issues.apache.org/jira/browse/BEAM-3213
>             Project: Beam
>          Issue Type: Test
>          Components: io-java-mongodb
>            Reporter: Chamikara Jayalath
>            Assignee: Ɓukasz Gajowy
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> We should add a large-scale performance test for MongoDBIO. We could use the
> PerfKitBenchmarker-based performance testing framework [1] to manage a
> Kubernetes-based multi-node MongoDB cluster and to publish benchmark results.
> Example docker image to use: https://hub.docker.com/_/mongo/
> [1] https://beam.apache.org/documentation/io/testing/



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to