This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ecf9ee6  [BEAM-3213] MongodbIO performance test (#4859)
ecf9ee6 is described below

commit ecf9ee6ac336398530954456fcbd6b9baa0239a1
Author: Ɓukasz Gajowy <lukasz.gaj...@gmail.com>
AuthorDate: Mon Apr 2 07:57:00 2018 +0200

    [BEAM-3213] MongodbIO performance test (#4859)
    
    * [BEAM-3213] Add MongoDBIOIT
    
    * [BEAM-3213] Add perfkit support for mongodb. Enable dataflow runner
    
    * [BEAM-3213] Add hashes for larger datasets
    
    * [BEAM-3213] Add jenkins job for MongoDBIOIT
    
    * [BEAM-3213] Refactor: remove test code duplication
---
 .../job_beam_PerformanceTests_MongoDBIO_IT.groovy  |  66 +++++++
 .../kubernetes/mongodb/load-balancer/mongo.yml     |  49 +++++
 .../mongodb/load-balancer/pkb-config.yml           |  32 ++++
 .test-infra/kubernetes/mongodb/node-port/mongo.yml |  50 +++++
 .../kubernetes/mongodb/node-port/pkb-config.yml    |  30 +++
 .../org/apache/beam/sdk/io/common/IOITHelper.java  |  39 ++++
 .../beam/sdk/io/common/IOTestPipelineOptions.java  |  19 ++
 .../org/apache/beam/sdk/io/common/TestRow.java     |   8 +-
 .../beam/sdk/io/common/FileBasedIOITHelper.java    |  13 +-
 .../java/org/apache/beam/sdk/io/xml/XmlIOIT.java   |   2 +-
 sdks/java/io/mongodb/build.gradle                  |  11 ++
 sdks/java/io/mongodb/pom.xml                       | 206 ++++++++++++++++++++-
 .../apache/beam/sdk/io/mongodb/MongoDBIOIT.java    | 143 ++++++++++++++
 13 files changed, 651 insertions(+), 17 deletions(-)

diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy 
b/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy
new file mode 100644
index 0000000..1cd469c
--- /dev/null
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import common_job_properties
+
+job('beam_PerformanceTests_MongoDBIO_IT') {
+    // Apply the default Beam job properties shared by all jobs.
+    common_job_properties.setTopLevelMainJobProperties(delegate)
+
+    // Run as a postcommit job every 6 hours (cron '0 */6 * * *'); don't trigger
+    // on every push and don't email individual committers.
+    common_job_properties.setPostCommit(
+            delegate,
+            '0 */6 * * *',
+            false,
+            'commits@beam.apache.org',
+            false)
+
+    common_job_properties.enablePhraseTriggeringFromPullRequest(
+            delegate,
+            'Java MongoDBIO Performance Test',
+            'Run Java MongoDBIO Performance Test')
+
+    def pipelineOptions = [  // forwarded to MongoDBIOIT through beam_it_options below
+            tempRoot       : 'gs://temp-storage-for-perf-tests',
+            project        : 'apache-beam-testing',
+            numberOfRecords: '10000000'  // must have an entry in MongoDBIOIT.EXPECTED_HASHES
+    ]
+
+    String namespace = 
common_job_properties.getKubernetesNamespace('mongodbioit')
+    String kubeconfig = 
common_job_properties.getKubeconfigLocationForNamespace(namespace)
+
+    def testArgs = [  // benchmark settings handed to buildPerformanceTest below
+            kubeconfig              : kubeconfig,
+            beam_it_timeout         : '1800',
+            benchmarks              : 'beam_integration_benchmark',
+            beam_it_profile         : 'io-it',
+            beam_prebuilt           : 'true',
+            beam_sdk                : 'java',
+            beam_it_module          : 'sdks/java/io/mongodb',
+            beam_it_class           : 
'org.apache.beam.sdk.io.mongodb.MongoDBIOIT',
+            beam_it_options         : 
common_job_properties.joinPipelineOptions(pipelineOptions),
+            beam_kubernetes_scripts : 
common_job_properties.makePathAbsolute('src/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml'),
+            beam_options_config_file: 
common_job_properties.makePathAbsolute('src/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml'),
+            bigquery_table          : 
'beam_performance.mongodbioit_pkb_results'
+    ]
+
+    common_job_properties.setupKubernetes(delegate, namespace, kubeconfig)
+    common_job_properties.buildPerformanceTest(delegate, testArgs)
+    common_job_properties.cleanupKubernetes(delegate, namespace, kubeconfig)
+}
diff --git a/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml 
b/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml
new file mode 100644
index 0000000..70e1965
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml
@@ -0,0 +1,49 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+apiVersion: v1
+kind: Service
+metadata:
+  name: mongo-load-balancer-service
+  labels:
+    name: mongo
+spec:
+  ports:
+    - port: 27017
+  selector:
+    name: mongo
+  type: LoadBalancer
+
+---
+
+apiVersion: v1
+kind: ReplicationController
+metadata:
+  name: mongo
+spec:
+  replicas: 1
+  selector:
+    name: mongo
+  template:
+    metadata:
+      name: mongo
+      labels:
+        name: mongo
+    spec:
+      containers:
+        - name: mongo
+          image: mongo
+          ports:
+            - containerPort: 27017
diff --git a/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml 
b/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml
new file mode 100644
index 0000000..299de0d
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is a pkb benchmark configuration file, used when running the IO ITs
+# that use this data store. It allows users to run tests when they are on a
+# separate network from the kubernetes cluster by reading the mongo IP
+# address from the LoadBalancer service.
+#
+# This file defines pipeline options to pass to beam, as well as how to derive
+# the values for those pipeline options from kubernetes (where appropriate).
+
+static_pipeline_options:
+  - mongoDBDatabaseName: beam
+  - mongoDBPort: 27017
+dynamic_pipeline_options:
+  - name: mongoDBHostName
+    type: LoadBalancerIp
+    serviceName: mongo-load-balancer-service
diff --git a/.test-infra/kubernetes/mongodb/node-port/mongo.yml 
b/.test-infra/kubernetes/mongodb/node-port/mongo.yml
new file mode 100644
index 0000000..d640687
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/node-port/mongo.yml
@@ -0,0 +1,50 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+apiVersion: v1
+kind: Service
+metadata:
+  name: mongo-node-port-service
+  labels:
+    name: mongo
+spec:
+  ports:
+    - port: 27017
+      nodePort: 31235
+  selector:
+    name: mongo
+  type: NodePort
+
+---
+
+apiVersion: v1
+kind: ReplicationController
+metadata:
+  name: mongo
+spec:
+  replicas: 1
+  selector:
+    name: mongo
+  template:
+    metadata:
+      name: mongo
+      labels:
+        name: mongo
+    spec:
+      containers:
+        - name: mongo
+          image: mongo
+          ports:
+            - containerPort: 27017
diff --git a/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml 
b/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml
new file mode 100644
index 0000000..1436b75
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is a pkb benchmark configuration file, used when running the IO ITs
+# that use this data store.
+#
+# This file defines pipeline options to pass to beam, as well as how to derive
+# the values for those pipeline options from kubernetes (where appropriate).
+
+static_pipeline_options:
+  - mongoDBDatabaseName: beam
+  - mongoDBPort: 27017
+dynamic_pipeline_options:
+  - name: mongoDBHostName
+    type: NodePortIp
+    podLabel: name=mongo
diff --git 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
new file mode 100644
index 0000000..819bc9a
--- /dev/null
+++ 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import java.util.Map;
+
+/**
+ * Methods common to all types of IOITs.
+ */
+public final class IOITHelper {
+
+  /** Utility class holding only static helpers; never instantiated. */
+  private IOITHelper() {
+  }
+
+  /**
+   * Looks up the precomputed hash expected for a test dataset of the given size.
+   *
+   * @param recordCount size of the test dataset, used as the lookup key
+   * @param hashes expected hashes, keyed by record count
+   * @return the hash registered for {@code recordCount}
+   * @throws UnsupportedOperationException if no hash is registered for {@code recordCount}
+   */
+  public static String getHashForRecordCount(int recordCount, Map<Integer, String> hashes) {
+    String hash = hashes.get(recordCount);
+    if (hash == null) {
+      throw new UnsupportedOperationException(
+        String.format("No hash for that record count: %s", recordCount)
+      );
+    }
+    return hash;
+  }
+}
diff --git 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 89b7ae8..1260167 100644
--- 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -114,4 +114,23 @@ public interface IOTestPipelineOptions extends 
TestPipelineOptions {
   String getCharset();
 
   void setCharset(String charset);
+
+  /* MongoDB */
+  @Description("MongoDB host (host name/ip address)")
+  @Default.String("mongodb-host")
+  String getMongoDBHostName();
+
+  void setMongoDBHostName(String host);
+
+  @Description("Port for MongoDB")
+  @Default.Integer(27017)
+  Integer getMongoDBPort();
+
+  void setMongoDBPort(Integer port);
+
+  @Description("Mongo database name")
+  @Default.String("beam")
+  String getMongoDBDatabaseName();
+
+  void setMongoDBDatabaseName(String name);
 }
diff --git 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
index e6bc7e8..4465456 100644
--- 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
+++ 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.common;
 
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
@@ -108,10 +110,6 @@ public abstract class TestRow implements Serializable, 
Comparable<TestRow> {
    */
   public static String getExpectedHashForRowCount(int rowCount)
       throws UnsupportedOperationException {
-    String hash = EXPECTED_HASHES.get(rowCount);
-    if (hash == null) {
-      throw new UnsupportedOperationException("No hash for that row count");
-    }
-    return hash;
+    return getHashForRecordCount(rowCount, EXPECTED_HASHES);
   }
 }
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index bbf707e..5b7ff38 100644
--- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.common;
 
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
@@ -33,6 +35,7 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 
+
 /**
  * Contains helper methods for file based IO Integration tests.
  */
@@ -64,16 +67,6 @@ public class FileBasedIOITHelper {
     return getHashForRecordCount(lineCount, expectedHashes);
   }
 
-  public static String getHashForRecordCount(int recordCount, Map<Integer, 
String> hashes) {
-    String hash = hashes.get(recordCount);
-    if (hash == null) {
-      throw new UnsupportedOperationException(
-        String.format("No hash for that record count: %s", recordCount)
-      );
-    }
-    return hash;
-  }
-
   /**
    * Constructs text lines in files used for testing.
    */
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
index 7176d7f..3ea27aa 100644
--- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.io.xml;
 
 import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
-import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getHashForRecordCount;
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
 
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
diff --git a/sdks/java/io/mongodb/build.gradle 
b/sdks/java/io/mongodb/build.gradle
index 9223780..6e82f35 100644
--- a/sdks/java/io/mongodb/build.gradle
+++ b/sdks/java/io/mongodb/build.gradle
@@ -21,6 +21,15 @@ applyJavaNature(artifactId: "beam-sdks-java-io-mongodb")
 
 description = "Apache Beam :: SDKs :: Java :: IO :: MongoDB"
 
+/*
+ * We need to rely on manually specifying these evaluationDependsOn to ensure 
that
+ * the following projects are evaluated before we evaluate this project. This 
is because
+ * we are attempting to reference the "sourceSets.test.output" directly.
+ * TODO: Swap to generating test artifacts which we can then rely on instead of
+ * the test outputs directly.
+ */
+evaluationDependsOn(":sdks:java:io:common")
+
 dependencies {
   compile library.java.guava
   shadow project(path: ":sdks:java:core", configuration: "shadow")
@@ -32,6 +41,8 @@ dependencies {
   testCompile library.java.junit
   testCompile library.java.slf4j_jdk14
   testCompile library.java.hamcrest_core
+  testCompile project(path: ":sdks:java:io:common", configuration: "shadow")
+  testCompile project(":sdks:java:io:common").sourceSets.test.output
   testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:1.50.1"
   testCompile "de.flapdoodle.embed:de.flapdoodle.embed.process:1.50.1"
 }
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index ad811b5..2f237cb 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -34,6 +34,199 @@
     <mongo-java-driver.version>3.2.2</mongo-java-driver.version>
   </properties>
 
+
+  <profiles>
+    <!--
+      This profile invokes PerfKitBenchmarker, which does benchmarking of
+      the IO ITs. The arguments passed to it allow it to invoke mvn again
+      with the desired benchmark.
+
+      To invoke this, run:
+
+      mvn verify -Dio-it-suite -pl sdks/java/io/mongodb \
+          -DpkbLocation="path-to-pkb.py" \
+          -DintegrationTestPipelineOptions='["--numberOfRecords=1000"]'
+  -->
+    <profile>
+      <id>io-it-suite</id>
+      <activation>
+        <property><name>io-it-suite</name></property>
+      </activation>
+      <properties>
+        <!-- This is based on the location of the current pom relative to the 
root
+             See discussion in BEAM-2460 -->
+        
<beamRootProjectDir>${project.parent.parent.parent.parent.basedir}</beamRootProjectDir>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.gmaven</groupId>
+            <artifactId>groovy-maven-plugin</artifactId>
+            <version>${groovy-maven-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>find-supported-python-for-compile</id>
+                <phase>initialize</phase>
+                <goals>
+                  <goal>execute</goal>
+                </goals>
+                <configuration>
+                  
<source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <version>${maven-exec-plugin.version}</version>
+            <executions>
+              <execution>
+                <phase>verify</phase>
+                <goals>
+                  <goal>exec</goal>
+                </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <executable>${python.interpreter.bin}</executable>
+              <arguments>
+                <argument>${pkbLocation}</argument>
+                <argument>-beam_it_timeout=1800</argument>
+                <argument>-benchmarks=beam_integration_benchmark</argument>
+                <argument>-beam_it_profile=io-it</argument>
+                <argument>-beam_location=${beamRootProjectDir}</argument>
+                <argument>-beam_prebuilt=true</argument>
+                <argument>-beam_sdk=java</argument>
+                <argument>-kubeconfig=${kubeconfig}</argument>
+                <argument>-kubectl=${kubectl}</argument>
+                <!-- runner overrides, controlled via forceDirectRunner -->
+                <argument>${pkbBeamRunnerProfile}</argument>
+                <argument>${pkbBeamRunnerOption}</argument>
+                <!-- specific to this IO -->
+                
<argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml</argument>
+                
<argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/node-port/mongo.yml</argument>
+                <argument>-beam_it_module=sdks/java/io/mongodb</argument>
+                
<argument>-beam_it_class=org.apache.beam.sdk.io.mongodb.MongoDBIOIT</argument>
+                <!-- arguments typically defined by user -->
+                
<argument>-beam_it_options=${integrationTestPipelineOptions}</argument>
+              </arguments>
+            </configuration>
+          </plugin>
+
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <!--
+      io-it-suite-local overrides part of io-it-suite, allowing users to run 
tests
+      when they are on a separate network from the kubernetes cluster by
+      creating a LoadBalancer service.
+    -->
+    <profile>
+      <id>io-it-suite-local</id>
+      
<activation><property><name>io-it-suite-local</name></property></activation>
+      <properties>
+        <!-- This is based on the location of the current pom relative to the 
root
+             See discussion in BEAM-2460 -->
+        
<beamRootProjectDir>${project.parent.parent.parent.parent.basedir}</beamRootProjectDir>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.gmaven</groupId>
+            <artifactId>groovy-maven-plugin</artifactId>
+            <version>${groovy-maven-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>find-supported-python-for-compile</id>
+                <phase>initialize</phase>
+                <goals>
+                  <goal>execute</goal>
+                </goals>
+                <configuration>
+                  
<source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <version>${maven-exec-plugin.version}</version>
+            <executions>
+              <execution>
+                <phase>verify</phase>
+                <goals>
+                  <goal>exec</goal>
+                </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <executable>${python.interpreter.bin}</executable>
+              <arguments>
+                <argument>${pkbLocation}</argument>
+                <argument>-beam_it_timeout=1800</argument>
+                <argument>-benchmarks=beam_integration_benchmark</argument>
+                <argument>-beam_it_profile=io-it</argument>
+                <argument>-beam_location=${beamRootProjectDir}</argument>
+                <argument>-beam_prebuilt=true</argument>
+                <argument>-beam_sdk=java</argument>
+                <argument>-kubeconfig=${kubeconfig}</argument>
+                <argument>-kubectl=${kubectl}</argument>
+                <!-- runner overrides, controlled via forceDirectRunner -->
+                <argument>${pkbBeamRunnerProfile}</argument>
+                <argument>${pkbBeamRunnerOption}</argument>
+                <!-- specific to this IO -->
+                
<argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml</argument>
+                
<argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml</argument>
+                <argument>-beam_it_module=sdks/java/io/mongodb</argument>
+                
<argument>-beam_it_class=org.apache.beam.sdk.io.mongodb.MongoDBIOIT</argument>
+                <!-- arguments typically defined by user -->
+                
<argument>-beam_it_options=${integrationTestPipelineOptions}</argument>
+              </arguments>
+            </configuration>
+          </plugin>
+
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <!-- Include the Google Cloud Dataflow runner activated by 
-DintegrationTestRunner=dataflow -->
+    <profile>
+      <id>dataflow-runner</id>
+      <activation>
+        <property>
+          <name>integrationTestRunner</name>
+          <value>dataflow</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -108,7 +301,18 @@
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-library</artifactId>
       <scope>test</scope>
-    </dependency>   
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
new file mode 100644
index 0000000..4f425a3
--- /dev/null
+++ 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+
+import com.google.common.collect.ImmutableMap;
+import com.mongodb.MongoClient;
+import java.util.Map;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.mongodb.MongoDbIO} on an independent Mongo instance.
+ *
+ * <p>This test requires a running instance of MongoDB. Pass in connection information using
+ * PipelineOptions:
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/mongodb -DintegrationTestPipelineOptions='[
+ *  "--mongoDBHostName=1.2.3.4",
+ *  "--mongoDBPort=27017",
+ *  "--mongoDBDatabaseName=beam",
+ *  "--numberOfRecords=1000" ]'
+ * </pre>
+ *
+ */
+@RunWith(JUnit4.class)
+public class MongoDBIOIT {
+
+  private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
+    1000, "75a0d5803418444e76ae5b421662764c",
+    100_000, "3bc762dc1c291904e3c7f577774c6276",
+    10_000_000, "e5e0503902018c83e8c8977ef437feba"
+  );
+
+  private static int numberOfRecords;
+
+  private static String host;
+
+  private static Integer port;
+
+  private static String database;
+
+  private static String collection;
+
+  @Rule
+  public final TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule
+  public final TestPipeline readPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setUp() {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
+      .as(IOTestPipelineOptions.class);
+
+    numberOfRecords = options.getNumberOfRecords();
+    host = options.getMongoDBHostName();
+    port = options.getMongoDBPort();
+    database = options.getMongoDBDatabaseName();
+    collection = String.format("test_%s", System.currentTimeMillis());
+  }
+
+  /**
+   * Drops the test database once the test has finished.
+   *
+   * <p>Connects with the same host and port as the pipelines and closes the client afterwards so
+   * that its connection pool does not outlive the test.
+   */
+  @After
+  public void tearDown() {
+    MongoClient client = new MongoClient(host, port);
+    try {
+      client.getDatabase(database).drop();
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Writes {@code numberOfRecords} generated documents to a timestamped collection, then reads
+   * them back and checks a combined hash of their contents against {@code EXPECTED_HASHES}.
+   */
+  @Test
+  public void testWriteAndRead() {
+    String mongoUrl = String.format("mongodb://%s:%s", host, port);
+
+    writePipeline
+      .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRecords))
+      .apply("Produce documents", MapElements.via(new LongToDocumentFn()))
+      .apply("Write documents to MongoDB", MongoDbIO.write()
+        .withUri(mongoUrl)
+        .withDatabase(database)
+        .withCollection(collection));
+
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<String> consolidatedHashcode = readPipeline
+      .apply("Read all documents", MongoDbIO.read()
+        .withUri(mongoUrl)
+        .withDatabase(database)
+        .withCollection(collection))
+      .apply("Map documents to Strings", MapElements.via(new DocumentToStringFn()))
+      .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+    String expectedHash = getHashForRecordCount(numberOfRecords, EXPECTED_HASHES);
+    PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+    readPipeline.run().waitUntilFinish();
+  }
+
+  private static class LongToDocumentFn extends SimpleFunction<Long, Document> {
+    @Override
+    public Document apply(Long input) {
+      return Document.parse(String.format("{\"scientist\":\"Test %s\"}", input));
+    }
+  }
+
+  private static class DocumentToStringFn extends SimpleFunction<Document, String> {
+    @Override
+    public String apply(Document input) {
+      return input.getString("scientist");
+    }
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
chamik...@apache.org.

Reply via email to