Repository: spark
Updated Branches:
  refs/heads/master 32790fe72 -> 1dde39d79


[SPARK-9818] Re-enable Docker tests for JDBC data source

This patch re-enables tests for the Docker JDBC data source. These tests were 
reverted in #4872 due to transitive dependency conflicts introduced by the 
`docker-client` library. This patch should avoid those problems by using a 
version of `docker-client` which shades its transitive dependencies and by 
performing some build-magic to work around problems with that shaded JAR.

In addition, I significantly refactored the tests to simplify the setup and 
teardown code and to fix several Docker networking issues which caused problems 
when running in `boot2docker`.

Closes #8101.

Author: Josh Rosen <joshro...@databricks.com>
Author: Yijie Shen <henry.yijies...@gmail.com>

Closes #9503 from JoshRosen/docker-jdbc-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1dde39d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1dde39d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1dde39d7

Branch: refs/heads/master
Commit: 1dde39d796bbf42336051a86bedf871c7fddd513
Parents: 32790fe
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Nov 10 15:58:30 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Nov 10 15:58:30 2015 -0800

----------------------------------------------------------------------
 docker-integration-tests/pom.xml                | 149 +++++++++++++++++
 .../sql/jdbc/DockerJDBCIntegrationSuite.scala   | 160 +++++++++++++++++++
 .../spark/sql/jdbc/MySQLIntegrationSuite.scala  | 153 ++++++++++++++++++
 .../sql/jdbc/PostgresIntegrationSuite.scala     |  82 ++++++++++
 .../org/apache/spark/util/DockerUtils.scala     |  68 ++++++++
 pom.xml                                         |  14 ++
 project/SparkBuild.scala                        |  14 +-
 .../java/org/apache/spark/tags/DockerTest.java  |  26 +++
 8 files changed, 664 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1dde39d7/docker-integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml
new file mode 100644
index 0000000..dee0c4a
--- /dev/null
+++ b/docker-integration-tests/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>1.6.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-docker-integration-tests_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Docker Integration Tests</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>docker-integration-tests</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.spotify</groupId>
+      <artifactId>docker-client</artifactId>
+      <classifier>shaded</classifier>
+      <scope>test</scope>
+      <!--
+        See https://github.com/spotify/docker-client/pull/272#issuecomment-155249101
+        for an explanation of why these exclusions are (necessarily) a mess.
+      -->
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.jaxrs</groupId>
+          <artifactId>jackson-jaxrs-json-provider</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.datatype</groupId>
+          <artifactId>jackson-datatype-guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.core</groupId>
+          <artifactId>jersey-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.connectors</groupId>
+          <artifactId>jersey-apache-connector</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.media</groupId>
+          <artifactId>jersey-media-json-jackson</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- Necessary in order to avoid errors in log messages: -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>18.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- Jersey dependencies, used to override version.
+     See https://github.com/apache/spark/pull/9503#issuecomment-154369560 for
+     background on why we need to use a newer Jersey only in this test module;
+     we can remove this once https://github.com/spotify/docker-client/pull/272 is
+     merged and a new docker-client release is published. -->
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-server</artifactId>
+      <version>1.19</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+      <version>1.19</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-servlet</artifactId>
+      <version>1.19</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+      <version>1.19</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>stax</groupId>
+          <artifactId>stax-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- End Jersey dependencies -->
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/1dde39d7/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
new file mode 100644
index 0000000..c503c4a
--- /dev/null
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.net.ServerSocket
+import java.sql.Connection
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.spotify.docker.client._
+import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.DockerUtils
+import org.apache.spark.sql.test.SharedSQLContext
+
+abstract class DatabaseOnDocker {
+  /**
+   * The docker image to be pulled.
+   */
+  val imageName: String
+
+  /**
+   * Environment variables to set inside of the Docker container while launching it.
+   */
+  val env: Map[String, String]
+
+  /**
+   * The container-internal JDBC port that the database listens on.
+   */
+  val jdbcPort: Int
+
+  /**
+   * Return a JDBC URL that connects to the database running at the given IP address and port.
+   */
+  def getJdbcUrl(ip: String, port: Int): String
+}
+
+abstract class DockerJDBCIntegrationSuite
+  extends SparkFunSuite
+  with BeforeAndAfterAll
+  with Eventually
+  with SharedSQLContext {
+
+  val db: DatabaseOnDocker
+
+  private var docker: DockerClient = _
+  private var containerId: String = _
+  protected var jdbcUrl: String = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    try {
+      docker = DefaultDockerClient.fromEnv.build()
+      // Check that Docker is actually up
+      try {
+        docker.ping()
+      } catch {
+        case NonFatal(e) =>
+          log.error("Exception while connecting to Docker. Check whether 
Docker is running.")
+          throw e
+      }
+      // Ensure that the Docker image is installed:
+      try {
+        docker.inspectImage(db.imageName)
+      } catch {
+        case e: ImageNotFoundException =>
+          log.warn(s"Docker image ${db.imageName} not found; pulling image 
from registry")
+          docker.pull(db.imageName)
+      }
+      // Configure networking (necessary for boot2docker / Docker Machine)
+      val externalPort: Int = {
+        val sock = new ServerSocket(0)
+        val port = sock.getLocalPort
+        sock.close()
+        port
+      }
+      val dockerIp = DockerUtils.getDockerIp()
+      val hostConfig: HostConfig = HostConfig.builder()
+        .networkMode("bridge")
+        .portBindings(
+          Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, 
externalPort)).asJava).asJava)
+        .build()
+      // Create the database container:
+      val config = ContainerConfig.builder()
+        .image(db.imageName)
+        .networkDisabled(false)
+        .env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava)
+        .hostConfig(hostConfig)
+        .exposedPorts(s"${db.jdbcPort}/tcp")
+        .build()
+      containerId = docker.createContainer(config).id
+      // Start the container and wait until the database can accept JDBC connections:
+      docker.startContainer(containerId)
+      jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
+      eventually(timeout(60.seconds), interval(1.seconds)) {
+        val conn = java.sql.DriverManager.getConnection(jdbcUrl)
+        conn.close()
+      }
+      // Run any setup queries:
+      val conn: Connection = java.sql.DriverManager.getConnection(jdbcUrl)
+      try {
+        dataPreparation(conn)
+      } finally {
+        conn.close()
+      }
+    } catch {
+      case NonFatal(e) =>
+        try {
+          afterAll()
+        } finally {
+          throw e
+        }
+    }
+  }
+
+  override def afterAll() {
+    try {
+      if (docker != null) {
+        try {
+          if (containerId != null) {
+            docker.killContainer(containerId)
+            docker.removeContainer(containerId)
+          }
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Could not stop container $containerId", e)
+        } finally {
+          docker.close()
+        }
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  /**
+   * Prepare databases and tables for testing.
+   */
+  def dataPreparation(connection: Connection): Unit
+}
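
The base class above is the whole extension contract: a concrete suite supplies a
DatabaseOnDocker descriptor plus a dataPreparation() implementation, and the harness
handles pulling the image, mapping an ephemeral host port, waiting for JDBC
readiness, and tearing the container down. A minimal sketch of a hypothetical suite
(the image name, env variable, port, and URL scheme are illustrative placeholders,
not part of this patch; the MySQL and Postgres suites below are the real instances):

    package org.apache.spark.sql.jdbc

    import java.sql.Connection
    import java.util.Properties

    import org.apache.spark.tags.DockerTest

    @DockerTest
    class ExampleDBIntegrationSuite extends DockerJDBCIntegrationSuite {
      override val db = new DatabaseOnDocker {
        override val imageName = "exampledb:1.0"                    // hypothetical image
        override val env = Map("EXAMPLEDB_PASSWORD" -> "rootpass")  // hypothetical env var
        override val jdbcPort: Int = 12345                          // container-internal port
        override def getJdbcUrl(ip: String, port: Int): String =
          s"jdbc:exampledb://$ip:$port/test?password=rootpass"      // hypothetical URL scheme
      }

      // Runs once against the freshly started container, before any tests.
      override def dataPreparation(conn: Connection): Unit = {
        conn.prepareStatement("CREATE TABLE t (x INTEGER)").executeUpdate()
      }

      test("read back a table through the mapped port") {
        val df = sqlContext.read.jdbc(jdbcUrl, "t", new Properties)
        assert(df.collect().length === 0)
      }
    }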

http://git-wip-us.apache.org/repos/asf/spark/blob/1dde39d7/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
new file mode 100644
index 0000000..c68e4dc
--- /dev/null
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.math.BigDecimal
+import java.sql.{Connection, Date, Timestamp}
+import java.util.Properties
+
+import org.apache.spark.tags.DockerTest
+
+@DockerTest
+class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
+  override val db = new DatabaseOnDocker {
+    override val imageName = "mysql:5.7.9"
+    override val env = Map(
+      "MYSQL_ROOT_PASSWORD" -> "rootpass"
+    )
+    override val jdbcPort: Int = 3306
+    override def getJdbcUrl(ip: String, port: Int): String =
+      s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
+  }
+
+  override def dataPreparation(conn: Connection): Unit = {
+    conn.prepareStatement("CREATE DATABASE foo").executeUpdate()
+    conn.prepareStatement("CREATE TABLE tbl (x INTEGER, y 
TEXT(8))").executeUpdate()
+    conn.prepareStatement("INSERT INTO tbl VALUES (42,'fred')").executeUpdate()
+    conn.prepareStatement("INSERT INTO tbl VALUES (17,'dave')").executeUpdate()
+
+    conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits 
BIT(10), "
+      + "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci 
DECIMAL(40,20), flt FLOAT, "
+      + "dbl DOUBLE)").executeUpdate()
+    conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
+      + "17, 77777, 123456789, 123456789012345, 
123456789012345.123456789012345, "
+      + "42.75, 1.0000000000000002)").executeUpdate()
+
+    conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts 
TIMESTAMP, "
+      + "yr YEAR)").executeUpdate()
+    conn.prepareStatement("INSERT INTO dates VALUES ('1991-11-09', '13:31:24', 
"
+      + "'1996-01-01 01:23:45', '2009-02-13 23:31:30', 
'2001')").executeUpdate()
+
+    // TODO: Test locale conversion for strings.
+    conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c 
TINYTEXT, "
+      + "d TEXT, e MEDIUMTEXT, f LONGTEXT, g BINARY(4), h VARBINARY(10), i 
BLOB)"
+    ).executeUpdate()
+    conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 
'brown', 'fox', " +
+      "'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate()
+  }
+
+  test("Basic test") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 2)
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types.length == 2)
+    assert(types(0).equals("class java.lang.Integer"))
+    assert(types(1).equals("class java.lang.String"))
+  }
+
+  test("Numeric types") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 1)
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types.length == 9)
+    assert(types(0).equals("class java.lang.Boolean"))
+    assert(types(1).equals("class java.lang.Long"))
+    assert(types(2).equals("class java.lang.Integer"))
+    assert(types(3).equals("class java.lang.Integer"))
+    assert(types(4).equals("class java.lang.Integer"))
+    assert(types(5).equals("class java.lang.Long"))
+    assert(types(6).equals("class java.math.BigDecimal"))
+    assert(types(7).equals("class java.lang.Double"))
+    assert(types(8).equals("class java.lang.Double"))
+    assert(rows(0).getBoolean(0) == false)
+    assert(rows(0).getLong(1) == 0x225)
+    assert(rows(0).getInt(2) == 17)
+    assert(rows(0).getInt(3) == 77777)
+    assert(rows(0).getInt(4) == 123456789)
+    assert(rows(0).getLong(5) == 123456789012345L)
+    val bd = new BigDecimal("123456789012345.12345678901234500000")
+    assert(rows(0).getAs[BigDecimal](6).equals(bd))
+    assert(rows(0).getDouble(7) == 42.75)
+    assert(rows(0).getDouble(8) == 1.0000000000000002)
+  }
+
+  test("Date types") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 1)
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types.length == 5)
+    assert(types(0).equals("class java.sql.Date"))
+    assert(types(1).equals("class java.sql.Timestamp"))
+    assert(types(2).equals("class java.sql.Timestamp"))
+    assert(types(3).equals("class java.sql.Timestamp"))
+    assert(types(4).equals("class java.sql.Date"))
+    assert(rows(0).getAs[Date](0).equals(Date.valueOf("1991-11-09")))
+    assert(rows(0).getAs[Timestamp](1).equals(Timestamp.valueOf("1970-01-01 13:31:24")))
+    assert(rows(0).getAs[Timestamp](2).equals(Timestamp.valueOf("1996-01-01 01:23:45")))
+    assert(rows(0).getAs[Timestamp](3).equals(Timestamp.valueOf("2009-02-13 23:31:30")))
+    assert(rows(0).getAs[Date](4).equals(Date.valueOf("2001-01-01")))
+  }
+
+  test("String types") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 1)
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types.length == 9)
+    assert(types(0).equals("class java.lang.String"))
+    assert(types(1).equals("class java.lang.String"))
+    assert(types(2).equals("class java.lang.String"))
+    assert(types(3).equals("class java.lang.String"))
+    assert(types(4).equals("class java.lang.String"))
+    assert(types(5).equals("class java.lang.String"))
+    assert(types(6).equals("class [B"))
+    assert(types(7).equals("class [B"))
+    assert(types(8).equals("class [B"))
+    assert(rows(0).getString(0).equals("the"))
+    assert(rows(0).getString(1).equals("quick"))
+    assert(rows(0).getString(2).equals("brown"))
+    assert(rows(0).getString(3).equals("fox"))
+    assert(rows(0).getString(4).equals("jumps"))
+    assert(rows(0).getString(5).equals("over"))
+    assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), Array[Byte](116, 104, 101, 0)))
+    assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](7), Array[Byte](108, 97, 122, 121)))
+    assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](8), Array[Byte](100, 111, 103)))
+  }
+
+  test("Basic write test") {
+    val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
+    val df2 = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
+    val df3 = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
+    df1.write.jdbc(jdbcUrl, "numberscopy", new Properties)
+    df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
+    df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
+  }
+}
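
The tests above use the simplest read.jdbc overload. For reference, the same reader
also supports partitioned scans that split a table on a numeric column; a hedged
sketch against the numbers table created in dataPreparation (the column choice,
bounds, and partition count are illustrative, not asserted by this patch):

    import java.util.Properties

    // Overload: jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, props).
    // Each of the 4 partitions issues its own bounded query over the `nor` INT column.
    val df = sqlContext.read.jdbc(jdbcUrl, "numbers", "nor", 0L, 200000000L, 4, new Properties)
    assert(df.rdd.partitions.length === 4)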

http://git-wip-us.apache.org/repos/asf/spark/blob/1dde39d7/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
new file mode 100644
index 0000000..164a7f3
--- /dev/null
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Connection
+import java.util.Properties
+
+import org.apache.spark.tags.DockerTest
+
+@DockerTest
+class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
+  override val db = new DatabaseOnDocker {
+    override val imageName = "postgres:9.4.5"
+    override val env = Map(
+      "POSTGRES_PASSWORD" -> "rootpass"
+    )
+    override val jdbcPort = 5432
+    override def getJdbcUrl(ip: String, port: Int): String =
+      s"jdbc:postgresql://$ip:$port/postgres?user=postgres&password=rootpass"
+  }
+
+  override def dataPreparation(conn: Connection): Unit = {
+    conn.prepareStatement("CREATE DATABASE foo").executeUpdate()
+    conn.setCatalog("foo")
+    conn.prepareStatement("CREATE TABLE bar (a text, b integer, c double 
precision, d bigint, "
+      + "e bit(1), f bit(10), g bytea, h boolean, i inet, j 
cidr)").executeUpdate()
+    conn.prepareStatement("INSERT INTO bar VALUES ('hello', 42, 1.25, 
123456789012345, B'0', "
+      + "B'1000100101', E'\\\\xDEADBEEF', true, '172.16.0.42', 
'192.168.0.0/16')").executeUpdate()
+  }
+
+  test("Type mapping for various types") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 1)
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types.length == 10)
+    assert(types(0).equals("class java.lang.String"))
+    assert(types(1).equals("class java.lang.Integer"))
+    assert(types(2).equals("class java.lang.Double"))
+    assert(types(3).equals("class java.lang.Long"))
+    assert(types(4).equals("class java.lang.Boolean"))
+    assert(types(5).equals("class [B"))
+    assert(types(6).equals("class [B"))
+    assert(types(7).equals("class java.lang.Boolean"))
+    assert(types(8).equals("class java.lang.String"))
+    assert(types(9).equals("class java.lang.String"))
+    assert(rows(0).getString(0).equals("hello"))
+    assert(rows(0).getInt(1) == 42)
+    assert(rows(0).getDouble(2) == 1.25)
+    assert(rows(0).getLong(3) == 123456789012345L)
+    assert(rows(0).getBoolean(4) == false)
+    // BIT(10)'s come back as ASCII strings of ten ASCII 0's and 1's...
+    assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](5),
+      Array[Byte](49, 48, 48, 48, 49, 48, 48, 49, 48, 49)))
+    assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6),
+      Array[Byte](0xDE.toByte, 0xAD.toByte, 0xBE.toByte, 0xEF.toByte)))
+    assert(rows(0).getBoolean(7) == true)
+    assert(rows(0).getString(8) == "172.16.0.42")
+    assert(rows(0).getString(9) == "192.168.0.0/16")
+  }
+
+  test("Basic write test") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
+    df.write.jdbc(jdbcUrl, "public.barcopy", new Properties)
+    // Test only that it doesn't crash.
+  }
+}
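
The write test above deliberately checks only that the round trip does not crash,
because the target table is created fresh. When a target may already exist, the
writer's save mode decides the outcome; a sketch assuming the standard SaveMode
semantics of the DataFrameWriter API (not exercised by this patch):

    import java.util.Properties
    import org.apache.spark.sql.SaveMode

    val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
    // Default is ErrorIfExists; Append inserts into an existing table,
    // Overwrite drops and recreates it.
    df.write.mode(SaveMode.Append).jdbc(jdbcUrl, "public.barcopy", new Properties)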

http://git-wip-us.apache.org/repos/asf/spark/blob/1dde39d7/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala
----------------------------------------------------------------------
diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala b/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala
new file mode 100644
index 0000000..8727177
--- /dev/null
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.{Inet4Address, NetworkInterface, InetAddress}
+
+import scala.collection.JavaConverters._
+import scala.sys.process._
+import scala.util.Try
+
+private[spark] object DockerUtils {
+
+  def getDockerIp(): String = {
+    /** If docker-machine is setup on this box, attempts to find the ip from it. */
+    def findFromDockerMachine(): Option[String] = {
+      sys.env.get("DOCKER_MACHINE_NAME").flatMap { name =>
+        Try(Seq("/bin/bash", "-c", s"docker-machine ip $name 
2>/dev/null").!!.trim).toOption
+      }
+    }
+    sys.env.get("DOCKER_IP")
+      .orElse(findFromDockerMachine())
+      .orElse(Try(Seq("/bin/bash", "-c", "boot2docker ip 
2>/dev/null").!!.trim).toOption)
+      .getOrElse {
+        // This block of code is based on Utils.findLocalInetAddress(), but is modified to blacklist
+        // certain interfaces.
+        val address = InetAddress.getLocalHost
+        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+        // a better address using the local network interfaces
+        // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
+        // on unix-like system. On windows, it returns in index order.
+        // It's more proper to pick ip address following system output order.
+        val blackListedIFs = Seq(
+          "vboxnet0",  // Mac
+          "docker0"    // Linux
+        )
+        val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq.filter { i =>
+          !blackListedIFs.contains(i.getName)
+        }
+        val reOrderedNetworkIFs = activeNetworkIFs.reverse
+        for (ni <- reOrderedNetworkIFs) {
+          val addresses = ni.getInetAddresses.asScala
+            .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress).toSeq
+          if (addresses.nonEmpty) {
+            val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
+            // because of Inet6Address.toHostName may add interface at the end if it knows about it
+            val strippedAddress = InetAddress.getByAddress(addr.getAddress)
+            return strippedAddress.getHostAddress
+          }
+        }
+        address.getHostAddress
+      }
+  }
+}
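
The resolution order implemented above is what makes the suites portable across a
plain Linux daemon, docker-machine, and boot2docker, and it gives CI a simple
override; a sketch (exporting DOCKER_IP=127.0.0.1 is an assumption about a Linux
host whose daemon listens locally, not something this patch requires):

    // Order of precedence in getDockerIp():
    //   1. DOCKER_IP environment variable (explicit override)
    //   2. `docker-machine ip $DOCKER_MACHINE_NAME`
    //   3. `boot2docker ip`
    //   4. scan of non-loopback, non-link-local interfaces, skipping vboxnet0/docker0
    val ip = org.apache.spark.util.DockerUtils.getDockerIp()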

http://git-wip-us.apache.org/repos/asf/spark/blob/1dde39d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fd8c773..c499a80 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,7 @@
     <module>sql/catalyst</module>
     <module>sql/core</module>
     <module>sql/hive</module>
+    <module>docker-integration-tests</module>
     <module>unsafe</module>
     <module>assembly</module>
     <module>external/twitter</module>
@@ -779,6 +780,19 @@
         <scope>test</scope>
       </dependency>
       <dependency>
+        <groupId>com.spotify</groupId>
+        <artifactId>docker-client</artifactId>
+        <classifier>shaded</classifier>
+        <version>3.2.1</version>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <artifactId>guava</artifactId>
+            <groupId>com.google.guava</groupId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-recipes</artifactId>
         <version>${curator.version}</version>

http://git-wip-us.apache.org/repos/asf/spark/blob/1dde39d7/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a9fb741..b7c6192 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -43,8 +43,9 @@ object BuildCommons {
       "streaming-zeromq", "launcher", "unsafe", 
"test-tags").map(ProjectRef(buildLocation, _))
 
   val optionallyEnabledProjects@Seq(yarn, yarnStable, java8Tests, sparkGangliaLgpl,
-    streamingKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
-    "streaming-kinesis-asl").map(ProjectRef(buildLocation, _))
+    streamingKinesisAsl, dockerIntegrationTests) =
+    Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
+      "docker-integration-tests").map(ProjectRef(buildLocation, _))
 
   val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingMqttAssembly, streamingKinesisAslAssembly) =
     Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-mqtt-assembly", "streaming-kinesis-asl-assembly")
@@ -240,6 +241,8 @@ object SparkBuild extends PomBuild {
 
   enable(Flume.settings)(streamingFlumeSink)
 
+  enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
+
 
   /**
   * Adds the ability to run the spark shell directly from SBT without building an assembly
@@ -291,6 +294,13 @@ object Flume {
   lazy val settings = sbtavro.SbtAvro.avroSettings
 }
 
+object DockerIntegrationTests {
+  // This serves to override the override specified in DependencyOverrides:
+  lazy val settings = Seq(
+    dependencyOverrides += "com.google.guava" % "guava" % "18.0"
+  )
+}
+
 /**
  * Overrides to work around sbt's dependency resolution being different from Maven's.
  */
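
The DockerIntegrationTests settings object above is the sbt half of the "build-magic"
mentioned in the commit message: dependencyOverrides pins Guava 18.0 for this one
project so the shaded docker-client jar resolves against the Guava it expects,
mirroring the explicit Guava 18.0 dependency in the module's pom.xml. A sketch of the
same idea in a standalone build definition (the project wiring is illustrative):

    // dependencyOverrides does not add a dependency; it only forces this version
    // when Guava is already on the classpath via transitive resolution.
    lazy val dockerTests = (project in file("docker-integration-tests"))
      .settings(dependencyOverrides += "com.google.guava" % "guava" % "18.0")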

http://git-wip-us.apache.org/repos/asf/spark/blob/1dde39d7/tags/src/main/java/org/apache/spark/tags/DockerTest.java
----------------------------------------------------------------------
diff --git a/tags/src/main/java/org/apache/spark/tags/DockerTest.java b/tags/src/main/java/org/apache/spark/tags/DockerTest.java
new file mode 100644
index 0000000..0fecf3b
--- /dev/null
+++ b/tags/src/main/java/org/apache/spark/tags/DockerTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.tags;
+
+import java.lang.annotation.*;
+import org.scalatest.TagAnnotation;
+
+@TagAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface DockerTest { }
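
Because @TagAnnotation tags are keyed by the annotation's fully-qualified class
name, a single annotation on a suite is enough for runners to include or exclude
every Docker-backed test wholesale; a sketch of both sides (the exclusion argument
reflects standard ScalaTest tag handling, not a flag added by this patch):

    import org.apache.spark.tags.DockerTest

    // Opting in, exactly as the new suites do:
    @DockerTest
    class NeedsDockerSuite extends org.apache.spark.SparkFunSuite {
      test("talks to a container") { /* ... */ }
    }

    // Opting out at run time: pass the tag's fully-qualified name to ScalaTest's
    // exclude flag, e.g. the runner argument `-l org.apache.spark.tags.DockerTest`.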

