spark git commit: [SPARK-19816][SQL][TESTS] Fix an issue that DataFrameCallbackSuite doesn't recover the log level

2017-03-03 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 da04d45c2 -> 664c9795c


[SPARK-19816][SQL][TESTS] Fix an issue that DataFrameCallbackSuite doesn't 
recover the log level

## What changes were proposed in this pull request?

"DataFrameCallbackSuite.execute callback functions when a DataFrame action 
failed" sets the log level to "fatal" but doesn't recover it. Hence, tests 
running after it won't output any logs except fatal logs.

This PR uses `testQuietly` instead to avoid changing the log level.
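
For context, a minimal sketch of what a `quietly`-style helper can look like (illustrative only, assuming log4j 1.x as used by Spark 2.x; the helper name and its location in Spark's test utilities are not taken from this commit). The point is that the previous level is always restored, which is exactly what the removed `setLogLevel("FATAL")` call failed to do:

```scala
import org.apache.log4j.{Level, LogManager}

// Suppress log output while running a block and always restore the previous level,
// even if the block throws.
def quietly[T](body: => T): T = {
  val rootLogger = LogManager.getRootLogger
  val originalLevel = rootLogger.getLevel    // remember whatever level was configured
  rootLogger.setLevel(Level.OFF)             // silence the expected error logs
  try body
  finally rootLogger.setLevel(originalLevel) // restore no matter how the body exits
}
```

A `testQuietly(name)(body)` helper can then simply wrap `test(name) { quietly(body) }`.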

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #17156 from zsxwing/SPARK-19816.

(cherry picked from commit fbc4058037cf5b0be9f14a7dd28105f7f8151bed)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/664c9795
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/664c9795
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/664c9795

Branch: refs/heads/branch-2.1
Commit: 664c9795c94d3536ff9fe54af06e0fb6c0012862
Parents: da04d45
Author: Shixiong Zhu 
Authored: Fri Mar 3 19:00:35 2017 -0800
Committer: Yin Huai 
Committed: Fri Mar 3 19:09:38 2017 -0800

--
 .../scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/664c9795/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index 3ae5ce6..f372e94 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -58,7 +58,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
     spark.listenerManager.unregister(listener)
   }
 
-  test("execute callback functions when a DataFrame action failed") {
+  testQuietly("execute callback functions when a DataFrame action failed") {
     val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)]
     val listener = new QueryExecutionListener {
       override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
@@ -73,8 +73,6 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
     val errorUdf = udf[Int, Int] { _ => throw new RuntimeException("udf error") }
     val df = sparkContext.makeRDD(Seq(1 -> "a")).toDF("i", "j")
 
-    // Ignore the log when we are expecting an exception.
-    sparkContext.setLogLevel("FATAL")
     val e = intercept[SparkException](df.select(errorUdf($"i")).collect())
 
     assert(metrics.length == 1)





spark git commit: [SPARK-19816][SQL][TESTS] Fix an issue that DataFrameCallbackSuite doesn't recover the log level

2017-03-03 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 9e5b4ce72 -> fbc405803


[SPARK-19816][SQL][TESTS] Fix an issue that DataFrameCallbackSuite doesn't 
recover the log level

## What changes were proposed in this pull request?

"DataFrameCallbackSuite.execute callback functions when a DataFrame action 
failed" sets the log level to "fatal" but doesn't recover it. Hence, tests 
running after it won't output any logs except fatal logs.

This PR uses `testQuietly` instead to avoid changing the log level.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #17156 from zsxwing/SPARK-19816.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbc40580
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbc40580
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbc40580

Branch: refs/heads/master
Commit: fbc4058037cf5b0be9f14a7dd28105f7f8151bed
Parents: 9e5b4ce
Author: Shixiong Zhu 
Authored: Fri Mar 3 19:00:35 2017 -0800
Committer: Xiao Li 
Committed: Fri Mar 3 19:00:35 2017 -0800

--
 .../scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fbc40580/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index 9f27d06..7c9ea7d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -60,7 +60,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
     spark.listenerManager.unregister(listener)
   }
 
-  test("execute callback functions when a DataFrame action failed") {
+  testQuietly("execute callback functions when a DataFrame action failed") {
     val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)]
     val listener = new QueryExecutionListener {
       override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
@@ -75,8 +75,6 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
     val errorUdf = udf[Int, Int] { _ => throw new RuntimeException("udf error") }
     val df = sparkContext.makeRDD(Seq(1 -> "a")).toDF("i", "j")
 
-    // Ignore the log when we are expecting an exception.
-    sparkContext.setLogLevel("FATAL")
     val e = intercept[SparkException](df.select(errorUdf($"i")).collect())
 
    assert(metrics.length == 1)





spark git commit: [SPARK-19084][SQL] Ensure context class loader is set when initializing Hive.

2017-03-03 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master a6a7a95e2 -> 9e5b4ce72


[SPARK-19084][SQL] Ensure context class loader is set when initializing Hive.

A change in Hive 2.2 (most probably HIVE-13149) causes this code path to fail, since the call 
to "state.getConf.setClassLoader" does not actually change the context's class loader. Spark 
doesn't yet officially support Hive 2.2, but some distribution-specific metastore client 
libraries may have that change (as certain versions of CDH already do), and this also makes it 
easier to support 2.2 when it comes out.
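
As a hedged illustration of the pattern (not the exact `HiveClientImpl` code), the fix amounts to explicitly installing the client's class loader on the current thread around the Hive call, and restoring the caller's loader afterwards:

```scala
// Sketch only: set the thread context class loader around a block, since on newer Hive
// clients `conf.setClassLoader` alone no longer changes the thread's context loader.
def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
  val thread = Thread.currentThread()
  val original = thread.getContextClassLoader
  thread.setContextClassLoader(loader)
  try body
  finally thread.setContextClassLoader(original) // always restore the caller's loader
}
```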

Tested with existing unit tests; we've also used this patch extensively with 
Hive
metastore client jars containing the offending patch.

Author: Marcelo Vanzin 

Closes #17154 from vanzin/SPARK-19804.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e5b4ce7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e5b4ce7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e5b4ce7

Branch: refs/heads/master
Commit: 9e5b4ce727cf262a14a411efded85ee1e50a88ed
Parents: a6a7a95
Author: Marcelo Vanzin 
Authored: Fri Mar 3 18:44:31 2017 -0800
Committer: Xiao Li 
Committed: Fri Mar 3 18:44:31 2017 -0800

--
 .../apache/spark/sql/hive/client/HiveClientImpl.scala| 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9e5b4ce7/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8f98c8f..7acaa9a 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -269,16 +269,21 @@ private[hive] class HiveClientImpl(
*/
   def withHiveState[A](f: => A): A = retryLocked {
 val original = Thread.currentThread().getContextClassLoader
-// Set the thread local metastore client to the client associated with 
this HiveClientImpl.
-Hive.set(client)
+val originalConfLoader = state.getConf.getClassLoader
 // The classloader in clientLoader could be changed after addJar, always 
use the latest
-// classloader
+// classloader. We explicitly set the context class loader since 
"conf.setClassLoader" does
+// not do that, and the Hive client libraries may need to load classes 
defined by the client's
+// class loader.
+Thread.currentThread().setContextClassLoader(clientLoader.classLoader)
 state.getConf.setClassLoader(clientLoader.classLoader)
+// Set the thread local metastore client to the client associated with 
this HiveClientImpl.
+Hive.set(client)
 // setCurrentSessionState will use the classLoader associated
 // with the HiveConf in `state` to override the context class loader of 
the current
 // thread.
 shim.setCurrentSessionState(state)
 val ret = try f finally {
+  state.getConf.setClassLoader(originalConfLoader)
   Thread.currentThread().setContextClassLoader(original)
   HiveCatalogMetrics.incrementHiveClientCalls(1)
 }





spark git commit: [SPARK-19718][SS] Handle more interrupt cases properly for Hadoop

2017-03-03 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master f5fdbe043 -> a6a7a95e2


[SPARK-19718][SS] Handle more interrupt cases properly for Hadoop

## What changes were proposed in this pull request?

[SPARK-19617](https://issues.apache.org/jira/browse/SPARK-19617) changed `HDFSMetadataLog` to 
enable interrupts when using the local file system. However, now we hit 
[HADOOP-12074](https://issues.apache.org/jira/browse/HADOOP-12074): `Shell.runCommand` converts 
`InterruptedException` to `new IOException(ie.toString())` before Hadoop 2.8. This is the 
Hadoop patch that fixes HADOOP-12074: 
https://github.com/apache/hadoop/commit/95c73d49b1bb459b626a9ac52acadb8f5fa724de

This PR adds new logic to handle the following cases related to `InterruptedException` (a 
condensed sketch follows this list).
- Check whether the message of an `IOException` starts with `java.lang.InterruptedException`. 
If so, treat it as an `InterruptedException`. This covers pre-Hadoop 2.8.
- Treat `InterruptedIOException` as `InterruptedException`. This covers Hadoop 2.8+ and other 
places that may throw `InterruptedIOException` when the thread is interrupted.
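
A condensed sketch of the classification logic (simplified from the actual `StreamExecution` change; the helper name is illustrative):

```scala
import java.io.{InterruptedIOException, IOException}

// Returns true if the throwable represents a thread interruption, covering Hadoop 2.8+
// (InterruptedIOException) and pre-2.8 (InterruptedException wrapped in a plain IOException
// whose message starts with "java.lang.InterruptedException").
def isInterruption(e: Throwable): Boolean = e match {
  case _: InterruptedException | _: InterruptedIOException => true
  case ioe: IOException if ioe.getMessage != null &&
      ioe.getMessage.startsWith(classOf[InterruptedException].getName) => true
  case _ => false
}
```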

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu 

Closes #17044 from zsxwing/SPARK-19718.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6a7a95e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6a7a95e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6a7a95e

Branch: refs/heads/master
Commit: a6a7a95e2f3482d84fcd744713e43f80ea90e33a
Parents: f5fdbe0
Author: Shixiong Zhu 
Authored: Fri Mar 3 17:10:11 2017 -0800
Committer: Shixiong Zhu 
Committed: Fri Mar 3 17:10:11 2017 -0800

--
 .../execution/streaming/StreamExecution.scala   |  20 +++-
 .../spark/sql/streaming/StreamSuite.scala   | 109 ++-
 2 files changed, 119 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a6a7a95e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6e77f35..70912d1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{InterruptedIOException, IOException}
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
@@ -37,6 +38,12 @@ import 
org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
+/** States for [[StreamExecution]]'s lifecycle. */
+trait State
+case object INITIALIZING extends State
+case object ACTIVE extends State
+case object TERMINATED extends State
+
 /**
  * Manages the execution of a streaming Spark SQL query that is occurring in a 
separate thread.
  * Unlike a standard query, a streaming query executes repeatedly each time 
new data arrives at any
@@ -298,7 +305,14 @@ class StreamExecution(
 // `stop()` is already called. Let `finally` finish the cleanup.
   }
 } catch {
-  case _: InterruptedException if state.get == TERMINATED => // 
interrupted by stop()
+  case _: InterruptedException | _: InterruptedIOException if state.get == 
TERMINATED =>
+// interrupted by stop()
+updateStatusMessage("Stopped")
+  case e: IOException if e.getMessage != null
+&& e.getMessage.startsWith(classOf[InterruptedException].getName)
+&& state.get == TERMINATED =>
+// This is a workaround for HADOOP-12074: `Shell.runCommand` converts 
`InterruptedException`
+// to `new IOException(ie.toString())` before Hadoop 2.8.
 updateStatusMessage("Stopped")
   case e: Throwable =>
 streamDeathCause = new StreamingQueryException(
@@ -721,10 +735,6 @@ class StreamExecution(
 }
   }
 
-  trait State
-  case object INITIALIZING extends State
-  case object ACTIVE extends State
-  case object TERMINATED extends State
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6a7a95e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f44cfad..6dfcd8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/

spark git commit: [SPARK-13446][SQL] Support reading data from Hive 2.0.1 metastore

2017-03-03 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 44281ca81 -> f5fdbe043


[SPARK-13446][SQL] Support reading data from Hive 2.0.1 metastore

### What changes were proposed in this pull request?
This PR is to make Spark work with Hive 2.0's metastores. Compared with Hive 
1.2, Hive 2.0's metastore has an API update due to removal of `HOLD_DDLTIME` in 
https://issues.apache.org/jira/browse/HIVE-12224. Based on the following Hive 
JIRA description, `HOLD_DDLTIME` should be removed from our internal API too. 
(https://github.com/apache/spark/pull/17063 was submitted for it):
> This arcane feature was introduced long ago via HIVE-1394 It was broken as 
> soon as it landed, HIVE-1442 and is thus useless. Fact that no one has fixed 
> it since informs that its not really used by anyone. Better is to remove it 
> so no one hits the bug of HIVE-1442

In the next PR, we will support 2.1.0 metastore, whose APIs were changed due to 
https://issues.apache.org/jira/browse/HIVE-12730. However, before that, we need 
a code cleanup for stats collection and setting.
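
For readers unfamiliar with the shim layer, the version-specific metastore methods are resolved by reflection so that one Spark build can talk to several Hive client versions. A rough sketch of the pattern (hypothetical usage, not the exact `HiveShim` code):

```scala
import java.lang.reflect.Method

// Resolve a method once by its exact signature; each Shim_vX_Y subclass binds to the
// signature its Hive version exposes (e.g. Hive 2.0 dropped the HOLD_DDLTIME flag,
// which changes the number of boolean parameters).
def findMethod(klass: Class[_], name: String, args: Class[_]*): Method =
  klass.getMethod(name, args: _*)

// Hypothetical usage against a Hive client class:
// val loadTable = findMethod(hiveClass, "loadTable",
//   classOf[org.apache.hadoop.fs.Path], classOf[String],
//   java.lang.Boolean.TYPE, java.lang.Boolean.TYPE, java.lang.Boolean.TYPE, java.lang.Boolean.TYPE)
// loadTable.invoke(hive, path, tableName, /* boxed boolean flags... */)
```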

### How was this patch tested?
Added test cases to VersionsSuite.scala

Author: Xiao Li 

Closes #17061 from gatorsmile/Hive2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5fdbe04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5fdbe04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5fdbe04

Branch: refs/heads/master
Commit: f5fdbe04369d2c04972b7d9686d0c6c046f3d3dc
Parents: 44281ca
Author: Xiao Li 
Authored: Fri Mar 3 16:59:52 2017 -0800
Committer: Wenchen Fan 
Committed: Fri Mar 3 16:59:52 2017 -0800

--
 .../spark/sql/hive/client/HiveClientImpl.scala  |  1 +
 .../apache/spark/sql/hive/client/HiveShim.scala | 74 
 .../sql/hive/client/IsolatedClientLoader.scala  |  1 +
 .../apache/spark/sql/hive/client/package.scala  |  8 ++-
 .../hive/execution/InsertIntoHiveTable.scala| 11 ++-
 .../sql/hive/client/HiveClientBuilder.scala | 12 ++--
 .../spark/sql/hive/client/VersionsSuite.scala   |  9 ++-
 7 files changed, 107 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f5fdbe04/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c326ac4..8f98c8f 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -96,6 +96,7 @@ private[hive] class HiveClientImpl(
 case hive.v1_0 => new Shim_v1_0()
 case hive.v1_1 => new Shim_v1_1()
 case hive.v1_2 => new Shim_v1_2()
+case hive.v2_0 => new Shim_v2_0()
   }
 
   // Create an internal session state for this HiveClientImpl.

http://git-wip-us.apache.org/repos/asf/spark/blob/f5fdbe04/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 9fe1c76..7280748 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -833,3 +833,77 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
   }
 
 }
+
+private[client] class Shim_v2_0 extends Shim_v1_2 {
+  private lazy val loadPartitionMethod =
+findMethod(
+  classOf[Hive],
+  "loadPartition",
+  classOf[Path],
+  classOf[String],
+  classOf[JMap[String, String]],
+  JBoolean.TYPE,
+  JBoolean.TYPE,
+  JBoolean.TYPE,
+  JBoolean.TYPE,
+  JBoolean.TYPE)
+  private lazy val loadTableMethod =
+findMethod(
+  classOf[Hive],
+  "loadTable",
+  classOf[Path],
+  classOf[String],
+  JBoolean.TYPE,
+  JBoolean.TYPE,
+  JBoolean.TYPE,
+  JBoolean.TYPE)
+  private lazy val loadDynamicPartitionsMethod =
+findMethod(
+  classOf[Hive],
+  "loadDynamicPartitions",
+  classOf[Path],
+  classOf[String],
+  classOf[JMap[String, String]],
+  JBoolean.TYPE,
+  JInteger.TYPE,
+  JBoolean.TYPE,
+  JBoolean.TYPE,
+  JLong.TYPE)
+
+  override def loadPartition(
+  hive: Hive,
+  loadPath: Path,
+  tableName: String,
+  partSpec: JMap[String, String],
+  replace: Boolean,
+  inheritTableSpecs: Boolean,
+  isSkewedStoreAsSubdir: Boolean,
+  isSrcLocal: Boolean): Unit = {
+loadPartitionMethod.invoke(hive, 

spark git commit: [SPARK-19348][PYTHON] PySpark keyword_only decorator is not thread-safe

2017-03-03 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 2a7921a81 -> 44281ca81


[SPARK-19348][PYTHON] PySpark keyword_only decorator is not thread-safe

## What changes were proposed in this pull request?
The `keyword_only` decorator in PySpark is not thread-safe.  It writes kwargs 
to a static class variable in the decorator, which is then retrieved later in 
the class method as `_input_kwargs`.  If multiple threads are constructing the 
same class with different kwargs, it becomes a race condition to read from the 
static class variable before it's overwritten.  See 
[SPARK-19348](https://issues.apache.org/jira/browse/SPARK-19348) for 
reproduction code.

This change will write the kwargs to a member variable so that multiple threads 
can operate on separate instances without the race condition.  It does not 
protect against multiple threads operating on a single instance, but that is 
better left to the user to synchronize.

## How was this patch tested?
Added new unit tests for using the keyword_only decorator and a regression test 
that verifies `_input_kwargs` can be overwritten from different class instances.

Author: Bryan Cutler 

Closes #16782 from BryanCutler/pyspark-keyword_only-threadsafe-SPARK-19348.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44281ca8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44281ca8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44281ca8

Branch: refs/heads/master
Commit: 44281ca81d4eda02b627ba21841108438b7d1c27
Parents: 2a7921a
Author: Bryan Cutler 
Authored: Fri Mar 3 16:43:45 2017 -0800
Committer: Joseph K. Bradley 
Committed: Fri Mar 3 16:43:45 2017 -0800

--
 python/pyspark/__init__.py  |  10 +--
 python/pyspark/ml/classification.py |  32 -
 python/pyspark/ml/clustering.py |  16 ++---
 python/pyspark/ml/evaluation.py |  12 ++--
 python/pyspark/ml/feature.py| 120 +++
 python/pyspark/ml/pipeline.py   |   4 +-
 python/pyspark/ml/recommendation.py |   4 +-
 python/pyspark/ml/regression.py |  28 
 python/pyspark/ml/tests.py  |   8 +--
 python/pyspark/ml/tuning.py |   8 +--
 python/pyspark/tests.py |  39 ++
 11 files changed, 161 insertions(+), 120 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/44281ca8/python/pyspark/__init__.py
--
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 9331e74..14c51a3 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -93,13 +93,15 @@ def keyword_only(func):
 """
 A decorator that forces keyword arguments in the wrapped method
 and saves actual input keyword arguments in `_input_kwargs`.
+
+.. note:: Should only be used to wrap a method where first arg is `self`
 """
 @wraps(func)
-def wrapper(*args, **kwargs):
-if len(args) > 1:
+def wrapper(self, *args, **kwargs):
+if len(args) > 0:
 raise TypeError("Method %s forces keyword arguments." % 
func.__name__)
-wrapper._input_kwargs = kwargs
-return func(*args, **kwargs)
+self._input_kwargs = kwargs
+return func(self, **kwargs)
 return wrapper
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/44281ca8/python/pyspark/ml/classification.py
--
diff --git a/python/pyspark/ml/classification.py 
b/python/pyspark/ml/classification.py
index ac40fce..b4fc357 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -124,7 +124,7 @@ class LinearSVC(JavaEstimator, HasFeaturesCol, HasLabelCol, 
HasPredictionCol, Ha
 "org.apache.spark.ml.classification.LinearSVC", self.uid)
 self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, 
fitIntercept=True,
  standardization=True, threshold=0.0, 
aggregationDepth=2)
-kwargs = self.__init__._input_kwargs
+kwargs = self._input_kwargs
 self.setParams(**kwargs)
 
 @keyword_only
@@ -140,7 +140,7 @@ class LinearSVC(JavaEstimator, HasFeaturesCol, HasLabelCol, 
HasPredictionCol, Ha
   aggregationDepth=2):
 Sets params for Linear SVM Classifier.
 """
-kwargs = self.setParams._input_kwargs
+kwargs = self._input_kwargs
 return self._set(**kwargs)
 
 def _create_model(self, java_model):
@@ -266,7 +266,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, 
HasLabelCol, HasPredicti
 self._java_obj = self._new_java_obj(
 "org.apache.spark.ml.classification.LogisticRegression", self.uid)
 self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, t

spark git commit: [SPARK-18939][SQL] Timezone support in partition values.

2017-03-03 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master ba186a841 -> 2a7921a81


[SPARK-18939][SQL] Timezone support in partition values.

## What changes were proposed in this pull request?

This is a follow-up pr of #16308 and #16750.

This pr enables timezone support in partition values.

We should use `timeZone` option introduced at #16750 to parse/format partition 
values of the `TimestampType`.

For example, suppose the timestamp `"2016-01-01 00:00:00"` in `GMT` is used as a partition 
value. With the default timezone option, which here is `"GMT"` because that is the session 
local timezone, the written values are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> val df = Seq((1, new java.sql.Timestamp(1451606400000L))).toDF("i", "ts")
df: org.apache.spark.sql.DataFrame = [i: int, ts: timestamp]

scala> df.show()
+---+---+
|  i| ts|
+---+---+
|  1|2016-01-01 00:00:00|
+---+---+

scala> df.write.partitionBy("ts").save("/path/to/gmtpartition")
```

```sh
$ ls /path/to/gmtpartition/
_SUCCESS    ts=2016-01-01 00%3A00%3A00
```

whereas setting the option to `"PST"`, they are:

```scala
scala> df.write.option("timeZone", 
"PST").partitionBy("ts").save("/path/to/pstpartition")
```

```sh
$ ls /path/to/pstpartition/
_SUCCESS    ts=2015-12-31 16%3A00%3A00
```

We can properly read the partition values if the session local timezone and the 
timezone of the partition values are the same:

```scala
scala> spark.read.load("/path/to/gmtpartition").show()
+---+---+
|  i| ts|
+---+---+
|  1|2016-01-01 00:00:00|
+---+---+
```

And even if the timezones are different, we can properly read the values by setting the 
correct timezone option:

```scala
// wrong result
scala> spark.read.load("/path/to/pstpartition").show()
+---+---+
|  i| ts|
+---+---+
|  1|2015-12-31 16:00:00|
+---+---+

// correct result
scala> spark.read.option("timeZone", "PST").load("/path/to/pstpartition").show()
+---+---+
|  i| ts|
+---+---+
|  1|2016-01-01 00:00:00|
+---+---+
```

## How was this patch tested?

Existing tests and added some tests.

Author: Takuya UESHIN 

Closes #17053 from ueshin/issues/SPARK-18939.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a7921a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a7921a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a7921a8

Branch: refs/heads/master
Commit: 2a7921a813ecd847fd933ffef10edc64684e9df7
Parents: ba186a8
Author: Takuya UESHIN 
Authored: Fri Mar 3 16:35:54 2017 -0800
Committer: Wenchen Fan 
Committed: Fri Mar 3 16:35:54 2017 -0800

--
 .../sql/catalyst/catalog/ExternalCatalog.scala  |  4 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  3 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   |  2 +-
 .../spark/sql/catalyst/catalog/interface.scala  | 10 ++--
 .../execution/OptimizeMetadataOnlyQuery.scala   | 10 ++--
 .../datasources/CatalogFileIndex.scala  |  3 +-
 .../datasources/FileFormatWriter.scala  | 18 +++---
 .../PartitioningAwareFileIndex.scala| 16 +++--
 .../datasources/PartitioningUtils.scala | 42 +
 .../execution/datasources/csv/CSVSuite.scala| 15 +++--
 .../ParquetPartitionDiscoverySuite.scala| 62 +++-
 .../sql/sources/PartitionedWriteSuite.scala | 35 +++
 .../spark/sql/hive/HiveExternalCatalog.scala|  9 ++-
 .../sql/hive/execution/HiveTableScanExec.scala  |  3 +-
 .../sql/hive/HiveExternalCatalogSuite.scala |  2 +-
 15 files changed, 175 insertions(+), 59 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2a7921a8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index a3a4ab3..31eded4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -244,11 +244,13 @@ abstract class ExternalCatalog {
* @param db database name
* @param table table name
* @param predicates partition-pruning predicates
+   * @param defaultTimeZoneId default timezone id to parse partition values of 
TimestampType
*/
   def listPartitionsByFilter(
   db: String,
   tabl

spark git commit: [MINOR][DOC] Fix doc for web UI https configuration

2017-03-03 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 9314c0837 -> ba186a841


[MINOR][DOC] Fix doc for web UI https configuration

## What changes were proposed in this pull request?

The doc about enabling HTTPS for the web UI is not correct: `spark.ui.https.enabled` does not 
exist; enabling SSL is enough to get HTTPS.
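
For reference, HTTPS for the UI comes from the generic `spark.ssl.*` settings. A hedged sketch of the kind of configuration involved (typically placed in `spark-defaults.conf`; paths and passwords below are placeholders, and the exact keys should be checked against the security docs for your version):

```scala
import org.apache.spark.SparkConf

// Enable SSL (and therefore an HTTPS UI) through the spark.ssl.* namespace
// rather than a non-existent spark.ui.https.enabled flag.
val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore.jks")
  .set("spark.ssl.keyStorePassword", "change-me")
  .set("spark.ssl.keyPassword", "change-me")
  .set("spark.ssl.protocol", "TLSv1.2")
```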

## How was this patch tested?

N/A

Author: jerryshao 

Closes #17147 from jerryshao/fix-doc-ssl.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba186a84
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba186a84
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba186a84

Branch: refs/heads/master
Commit: ba186a841fcfcd73a1530ca2418cc08bb0df92e1
Parents: 9314c08
Author: jerryshao 
Authored: Fri Mar 3 14:23:31 2017 -0800
Committer: Marcelo Vanzin 
Committed: Fri Mar 3 14:23:31 2017 -0800

--
 docs/security.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ba186a84/docs/security.md
--
diff --git a/docs/security.md b/docs/security.md
index a479676..9eda428 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -12,7 +12,7 @@ Spark currently supports authentication via a shared secret. 
Authentication can
 ## Web UI
 
 The Spark UI can be secured by using [javax servlet 
filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the 
`spark.ui.filters` setting
-and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via the 
`spark.ui.https.enabled` setting.
+and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via [SSL 
settings](security.html#ssl-configuration).
 
 ### Authentication
 





spark git commit: [SPARK-19774] StreamExecution should call stop() on sources when a stream fails

2017-03-03 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 accbed7c2 -> da04d45c2


[SPARK-19774] StreamExecution should call stop() on sources when a stream fails

## What changes were proposed in this pull request?

We currently call stop() on a Structured Streaming Source only when the stream is shut down, 
i.e. when a user calls streamingQuery.stop(). We should also stop all sources when the stream 
fails; otherwise we may leak resources, e.g. connections to Kafka.
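
An illustrative sketch of the defensive shutdown pattern this introduces (not the exact `StreamExecution` code; the helper and its parameters are hypothetical): on termination, whether normal or due to a failure, attempt to stop every source and keep going if one of them throws.

```scala
import scala.util.control.NonFatal

// Close every resource, logging (rather than rethrowing) per-resource failures so that
// cleanup of the remaining resources still runs.
def stopAll(sources: Seq[AutoCloseable])(logWarning: (String, Throwable) => Unit): Unit =
  sources.foreach { source =>
    try source.close()
    catch {
      case NonFatal(e) => logWarning(s"Failed to stop $source; resources may have leaked.", e)
    }
  }
```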

## How was this patch tested?

Unit tests in `StreamingQuerySuite`.

Author: Burak Yavuz 

Closes #17107 from brkyvz/close-source.

(cherry picked from commit 9314c08377cc8da88f4e31d1a9d41376e96a81b3)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da04d45c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da04d45c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da04d45c

Branch: refs/heads/branch-2.1
Commit: da04d45c2c3c9830c57cee90be78cf2093d0
Parents: accbed7
Author: Burak Yavuz 
Authored: Fri Mar 3 10:35:15 2017 -0800
Committer: Shixiong Zhu 
Committed: Fri Mar 3 10:35:24 2017 -0800

--
 .../execution/streaming/StreamExecution.scala   | 14 +++-
 .../sql/streaming/StreamingQuerySuite.scala | 75 +-
 .../sql/streaming/util/MockSourceProvider.scala | 83 
 3 files changed, 169 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/da04d45c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 93face4..dd80a28 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -322,6 +322,7 @@ class StreamExecution(
   initializationLatch.countDown()
 
   try {
+stopSources()
 state.set(TERMINATED)
 currentStatus = status.copy(isTriggerActive = false, isDataAvailable = 
false)
 
@@ -559,6 +560,18 @@ class StreamExecution(
 sparkSession.streams.postListenerEvent(event)
   }
 
+  /** Stops all streaming sources safely. */
+  private def stopSources(): Unit = {
+uniqueSources.foreach { source =>
+  try {
+source.stop()
+  } catch {
+case NonFatal(e) =>
+  logWarning(s"Failed to stop streaming source: $source. Resources may 
have leaked.", e)
+  }
+}
+  }
+
   /**
* Signals to the thread executing micro-batches that it should stop running 
after the next
* batch. This method blocks until the thread stops running.
@@ -571,7 +584,6 @@ class StreamExecution(
   microBatchThread.interrupt()
   microBatchThread.join()
 }
-uniqueSources.foreach(_.stop())
 logInfo(s"Query $prettyIdString was stopped")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/da04d45c/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 1525ad5..a0a2b2b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -20,10 +20,12 @@ package org.apache.spark.sql.streaming
 import java.util.concurrent.CountDownLatch
 
 import org.apache.commons.lang3.RandomStringUtils
+import org.mockito.Mockito._
 import org.scalactic.TolerantNumerics
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset}
@@ -32,11 +34,11 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.util.BlockingSource
+import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider}
 import org.apache.spark.util.ManualClock
 
 
-class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
+class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging 
with MockitoSugar {
 
   import AwaitTerminationTester._
   import testImplicits._
@@

spark git commit: [SPARK-19774] StreamExecution should call stop() on sources when a stream fails

2017-03-03 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 37a1c0e46 -> 9314c0837


[SPARK-19774] StreamExecution should call stop() on sources when a stream fails

## What changes were proposed in this pull request?

We currently call stop() on a Structured Streaming Source only when the stream is shut down, 
i.e. when a user calls streamingQuery.stop(). We should also stop all sources when the stream 
fails; otherwise we may leak resources, e.g. connections to Kafka.

## How was this patch tested?

Unit tests in `StreamingQuerySuite`.

Author: Burak Yavuz 

Closes #17107 from brkyvz/close-source.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9314c083
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9314c083
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9314c083

Branch: refs/heads/master
Commit: 9314c08377cc8da88f4e31d1a9d41376e96a81b3
Parents: 37a1c0e
Author: Burak Yavuz 
Authored: Fri Mar 3 10:35:15 2017 -0800
Committer: Shixiong Zhu 
Committed: Fri Mar 3 10:35:15 2017 -0800

--
 .../execution/streaming/StreamExecution.scala   | 14 +++-
 .../sql/streaming/StreamingQuerySuite.scala | 75 +-
 .../sql/streaming/util/MockSourceProvider.scala | 83 
 3 files changed, 169 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9314c083/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 4bd6431..6e77f35 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -321,6 +321,7 @@ class StreamExecution(
   initializationLatch.countDown()
 
   try {
+stopSources()
 state.set(TERMINATED)
 currentStatus = status.copy(isTriggerActive = false, isDataAvailable = 
false)
 
@@ -558,6 +559,18 @@ class StreamExecution(
 sparkSession.streams.postListenerEvent(event)
   }
 
+  /** Stops all streaming sources safely. */
+  private def stopSources(): Unit = {
+uniqueSources.foreach { source =>
+  try {
+source.stop()
+  } catch {
+case NonFatal(e) =>
+  logWarning(s"Failed to stop streaming source: $source. Resources may 
have leaked.", e)
+  }
+}
+  }
+
   /**
* Signals to the thread executing micro-batches that it should stop running 
after the next
* batch. This method blocks until the thread stops running.
@@ -570,7 +583,6 @@ class StreamExecution(
   microBatchThread.interrupt()
   microBatchThread.join()
 }
-uniqueSources.foreach(_.stop())
 logInfo(s"Query $prettyIdString was stopped")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9314c083/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 1525ad5..a0a2b2b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -20,10 +20,12 @@ package org.apache.spark.sql.streaming
 import java.util.concurrent.CountDownLatch
 
 import org.apache.commons.lang3.RandomStringUtils
+import org.mockito.Mockito._
 import org.scalactic.TolerantNumerics
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset}
@@ -32,11 +34,11 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.util.BlockingSource
+import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider}
 import org.apache.spark.util.ManualClock
 
 
-class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
+class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging 
with MockitoSugar {
 
   import AwaitTerminationTester._
   import testImplicits._
@@ -481,6 +483,75 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
 }

spark git commit: [SPARK-19710][SQL][TESTS] Fix ordering of rows in query results

2017-03-03 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 98bcc188f -> 37a1c0e46


[SPARK-19710][SQL][TESTS] Fix ordering of rows in query results

## What changes were proposed in this pull request?
Changes to SQLQueryTests to make the order of the results deterministic.
Where possible, ORDER BY clauses have been added to match the existing expected output.

## How was this patch tested?
Test runs on x86, zLinux (big endian), ppc (big endian)

Author: Pete Robbins 

Closes #17039 from robbinspg/SPARK-19710.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37a1c0e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37a1c0e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37a1c0e4

Branch: refs/heads/master
Commit: 37a1c0e461737d4a4bbb03d397b651ec5ba00e96
Parents: 98bcc18
Author: Pete Robbins 
Authored: Fri Mar 3 07:53:46 2017 -0800
Committer: Herman van Hovell 
Committed: Fri Mar 3 07:53:46 2017 -0800

--
 .../sql-tests/inputs/subquery/in-subquery/in-joins.sql|  2 +-
 .../inputs/subquery/in-subquery/in-set-operations.sql |  6 +++---
 .../inputs/subquery/in-subquery/not-in-joins.sql  |  2 +-
 .../results/subquery/in-subquery/in-joins.sql.out |  2 +-
 .../subquery/in-subquery/in-set-operations.sql.out| 10 +-
 .../results/subquery/in-subquery/not-in-joins.sql.out |  4 ++--
 6 files changed, 13 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/37a1c0e4/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-joins.sql
--
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-joins.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-joins.sql
index b10c419..880175f 100644
--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-joins.sql
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-joins.sql
@@ -79,7 +79,7 @@ GROUP BY  t1a,
   t3a,
   t3b,
   t3c
-ORDER BY  t1a DESC;
+ORDER BY  t1a DESC, t3b DESC;
 
 -- TC 01.03
 SELECT Count(DISTINCT(t1a))

http://git-wip-us.apache.org/repos/asf/spark/blob/37a1c0e4/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql
--
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql
index 6b9e8bf..5c371d2 100644
--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-set-operations.sql
@@ -287,7 +287,7 @@ WHERE  t1a IN (SELECT t3a
WHERE  t1b > 6) AS t5)
 GROUP BY t1a, t1b, t1c, t1d
 HAVING t1c IS NOT NULL AND t1b IS NOT NULL
-ORDER BY t1c DESC;
+ORDER BY t1c DESC, t1a DESC;
 
 -- TC 01.08
 SELECT t1a,
@@ -351,7 +351,7 @@ WHERE  t1b IN
 FROM   t1
 WHERE  t1b > 6) AS t4
   WHERE  t2b = t1b)
-ORDER BY t1c DESC NULLS last;
+ORDER BY t1c DESC NULLS last, t1a DESC;
 
 -- TC 01.11
 SELECT *
@@ -468,5 +468,5 @@ HAVING   t1b NOT IN
 EXCEPT
 SELECT t3b
 FROM   t3)
-ORDER BY t1c DESC NULLS LAST;
+ORDER BY t1c DESC NULLS LAST, t1i;
 

http://git-wip-us.apache.org/repos/asf/spark/blob/37a1c0e4/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-joins.sql
--
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-joins.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-joins.sql
index 505366b..e09b91f 100644
--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-joins.sql
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/not-in-joins.sql
@@ -85,7 +85,7 @@ AND t1b != t3b
 AND t1d = t2d
 GROUP BYt1a, t1b, t1c, t3a, t3b, t3c
 HAVING  count(distinct(t3a)) >= 1
-ORDER BYt1a;
+ORDER BYt1a, t3b;
 
 -- TC 01.03
 SELECT t1a,

http://git-wip-us.apache.org/repos/asf/spark/blob/37a1c0e4/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-joins.sql.out
--
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-joins.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-joins.sql.out
index 7258bcf..ab6a11a 100644
--- 
a/sql/core/src/test/resources/sql-t

spark git commit: [SPARK-19758][SQL] Resolving timezone aware expressions with time zone when resolving inline table

2017-03-03 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 776fac398 -> 98bcc188f


[SPARK-19758][SQL] Resolving timezone aware expressions with time zone when 
resolving inline table

## What changes were proposed in this pull request?

When we resolve inline tables in the analyzer, we evaluate the expressions of the inline 
tables.

When a `TimeZoneAwareExpression` is evaluated there, an error occurs because the expression is 
not yet associated with a time zone.

So we need to resolve these `TimeZoneAwareExpression`s with a time zone when resolving inline 
tables.
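
A hedged illustration of the kind of query affected (assuming an active `SparkSession` named `spark`; the literal values are made up): the `CAST` to timestamp inside the inline `VALUES` list is a `TimeZoneAwareExpression`, so it can only be evaluated during analysis once a time zone has been attached.

```scala
// Inline table whose value requires a timezone-aware cast during analysis.
spark.sql(
  "SELECT * FROM VALUES (CAST('2017-03-03 00:00:00' AS TIMESTAMP)) AS t(ts)"
).show()
```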

## How was this patch tested?

Jenkins tests.


Author: Liang-Chi Hsieh 

Closes #17114 from viirya/resolve-timeawareexpr-inline-table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98bcc188
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98bcc188
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98bcc188

Branch: refs/heads/master
Commit: 98bcc188f98e44c1675d8b3a28f44f4f900abc43
Parents: 776fac3
Author: Liang-Chi Hsieh 
Authored: Fri Mar 3 07:14:37 2017 -0800
Committer: Herman van Hovell 
Committed: Fri Mar 3 07:14:37 2017 -0800

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  2 +-
 .../catalyst/analysis/ResolveInlineTables.scala | 16 +---
 .../analysis/ResolveInlineTablesSuite.scala | 40 
 .../resources/sql-tests/inputs/inline-table.sql |  3 ++
 .../sql-tests/results/inline-table.sql.out  | 10 -
 5 files changed, 48 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/98bcc188/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c477cb4..6d569b6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -146,7 +146,7 @@ class Analyzer(
   GlobalAggregates ::
   ResolveAggregateFunctions ::
   TimeWindowing ::
-  ResolveInlineTables ::
+  ResolveInlineTables(conf) ::
   TypeCoercion.typeCoercionRules ++
   extendedResolutionRules : _*),
 Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),

http://git-wip-us.apache.org/repos/asf/spark/blob/98bcc188/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 7323197..d5b3ea8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Cast
+import org.apache.spark.sql.catalyst.{CatalystConf, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, 
TimeZoneAwareExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
 /**
  * An analyzer rule that replaces [[UnresolvedInlineTable]] with 
[[LocalRelation]].
  */
-object ResolveInlineTables extends Rule[LogicalPlan] {
+case class ResolveInlineTables(conf: CatalystConf) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
 case table: UnresolvedInlineTable if table.expressionsResolved =>
   validateInputDimension(table)
@@ -95,11 +95,15 @@ object ResolveInlineTables extends Rule[LogicalPlan] {
   InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
 val targetType = fields(ci).dataType
 try {
-  if (e.dataType.sameType(targetType)) {
-e.eval()
+  val castedExpr = if (e.dataType.sameType(targetType)) {
+e
   } else {
-Cast(e, targetType).eval()
+Cast(e, targetType)
   }
+  castedExpr.transform {
+case e: TimeZoneAwareExpression if e.ti

spark-website git commit: Update committer list

2017-03-03 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 470b7ed51 -> c1b9ad3cb


Update committer list


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/c1b9ad3c
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/c1b9ad3c
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/c1b9ad3c

Branch: refs/heads/asf-site
Commit: c1b9ad3cbe413b10f872c6a3363f1028c31b1a16
Parents: 470b7ed
Author: Holden Karau 
Authored: Wed Mar 1 22:15:10 2017 -0800
Committer: Sean Owen 
Committed: Fri Mar 3 12:31:03 2017 +0100

--
 committers.md|  4 
 site/committers.html | 15 ++-
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/c1b9ad3c/committers.md
--
diff --git a/committers.md b/committers.md
index 03defa6..a97bb72 100644
--- a/committers.md
+++ b/committers.md
@@ -28,6 +28,7 @@ navigation:
 |Herman van Hovell|QuestTec B.V.|
 |Yin Huai|Databricks|
 |Shane Huang|Intel|
+|Holden Karau|IBM|
 |Andy Konwinski|Databricks|
 |Ryan LeCompte|Quantifind|
 |Haoyuan Li|Alluxio, UC Berkeley|
@@ -50,11 +51,13 @@ navigation:
 |Prashant Sharma|IBM|
 |Ram Sriharsha|Databricks|
 |DB Tsai|Netflix|
+|Takuya Ueshin||
 |Marcelo Vanzin|Cloudera|
 |Shivaram Venkataraman|UC Berkeley|
 |Patrick Wendell|Databricks|
 |Andrew Xia|Alibaba|
 |Reynold Xin|Databricks|
+|Burak Yavuz|Databricks|
 |Matei Zaharia|Databricks, Stanford|
 |Shixiong Zhu|Databricks|
 
@@ -117,6 +120,7 @@ You can verify the result is one change with `git log`. 
Then resume the script i
 
 Also, please remember to set Assignee on JIRAs where applicable when they are 
resolved. The script 
 can't do this automatically.
+Once a PR is merged please leave a comment on the PR stating which branch(es) 
it has been merged with.
 
 

spark git commit: [SPARK-19801][BUILD] Remove JDK7 from Travis CI

2017-03-03 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 0bac3e4cd -> 776fac398


[SPARK-19801][BUILD] Remove JDK7 from Travis CI

## What changes were proposed in this pull request?

Since Spark 2.1.0, Travis CI has been supported via SPARK-15207 for automated PR verification 
(JDK7/JDK8 Maven compilation and the Java linter), and contributors can see the additional 
results on their Travis CI dashboards (or locally).

This PR aims to make `.travis.yml` up-to-date by removing JDK7 which was 
removed via SPARK-19550.

## How was this patch tested?

See the result via Travis CI.

- https://travis-ci.org/dongjoon-hyun/spark/builds/207111713

Author: Dongjoon Hyun 

Closes #17143 from dongjoon-hyun/SPARK-19801.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/776fac39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/776fac39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/776fac39

Branch: refs/heads/master
Commit: 776fac3988271a1e4128cb31f21e5f7f3b7bcf0e
Parents: 0bac3e4
Author: Dongjoon Hyun 
Authored: Fri Mar 3 12:00:54 2017 +0100
Committer: Sean Owen 
Committed: Fri Mar 3 12:00:54 2017 +0100

--
 .travis.yml | 1 -
 1 file changed, 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/776fac39/.travis.yml
--
diff --git a/.travis.yml b/.travis.yml
index d94872d..d7e9f8c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -28,7 +28,6 @@ dist: trusty
 # 2. Choose language and target JDKs for parallel builds.
 language: java
 jdk:
-  - oraclejdk7
   - oraclejdk8
 
 # 3. Setup cache directory for SBT and Maven.





spark git commit: [SPARK-19797][DOC] ML pipeline document correction

2017-03-03 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master fa50143cd -> 0bac3e4cd


[SPARK-19797][DOC] ML pipeline document correction

## What changes were proposed in this pull request?
The description of pipelines in this paragraph is incorrect: 
https://spark.apache.org/docs/latest/ml-pipeline.html#how-it-works

> If the Pipeline had more **stages**, it would call the LogisticRegressionModel’s 
> transform() method on the DataFrame before passing the DataFrame to the next stage.

Reason: a Transformer could also be a stage, but only another Estimator will trigger a 
transform() call and pass the data to the next stage. The current wording misleads ML 
pipeline users.
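
To make the distinction concrete, a small sketch of the pipeline the docs describe (standard `spark.ml` API; the DataFrame and column names are placeholders):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Tokenizer and HashingTF are Transformers; LogisticRegression is an Estimator stage.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
// val model = pipeline.fit(trainingDF)  // trainingDF is assumed to have "text" and "label" columns
```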

## How was this patch tested?
This is a tiny modification of **docs/ml-pipeline.md**. I built the modified docs with jekyll 
and checked the compiled document.

Author: Zhe Sun 

Closes #17137 from ymwdalex/SPARK-19797-ML-pipeline-document-correction.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bac3e4c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bac3e4c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bac3e4c

Branch: refs/heads/master
Commit: 0bac3e4cde75678beac02e67b8873fe779e9ad34
Parents: fa50143
Author: Zhe Sun 
Authored: Fri Mar 3 11:55:57 2017 +0100
Committer: Sean Owen 
Committed: Fri Mar 3 11:55:57 2017 +0100

--
 docs/ml-pipeline.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0bac3e4c/docs/ml-pipeline.md
--
diff --git a/docs/ml-pipeline.md b/docs/ml-pipeline.md
index 7cbb146..aa92c0a 100644
--- a/docs/ml-pipeline.md
+++ b/docs/ml-pipeline.md
@@ -132,7 +132,7 @@ The `Pipeline.fit()` method is called on the original 
`DataFrame`, which has raw
 The `Tokenizer.transform()` method splits the raw text documents into words, 
adding a new column with words to the `DataFrame`.
 The `HashingTF.transform()` method converts the words column into feature 
vectors, adding a new column with those vectors to the `DataFrame`.
 Now, since `LogisticRegression` is an `Estimator`, the `Pipeline` first calls 
`LogisticRegression.fit()` to produce a `LogisticRegressionModel`.
-If the `Pipeline` had more stages, it would call the 
`LogisticRegressionModel`'s `transform()`
+If the `Pipeline` had more `Estimator`s, it would call the 
`LogisticRegressionModel`'s `transform()`
 method on the `DataFrame` before passing the `DataFrame` to the next stage.
 
 A `Pipeline` is an `Estimator`.





spark git commit: [SPARK-19797][DOC] ML pipeline document correction

2017-03-03 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1237aaea2 -> accbed7c2


[SPARK-19797][DOC] ML pipeline document correction

## What changes were proposed in this pull request?
The description of pipelines in this paragraph is incorrect: 
https://spark.apache.org/docs/latest/ml-pipeline.html#how-it-works

> If the Pipeline had more **stages**, it would call the LogisticRegressionModel’s 
> transform() method on the DataFrame before passing the DataFrame to the next stage.

Reason: a Transformer could also be a stage, but only another Estimator will trigger a 
transform() call and pass the data to the next stage. The current wording misleads ML 
pipeline users.

## How was this patch tested?
This is a tiny modification of **docs/ml-pipeline.md**. I built the modified docs with jekyll 
and checked the compiled document.

Author: Zhe Sun 

Closes #17137 from ymwdalex/SPARK-19797-ML-pipeline-document-correction.

(cherry picked from commit 0bac3e4cde75678beac02e67b8873fe779e9ad34)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/accbed7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/accbed7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/accbed7c

Branch: refs/heads/branch-2.1
Commit: accbed7c2cfbe46fa6f55e97241b617c6ad4431f
Parents: 1237aae
Author: Zhe Sun 
Authored: Fri Mar 3 11:55:57 2017 +0100
Committer: Sean Owen 
Committed: Fri Mar 3 11:56:07 2017 +0100

--
 docs/ml-pipeline.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/accbed7c/docs/ml-pipeline.md
--
diff --git a/docs/ml-pipeline.md b/docs/ml-pipeline.md
index 7cbb146..aa92c0a 100644
--- a/docs/ml-pipeline.md
+++ b/docs/ml-pipeline.md
@@ -132,7 +132,7 @@ The `Pipeline.fit()` method is called on the original 
`DataFrame`, which has raw
 The `Tokenizer.transform()` method splits the raw text documents into words, 
adding a new column with words to the `DataFrame`.
 The `HashingTF.transform()` method converts the words column into feature 
vectors, adding a new column with those vectors to the `DataFrame`.
 Now, since `LogisticRegression` is an `Estimator`, the `Pipeline` first calls 
`LogisticRegression.fit()` to produce a `LogisticRegressionModel`.
-If the `Pipeline` had more stages, it would call the 
`LogisticRegressionModel`'s `transform()`
+If the `Pipeline` had more `Estimator`s, it would call the 
`LogisticRegressionModel`'s `transform()`
 method on the `DataFrame` before passing the `DataFrame` to the next stage.
 
 A `Pipeline` is an `Estimator`.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19739][CORE] propagate S3 session token to cluster

2017-03-03 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master d556b3170 -> fa50143cd


[SPARK-19739][CORE] propagate S3 session token to cluster

## What changes were proposed in this pull request?

Propagate the S3 session token to the cluster, alongside the access key and secret key
already copied into the Hadoop configuration.
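
As a hedged sketch of what the change does (mirroring the logic shown in the diff below; the helper name here is illustrative and not part of `SparkHadoopUtil`): when the standard AWS environment variables are set on the driver, the matching S3 keys are copied into the Hadoop configuration, and temporary credentials now also carry the session token through `fs.s3a.session.token`.

```scala
import org.apache.hadoop.conf.Configuration

// Illustrative helper mirroring the logic in the diff below: copy AWS
// credentials (including a temporary session token) from the environment
// into the Hadoop configuration.
def propagateS3Credentials(hadoopConf: Configuration): Unit = {
  val keyId = System.getenv("AWS_ACCESS_KEY_ID")
  val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
  if (keyId != null && accessKey != null) {
    Seq("fs.s3.awsAccessKeyId", "fs.s3n.awsAccessKeyId", "fs.s3a.access.key")
      .foreach(hadoopConf.set(_, keyId))
    Seq("fs.s3.awsSecretAccessKey", "fs.s3n.awsSecretAccessKey", "fs.s3a.secret.key")
      .foreach(hadoopConf.set(_, accessKey))
    // New in this change: session tokens for temporary credentials, which
    // only the s3a connector understands.
    val sessionToken = System.getenv("AWS_SESSION_TOKEN")
    if (sessionToken != null) {
      hadoopConf.set("fs.s3a.session.token", sessionToken)
    }
  }
}
```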

## How was this patch tested?

Existing unit tests.

Author: uncleGen 

Closes #17080 from uncleGen/SPARK-19739.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa50143c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa50143c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa50143c

Branch: refs/heads/master
Commit: fa50143cd33586f4658892f434c9f6c23346e1bf
Parents: d556b31
Author: uncleGen 
Authored: Fri Mar 3 11:49:00 2017 +0100
Committer: Sean Owen 
Committed: Fri Mar 3 11:49:00 2017 +0100

--
 .../org/apache/spark/deploy/SparkHadoopUtil.scala  | 13 -
 1 file changed, 8 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fa50143c/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 941e2d1..f475ce8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -82,17 +82,20 @@ class SparkHadoopUtil extends Logging {
 // the behavior of the old implementation of this code, for backwards 
compatibility.
 if (conf != null) {
   // Explicitly check for S3 environment variables
-  if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-  System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-val keyId = System.getenv("AWS_ACCESS_KEY_ID")
-val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
-
+  val keyId = System.getenv("AWS_ACCESS_KEY_ID")
+  val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
+  if (keyId != null && accessKey != null) {
 hadoopConf.set("fs.s3.awsAccessKeyId", keyId)
 hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)
 hadoopConf.set("fs.s3a.access.key", keyId)
 hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)
 hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
 hadoopConf.set("fs.s3a.secret.key", accessKey)
+
+val sessionToken = System.getenv("AWS_SESSION_TOKEN")
+if (sessionToken != null) {
+  hadoopConf.set("fs.s3a.session.token", sessionToken)
+}
   }
   // Copy any "spark.hadoop.foo=bar" system properties into conf as 
"foo=bar"
   conf.getAll.foreach { case (key, value) =>


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18699][SQL][FOLLOWUP] Add explanation in CSV parser and minor cleanup

2017-03-03 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 982f3223b -> d556b3170


[SPARK-18699][SQL][FOLLOWUP] Add explanation in CSV parser and minor cleanup

## What changes were proposed in this pull request?

This PR adds some comments to the `UnivocityParser` logic to explain what happens. It also
proposes a slightly cleaner structure (at least one that is easier to explain).
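
As a rough, self-contained sketch of the token-to-row mapping those comments describe (field names come from the worked example in the comments; this is not the actual `UnivocityParser` code, and the type conversion done by `valueConverters` is left out):

```scala
// Schema the user asked for vs. the schema of the CSV data itself.
val requiredSchema = Seq("c", "b", "_unparsed", "a")
val csvDataSchema = Seq("a", "b", "c")

// Tokens parsed from the CSV line "1,2,A".
val tokens = Array("1", "2", "A")

// For each required field, find where its token sits in the CSV data schema;
// the corrupt-record column ("_unparsed") has no source token.
val tokenIndexArr: Seq[Option[Int]] = requiredSchema.map { name =>
  val i = csvDataSchema.indexOf(name)
  if (i >= 0) Some(i) else None
}

// Place each token into the output row at its required position.
val outputRow: Seq[Any] = tokenIndexArr.map {
  case Some(i) => tokens(i)
  case None => null // corrupt-record column stays null for a well-formed row
}

println(outputRow) // List(A, 2, null, 1), matching the example in the comments
```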

## How was this patch tested?

Unit tests in `CSVSuite`.

Author: hyukjinkwon 

Closes #17142 from HyukjinKwon/SPARK-18699.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d556b317
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d556b317
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d556b317

Branch: refs/heads/master
Commit: d556b317038455dc25e193f3add723fccdc54958
Parents: 982f322
Author: hyukjinkwon 
Authored: Fri Mar 3 00:50:58 2017 -0800
Committer: Wenchen Fan 
Committed: Fri Mar 3 00:50:58 2017 -0800

--
 .../datasources/csv/UnivocityParser.scala   | 97 ++--
 1 file changed, 68 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d556b317/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 804031a..3b3b87e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -54,39 +54,77 @@ private[csv] class UnivocityParser(
 
   private val dataSchema = StructType(schema.filter(_.name != 
options.columnNameOfCorruptRecord))
 
-  private val valueConverters =
-dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, 
options)).toArray
-
   private val tokenizer = new CsvParser(options.asParserSettings)
 
   private var numMalformedRecords = 0
 
   private val row = new GenericInternalRow(requiredSchema.length)
 
-  // This gets the raw input that is parsed lately.
+  // In `PERMISSIVE` parse mode, we should be able to put the raw malformed 
row into the field
+  // specified in `columnNameOfCorruptRecord`. The raw input is retrieved by 
this method.
   private def getCurrentInput(): String = 
tokenizer.getContext.currentParsedContent().stripLineEnd
 
-  // This parser loads an `indexArr._1`-th position value in input tokens,
-  // then put the value in `row(indexArr._2)`.
-  private val indexArr: Array[(Int, Int)] = {
-val fields = if (options.dropMalformed) {
-  // If `dropMalformed` is enabled, then it needs to parse all the values
-  // so that we can decide which row is malformed.
-  requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
-} else {
-  requiredSchema
-}
-// TODO: Revisit this; we need to clean up code here for readability.
-// See an URL below for related discussions:
-// https://github.com/apache/spark/pull/16928#discussion_r102636720
-val fieldsWithIndexes = fields.zipWithIndex
-corruptFieldIndex.map { case corrFieldIndex =>
-  fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
-}.getOrElse {
-  fieldsWithIndexes
-}.map { case (f, i) =>
-  (dataSchema.indexOf(f), i)
-}.toArray
+  // This parser loads the value at the `tokenIndexArr`-th position in the input tokens,
+  // then puts it in `row(rowIndexArr)`.
+  //
+  // For example, let's say there is CSV data as below:
+  //
+  //   a,b,c
+  //   1,2,A
+  //
+  // Also, let's say `columnNameOfCorruptRecord` is set to "_unparsed", 
`header` is `true`
+  // by user and the user selects "c", "b", "_unparsed" and "a" fields. In 
this case, we need
+  // to map those values below:
+  //
+  //   required schema - ["c", "b", "_unparsed", "a"]
+  //   CSV data schema - ["a", "b", "c"]
+  //   required CSV data schema - ["c", "b", "a"]
+  //
+  // with the input tokens,
+  //
+  //   input tokens - [1, 2, "A"]
+  //
+  // Each input token is placed in each output row's position by mapping 
these. In this case,
+  //
+  //   output row - ["A", 2, null, 1]
+  //
+  // In more details,
+  // - `valueConverters`, input tokens - CSV data schema
+  //   `valueConverters` keeps the positions of input token indices (by its 
index) to each
+  //   value's converter (by its value) in an order of CSV data schema. In 
this case,
+  //   [string->int, string->int, string->string].
+  //
+  // - `tokenIndexArr`, input tokens - required CSV data schema
+  //   `tokenIndexArr` keeps the positions of input token indices (by its 
index) to reordered