Repository: spark
Updated Branches:
  refs/heads/master 255b56f9f -> a7a9d1447


http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
new file mode 100644
index 0000000..a4e1f3e
--- /dev/null
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.server
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.math.{random, round}
+
+import java.sql.Timestamp
+import java.util.{Map => JMap}
+
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hive.service.cli._
+import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, 
Operation, OperationManager}
+import org.apache.hive.service.cli.session.HiveSession
+
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.{Logging, SchemaRDD, Row => SparkRow}
+
+/**
+ * Executes queries using Spark SQL, and maintains a list of handles to active 
queries.
+ */
+class SparkSQLOperationManager(hiveContext: HiveContext) extends 
OperationManager with Logging {
+  val handleToOperation = ReflectionUtils
+    .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
+
+  override def newExecuteStatementOperation(
+      parentSession: HiveSession,
+      statement: String,
+      confOverlay: JMap[String, String],
+      async: Boolean): ExecuteStatementOperation = synchronized {
+
+    val operation = new ExecuteStatementOperation(parentSession, statement, 
confOverlay) {
+      private var result: SchemaRDD = _
+      private var iter: Iterator[SparkRow] = _
+      private var dataTypes: Array[DataType] = _
+
+      def close(): Unit = {
+        // RDDs will be cleaned automatically upon garbage collection.
+        logger.debug("CLOSING")
+      }
+
+      def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
+        if (!iter.hasNext) {
+          new RowSet()
+        } else {
+          val maxRows = maxRowsL.toInt // Do you really want a row batch 
larger than Int Max? No.
+          var curRow = 0
+          var rowSet = new ArrayBuffer[Row](maxRows)
+
+          while (curRow < maxRows && iter.hasNext) {
+            val sparkRow = iter.next()
+            val row = new Row()
+            var curCol = 0
+
+            while (curCol < sparkRow.length) {
+              dataTypes(curCol) match {
+                case StringType =>
+                  row.addString(sparkRow(curCol).asInstanceOf[String])
+                case IntegerType =>
+                  
row.addColumnValue(ColumnValue.intValue(sparkRow.getInt(curCol)))
+                case BooleanType =>
+                  
row.addColumnValue(ColumnValue.booleanValue(sparkRow.getBoolean(curCol)))
+                case DoubleType =>
+                  
row.addColumnValue(ColumnValue.doubleValue(sparkRow.getDouble(curCol)))
+                case FloatType =>
+                  
row.addColumnValue(ColumnValue.floatValue(sparkRow.getFloat(curCol)))
+                case DecimalType =>
+                  val hiveDecimal = 
sparkRow.get(curCol).asInstanceOf[BigDecimal].bigDecimal
+                  row.addColumnValue(ColumnValue.stringValue(new 
HiveDecimal(hiveDecimal)))
+                case LongType =>
+                  
row.addColumnValue(ColumnValue.longValue(sparkRow.getLong(curCol)))
+                case ByteType =>
+                  
row.addColumnValue(ColumnValue.byteValue(sparkRow.getByte(curCol)))
+                case ShortType =>
+                  
row.addColumnValue(ColumnValue.intValue(sparkRow.getShort(curCol)))
+                case TimestampType =>
+                  row.addColumnValue(
+                    
ColumnValue.timestampValue(sparkRow.get(curCol).asInstanceOf[Timestamp]))
+                case BinaryType | _: ArrayType | _: StructType | _: MapType =>
+                  val hiveString = result
+                    .queryExecution
+                    .asInstanceOf[HiveContext#QueryExecution]
+                    .toHiveString((sparkRow.get(curCol), dataTypes(curCol)))
+                  row.addColumnValue(ColumnValue.stringValue(hiveString))
+              }
+              curCol += 1
+            }
+            rowSet += row
+            curRow += 1
+          }
+          new RowSet(rowSet, 0)
+        }
+      }
+
+      def getResultSetSchema: TableSchema = {
+        logger.warn(s"Result Schema: ${result.queryExecution.analyzed.output}")
+        if (result.queryExecution.analyzed.output.size == 0) {
+          new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
+        } else {
+          val schema = result.queryExecution.analyzed.output.map { attr =>
+            new FieldSchema(attr.name, 
HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+          }
+          new TableSchema(schema)
+        }
+      }
+
+      def run(): Unit = {
+        logger.info(s"Running query '$statement'")
+        setState(OperationState.RUNNING)
+        try {
+          result = hiveContext.hql(statement)
+          logger.debug(result.queryExecution.toString())
+          val groupId = round(random * 1000000).toString
+          hiveContext.sparkContext.setJobGroup(groupId, statement)
+          iter = result.queryExecution.toRdd.toLocalIterator
+          dataTypes = 
result.queryExecution.analyzed.output.map(_.dataType).toArray
+          setHasResultSet(true)
+        } catch {
+          // Actually do need to catch Throwable as some failures don't 
inherit from Exception and
+          // HiveServer will silently swallow them.
+          case e: Throwable =>
+            logger.error("Error executing query:",e)
+            throw new HiveSQLException(e.toString)
+        }
+        setState(OperationState.FINISHED)
+      }
+    }
+
+   handleToOperation.put(operation.getHandle, operation)
+   operation
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt 
b/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt
new file mode 100644
index 0000000..850f801
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/resources/data/files/small_kv.txt
@@ -0,0 +1,5 @@
+238val_238
+86val_86
+311val_311
+27val_27
+165val_165

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
new file mode 100644
index 0000000..69f19f8
--- /dev/null
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.io.{BufferedReader, InputStreamReader, PrintWriter}
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils {
+  val WAREHOUSE_PATH = TestUtils.getWarehousePath("cli")
+  val METASTORE_PATH = TestUtils.getMetastorePath("cli")
+
+  override def beforeAll() {
+    val pb = new ProcessBuilder(
+      "../../bin/spark-sql",
+      "--master",
+      "local",
+      "--hiveconf",
+      
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
+      "--hiveconf",
+      "hive.metastore.warehouse.dir=" + WAREHOUSE_PATH)
+
+    process = pb.start()
+    outputWriter = new PrintWriter(process.getOutputStream, true)
+    inputReader = new BufferedReader(new 
InputStreamReader(process.getInputStream))
+    errorReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream))
+    waitForOutput(inputReader, "spark-sql>")
+  }
+
+  override def afterAll() {
+    process.destroy()
+    process.waitFor()
+  }
+
+  test("simple commands") {
+    val dataFilePath = getDataFile("data/files/small_kv.txt")
+    executeQuery("create table hive_test1(key int, val string);")
+    executeQuery("load data local inpath '" + dataFilePath+ "' overwrite into 
table hive_test1;")
+    executeQuery("cache table hive_test1", "Time taken")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
new file mode 100644
index 0000000..fe3403b
--- /dev/null
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent._
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.net.ServerSocket
+import java.sql.{Connection, DriverManager, Statement}
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.sql.Logging
+import org.apache.spark.sql.catalyst.util.getTempFilePath
+
+/**
+ * Test for the HiveThriftServer2 using JDBC.
+ */
+class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with 
TestUtils with Logging {
+
+  val WAREHOUSE_PATH = getTempFilePath("warehouse")
+  val METASTORE_PATH = getTempFilePath("metastore")
+
+  val DRIVER_NAME  = "org.apache.hive.jdbc.HiveDriver"
+  val TABLE = "test"
+  val HOST = "localhost"
+  val PORT =  {
+    // Let the system to choose a random available port to avoid collision 
with other parallel
+    // builds.
+    val socket = new ServerSocket(0)
+    val port = socket.getLocalPort
+    socket.close()
+    port
+  }
+
+  // If verbose is true, the test program will print all outputs coming from 
the Hive Thrift server.
+  val VERBOSE = 
Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).getOrElse("false").toBoolean
+
+  Class.forName(DRIVER_NAME)
+
+  override def beforeAll() { launchServer() }
+
+  override def afterAll() { stopServer() }
+
+  private def launchServer(args: Seq[String] = Seq.empty) {
+    // Forking a new process to start the Hive Thrift server. The reason to do 
this is it is
+    // hard to clean up Hive resources entirely, so we just start a new 
process and kill
+    // that process for cleanup.
+    val defaultArgs = Seq(
+      "../../sbin/start-thriftserver.sh",
+      "--master local",
+      "--hiveconf",
+      "hive.root.logger=INFO,console",
+      "--hiveconf",
+      
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
+      "--hiveconf",
+      s"hive.metastore.warehouse.dir=$WAREHOUSE_PATH")
+    val pb = new ProcessBuilder(defaultArgs ++ args)
+    val environment = pb.environment()
+    environment.put("HIVE_SERVER2_THRIFT_PORT", PORT.toString)
+    environment.put("HIVE_SERVER2_THRIFT_BIND_HOST", HOST)
+    process = pb.start()
+    inputReader = new BufferedReader(new 
InputStreamReader(process.getInputStream))
+    errorReader = new BufferedReader(new 
InputStreamReader(process.getErrorStream))
+    waitForOutput(inputReader, "ThriftBinaryCLIService listening on")
+
+    // Spawn a thread to read the output from the forked process.
+    // Note that this is necessary since in some configurations, log4j could 
be blocked
+    // if its output to stderr are not read, and eventually blocking the 
entire test suite.
+    future {
+      while (true) {
+        val stdout = readFrom(inputReader)
+        val stderr = readFrom(errorReader)
+        if (VERBOSE && stdout.length > 0) {
+          println(stdout)
+        }
+        if (VERBOSE && stderr.length > 0) {
+          println(stderr)
+        }
+        Thread.sleep(50)
+      }
+    }
+  }
+
+  private def stopServer() {
+    process.destroy()
+    process.waitFor()
+  }
+
+  test("test query execution against a Hive Thrift server") {
+    Thread.sleep(5 * 1000)
+    val dataFilePath = getDataFile("data/files/small_kv.txt")
+    val stmt = createStatement()
+    stmt.execute("DROP TABLE IF EXISTS test")
+    stmt.execute("DROP TABLE IF EXISTS test_cached")
+    stmt.execute("CREATE TABLE test(key int, val string)")
+    stmt.execute(s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE 
test")
+    stmt.execute("CREATE TABLE test_cached as select * from test limit 4")
+    stmt.execute("CACHE TABLE test_cached")
+
+    var rs = stmt.executeQuery("select count(*) from test")
+    rs.next()
+    assert(rs.getInt(1) === 5)
+
+    rs = stmt.executeQuery("select count(*) from test_cached")
+    rs.next()
+    assert(rs.getInt(1) === 4)
+
+    stmt.close()
+  }
+
+  def getConnection: Connection = {
+    val connectURI = s"jdbc:hive2://localhost:$PORT/"
+    DriverManager.getConnection(connectURI, System.getProperty("user.name"), 
"")
+  }
+
+  def createStatement(): Statement = getConnection.createStatement()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala
new file mode 100644
index 0000000..bb22426
--- /dev/null
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/TestUtils.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.io.{BufferedReader, PrintWriter}
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.hadoop.hive.common.LogUtils
+import org.apache.hadoop.hive.common.LogUtils.LogInitializationException
+
+object TestUtils {
+  val timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss")
+
+  def getWarehousePath(prefix: String): String = {
+    System.getProperty("user.dir") + "/test_warehouses/" + prefix + 
"-warehouse-" +
+      timestamp.format(new Date)
+  }
+
+  def getMetastorePath(prefix: String): String = {
+    System.getProperty("user.dir") + "/test_warehouses/" + prefix + 
"-metastore-" +
+      timestamp.format(new Date)
+  }
+
+  // Dummy function for initialize the log4j properties.
+  def init() { }
+
+  // initialize log4j
+  try {
+    LogUtils.initHiveLog4j()
+  } catch {
+    case e: LogInitializationException => // Ignore the error.
+  }
+}
+
+trait TestUtils {
+  var process : Process = null
+  var outputWriter : PrintWriter = null
+  var inputReader : BufferedReader = null
+  var errorReader : BufferedReader = null
+
+  def executeQuery(
+    cmd: String, outputMessage: String = "OK", timeout: Long = 15000): String 
= {
+    println("Executing: " + cmd + ", expecting output: " + outputMessage)
+    outputWriter.write(cmd + "\n")
+    outputWriter.flush()
+    waitForQuery(timeout, outputMessage)
+  }
+
+  protected def waitForQuery(timeout: Long, message: String): String = {
+    if (waitForOutput(errorReader, message, timeout)) {
+      Thread.sleep(500)
+      readOutput()
+    } else {
+      assert(false, "Didn't find \"" + message + "\" in the output:\n" + 
readOutput())
+      null
+    }
+  }
+
+  // Wait for the specified str to appear in the output.
+  protected def waitForOutput(
+    reader: BufferedReader, str: String, timeout: Long = 10000): Boolean = {
+    val startTime = System.currentTimeMillis
+    var out = ""
+    while (!out.contains(str) && System.currentTimeMillis < (startTime + 
timeout)) {
+      out += readFrom(reader)
+    }
+    out.contains(str)
+  }
+
+  // Read stdout output and filter out garbage collection messages.
+  protected def readOutput(): String = {
+    val output = readFrom(inputReader)
+    // Remove GC Messages
+    val filteredOutput = output.lines.filterNot(x => x.contains("[GC") || 
x.contains("[Full GC"))
+      .mkString("\n")
+    filteredOutput
+  }
+
+  protected def readFrom(reader: BufferedReader): String = {
+    var out = ""
+    var c = 0
+    while (reader.ready) {
+      c = reader.read()
+      out += c.asInstanceOf[Char]
+    }
+    out
+  }
+
+  protected def getDataFile(name: String) = {
+    Thread.currentThread().getContextClassLoader.getResource(name)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/sql/hive/pom.xml
----------------------------------------------------------------------
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 1699ffe..93d00f7 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -32,7 +32,7 @@
   <name>Spark Project Hive</name>
   <url>http://spark.apache.org/</url>
   <properties>
-     <sbt.project.name>hive</sbt.project.name>
+    <sbt.project.name>hive</sbt.project.name>
   </properties>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 201c85f..84d43ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -255,7 +255,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       Seq(StringType, IntegerType, LongType, DoubleType, FloatType, 
BooleanType, ByteType,
         ShortType, DecimalType, TimestampType, BinaryType)
 
-    protected def toHiveString(a: (Any, DataType)): String = a match {
+    protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
       case (struct: Row, StructType(fields)) =>
         struct.zip(fields).map {
           case (v, t) => s""""${t.name}":${toHiveStructString(v, 
t.dataType)}"""

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index a8623b6..a022a1e 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -419,10 +419,10 @@ class HiveQuerySuite extends HiveComparisonTest {
     hql(s"set $testKey=$testVal")
     assert(get(testKey, testVal + "_") == testVal)
 
-    hql("set mapred.reduce.tasks=20")
-    assert(get("mapred.reduce.tasks", "0") == "20")
-    hql("set mapred.reduce.tasks = 40")
-    assert(get("mapred.reduce.tasks", "0") == "40")
+    hql("set some.property=20")
+    assert(get("some.property", "0") == "20")
+    hql("set some.property = 40")
+    assert(get("some.property", "0") == "40")
 
     hql(s"set $testKey=$testVal")
     assert(get(testKey, "0") == testVal)
@@ -436,63 +436,61 @@ class HiveQuerySuite extends HiveComparisonTest {
     val testKey = "spark.sql.key.usedfortestonly"
     val testVal = "test.val.0"
     val nonexistentKey = "nonexistent"
-    def collectResults(rdd: SchemaRDD): Set[(String, String)] =
-      rdd.collect().map { case Row(key: String, value: String) => key -> value 
}.toSet
 
     clear()
 
     // "set" itself returns all config variables currently specified in 
SQLConf.
     assert(hql("SET").collect().size == 0)
 
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(hql(s"SET $testKey=$testVal"))
+    assertResult(Array(s"$testKey=$testVal")) {
+      hql(s"SET $testKey=$testVal").collect().map(_.getString(0))
     }
 
     assert(hiveconf.get(testKey, "") == testVal)
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(hql("SET"))
+    assertResult(Array(s"$testKey=$testVal")) {
+      hql(s"SET $testKey=$testVal").collect().map(_.getString(0))
     }
 
     hql(s"SET ${testKey + testKey}=${testVal + testVal}")
     assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
-    assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + 
testVal))) {
-      collectResults(hql("SET"))
+    assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal 
+ testVal}")) {
+      hql(s"SET").collect().map(_.getString(0))
     }
 
     // "set key"
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(hql(s"SET $testKey"))
+    assertResult(Array(s"$testKey=$testVal")) {
+      hql(s"SET $testKey").collect().map(_.getString(0))
     }
 
-    assertResult(Set(nonexistentKey -> "<undefined>")) {
-      collectResults(hql(s"SET $nonexistentKey"))
+    assertResult(Array(s"$nonexistentKey=<undefined>")) {
+      hql(s"SET $nonexistentKey").collect().map(_.getString(0))
     }
 
     // Assert that sql() should have the same effects as hql() by repeating 
the above using sql().
     clear()
     assert(sql("SET").collect().size == 0)
 
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(sql(s"SET $testKey=$testVal"))
+    assertResult(Array(s"$testKey=$testVal")) {
+      sql(s"SET $testKey=$testVal").collect().map(_.getString(0))
     }
 
     assert(hiveconf.get(testKey, "") == testVal)
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(sql("SET"))
+    assertResult(Array(s"$testKey=$testVal")) {
+      sql("SET").collect().map(_.getString(0))
     }
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
-    assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + 
testVal))) {
-      collectResults(sql("SET"))
+    assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal 
+ testVal}")) {
+      sql("SET").collect().map(_.getString(0))
     }
 
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(sql(s"SET $testKey"))
+    assertResult(Array(s"$testKey=$testVal")) {
+      sql(s"SET $testKey").collect().map(_.getString(0))
     }
 
-    assertResult(Set(nonexistentKey -> "<undefined>")) {
-      collectResults(sql(s"SET $nonexistentKey"))
+    assertResult(Array(s"$nonexistentKey=<undefined>")) {
+      sql(s"SET $nonexistentKey").collect().map(_.getString(0))
     }
 
     clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
index f60697c..b99f306 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_2.10</artifactId>
   <properties>
-     <sbt.project.name>streaming</sbt.project.name>
+    <sbt.project.name>streaming</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project Streaming</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/tools/pom.xml
----------------------------------------------------------------------
diff --git a/tools/pom.xml b/tools/pom.xml
index c0ee8fa..97abb6b 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -27,7 +27,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-tools_2.10</artifactId>
   <properties>
-     <sbt.project.name>tools</sbt.project.name>
+    <sbt.project.name>tools</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project Tools</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/yarn/alpha/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml
index 5b13a1f..51744ec 100644
--- a/yarn/alpha/pom.xml
+++ b/yarn/alpha/pom.xml
@@ -24,7 +24,7 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
   <properties>
-     <sbt.project.name>yarn-alpha</sbt.project.name>
+    <sbt.project.name>yarn-alpha</sbt.project.name>
   </properties>
 
   <groupId>org.apache.spark</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
index efb473a..3faaf05 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -29,7 +29,7 @@
   <packaging>pom</packaging>
   <name>Spark Project YARN Parent POM</name>
   <properties>
-     <sbt.project.name>yarn</sbt.project.name>
+    <sbt.project.name>yarn</sbt.project.name>
   </properties>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/spark/blob/a7a9d144/yarn/stable/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml
index ceaf9f9..b6c8456 100644
--- a/yarn/stable/pom.xml
+++ b/yarn/stable/pom.xml
@@ -24,7 +24,7 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
   <properties>
-     <sbt.project.name>yarn-stable</sbt.project.name>
+    <sbt.project.name>yarn-stable</sbt.project.name>
   </properties>
 
   <groupId>org.apache.spark</groupId>

Reply via email to