spark git commit: [SPARK-18120][SPARK-19557][SQL] Call QueryExecutionListener callback methods for DataFrameWriter methods

2017-02-16 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 21fde57f1 -> 54d23599d


[SPARK-18120][SPARK-19557][SQL] Call QueryExecutionListener callback methods 
for DataFrameWriter methods

## What changes were proposed in this pull request?

Currently we only notify `QueryExecutionListener` for a few `Dataset` actions, e.g. 
`collect` and `take`. We should also send these notifications for `DataFrameWriter` 
operations.
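
As a point of reference, a minimal sketch (not from the patch itself) of how a 
user-registered listener observes a writer action after this change; the local 
session, app name and output path below are made up for illustration:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

val spark = SparkSession.builder().master("local[*]").appName("listener-demo").getOrCreate()

// Minimal listener that just prints what it is notified about.
val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"onSuccess: $funcName took ${durationNs / 1e6} ms")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"onFailure: $funcName failed: ${exception.getMessage}")
}
spark.listenerManager.register(listener)

// With this patch, DataFrameWriter actions are reported as well, with the method
// name ("save", "insertInto", "saveAsTable") passed as funcName.
spark.range(10).write.mode("overwrite").parquet("/tmp/listener-demo")
```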

## How was this patch tested?

New regression test.

Closes https://github.com/apache/spark/pull/16664

Author: Wenchen Fan 

Closes #16962 from cloud-fan/insert.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54d23599
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54d23599
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54d23599

Branch: refs/heads/master
Commit: 54d23599df7c28a7685416ced6ad8fcde047e534
Parents: 21fde57
Author: Wenchen Fan 
Authored: Thu Feb 16 21:09:14 2017 -0800
Committer: Wenchen Fan 
Committed: Thu Feb 16 21:09:14 2017 -0800

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 49 -
 .../datasources/SaveIntoDataSourceCommand.scala | 52 ++
 .../spark/sql/util/DataFrameCallbackSuite.scala | 57 +++-
 3 files changed, 142 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/54d23599/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index cdae8ea..3939251 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,9 +25,9 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, 
CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan}
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, 
LogicalRelation, SaveIntoDataSourceCommand}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 
@@ -211,13 +211,15 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 }
 
 assertNotBucketed("save")
-val dataSource = DataSource(
-  df.sparkSession,
-  className = source,
-  partitionColumns = partitioningColumns.getOrElse(Nil),
-  options = extraOptions.toMap)
 
-dataSource.write(mode, df)
+runCommand(df.sparkSession, "save") {
+  SaveIntoDataSourceCommand(
+query = df.logicalPlan,
+provider = source,
+partitionColumns = partitioningColumns.getOrElse(Nil),
+options = extraOptions.toMap,
+mode = mode)
+}
   }
 
   /**
@@ -260,13 +262,14 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
   )
 }
 
-df.sparkSession.sessionState.executePlan(
+runCommand(df.sparkSession, "insertInto") {
   InsertIntoTable(
 table = UnresolvedRelation(tableIdent),
 partition = Map.empty[String, Option[String]],
 query = df.logicalPlan,
 overwrite = mode == SaveMode.Overwrite,
-ifNotExists = false)).toRdd
+ifNotExists = false)
+}
   }
 
   private def getBucketSpec: Option[BucketSpec] = {
@@ -389,10 +392,9 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
   schema = new StructType,
   provider = Some(source),
   partitionColumnNames = partitioningColumns.getOrElse(Nil),
-  bucketSpec = getBucketSpec
-)
-df.sparkSession.sessionState.executePlan(
-  CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
+  bucketSpec = getBucketSpec)
+
+runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, 
Some(df.logicalPlan)))
   }
 
   /**
@@ -573,6 +575,25 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 format("csv").save(path)
   }
 
+  /**
+   * Wrap a DataFrameWriter action to track the QueryExecution and time cost, 
then report to the
+   * user-registered callback functions.
+   */
+  private def runCommand(session: SparkSession, name: String)(command: 
LogicalPlan): Unit = {
+val qe = 
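
The hunk above is cut off in the archive. For orientation, a hedged sketch of what a 
helper like `runCommand` plausibly does (build the `QueryExecution`, trigger it, time 
it, and notify the session's listener manager); this is not necessarily the exact 
body from the commit:

```
private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = {
  val qe = session.sessionState.executePlan(command)
  try {
    val start = System.nanoTime()
    // Calling toRdd triggers the actual execution of the command.
    qe.toRdd
    val end = System.nanoTime()
    session.listenerManager.onSuccess(name, qe, end - start)
  } catch {
    case e: Exception =>
      session.listenerManager.onFailure(name, qe, e)
      throw e
  }
}
```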

spark git commit: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-16 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master dcc2d540a -> 21fde57f1


[SPARK-18352][SQL] Support parsing multiline json files

## What changes were proposed in this pull request?

If the new option `wholeFile` is set to `true`, the JSON reader parses each file 
(instead of each line) as a single value. This is done with Jackson streaming and it 
should be capable of parsing very large documents, assuming each row fits in memory.
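
For illustration, a minimal sketch of reading a whole-file JSON document with the new 
option (local session; the path and sample data are made up):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// The file holds a single JSON value (here, an array of records) spread over
// several lines, e.g. [ {"name": "Alice", "age": 30}, {"name": "Bob", "age": 25} ]
val people = spark.read
  .option("wholeFile", true)   // default is false: one JSON value per line
  .json("/tmp/people_array.json")

people.printSchema()
people.show()
```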

Because the file is not buffered in memory, corrupt-record handling is also slightly 
different when `wholeFile` is enabled: on a parsing failure the corrupt column will 
contain the filename instead of the literal JSON. It would be easy to extend this to 
add the parser location (line, column and byte offsets) to the output if desired.

These changes allow types other than `String` to be parsed. Support for `UTF8String` 
and `Text` has been added (alongside `String` and `InputFormat`), so a conversion to 
`String` is no longer required just for parsing.

I've also included a few other changes that generate slightly better bytecode and 
(imo) make it more obvious when and where boxing occurs in the parser. These are 
included as separate commits; let me know if they should be flattened into this PR 
or moved to a new one.

## How was this patch tested?

New and existing unit tests. No performance or load tests have been run.

Author: Nathan Howell 

Closes #16386 from NathanHowell/SPARK-18352.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21fde57f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21fde57f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21fde57f

Branch: refs/heads/master
Commit: 21fde57f15db974b710e7b00e72c744da7c1ac3c
Parents: dcc2d54
Author: Nathan Howell 
Authored: Thu Feb 16 20:51:19 2017 -0800
Committer: Wenchen Fan 
Committed: Thu Feb 16 20:51:19 2017 -0800

--
 .../apache/spark/unsafe/types/UTF8String.java   |  20 +-
 .../apache/spark/input/PortableDataStream.scala |   7 +
 python/pyspark/sql/readwriter.py|  13 +-
 python/pyspark/sql/streaming.py |  14 +-
 python/pyspark/sql/tests.py |   7 +
 python/test_support/sql/people_array.json   |  13 +
 .../catalyst/expressions/jsonExpressions.scala  |  10 +-
 .../sql/catalyst/json/CreateJacksonParser.scala |  46 +++
 .../spark/sql/catalyst/json/JSONOptions.scala   |  20 +-
 .../spark/sql/catalyst/json/JacksonParser.scala | 287 +++
 .../org/apache/spark/sql/DataFrameReader.scala  |  32 ++-
 .../execution/datasources/CodecStreams.scala|  17 +-
 .../datasources/json/JsonDataSource.scala   | 216 ++
 .../datasources/json/JsonFileFormat.scala   |  96 +++
 .../datasources/json/JsonInferSchema.scala  |  13 +-
 .../spark/sql/streaming/DataStreamReader.scala  |   8 +-
 .../execution/datasources/json/JsonSuite.scala  | 152 +-
 17 files changed, 740 insertions(+), 231 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/21fde57f/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
--
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 3800d53..87b9e8e 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -147,7 +147,13 @@ public final class UTF8String implements 
Comparable, Externalizable,
 buffer.position(pos + numBytes);
   }
 
-  public void writeTo(OutputStream out) throws IOException {
+  /**
+   * Returns a {@link ByteBuffer} wrapping the base object if it is a byte 
array
+   * or a copy of the data if the base object is not a byte array.
+   *
+   * Unlike getBytes this will not create a copy the array if this is a slice.
+   */
+  public @Nonnull ByteBuffer getByteBuffer() {
 if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
   final byte[] bytes = (byte[]) base;
 
@@ -160,12 +166,20 @@ public final class UTF8String implements 
Comparable, Externalizable,
 throw new ArrayIndexOutOfBoundsException();
   }
 
-  out.write(bytes, (int) arrayOffset, numBytes);
+  return ByteBuffer.wrap(bytes, (int) arrayOffset, numBytes);
 } else {
-  out.write(getBytes());
+  return ByteBuffer.wrap(getBytes());
 }
   }
 
+  public void writeTo(OutputStream out) throws IOException {
+final ByteBuffer bb = this.getByteBuffer();
+assert(bb.hasArray());
+
+// similar to Utils.writeByteBuffer but without the 
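
The hunk above is also truncated. As a usage-level sketch (not part of the patch), 
the new `getByteBuffer` wraps the backing byte array when possible instead of 
copying, and `writeTo` is now built on top of it:

```
import java.io.ByteArrayOutputStream
import org.apache.spark.unsafe.types.UTF8String

val s = UTF8String.fromString("héllo")

// Wraps the backing byte array when possible, rather than copying it.
val bb = s.getByteBuffer
assert(bb.remaining() == s.numBytes())

// writeTo now delegates to getByteBuffer internally.
val out = new ByteArrayOutputStream()
s.writeTo(out)
assert(out.toByteArray.length == s.numBytes())
```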

spark git commit: [SPARK-19436][SQL] Add missing tests for approxQuantile

2017-02-16 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 3b4376876 -> 54a30c8a7


[SPARK-19436][SQL] Add missing tests for approxQuantile

## What changes were proposed in this pull request?
1. Check the behavior with illegal `quantiles` and `relativeError`.
2. Add tests for `relativeError` > 1.
3. Update tests for `null` data.
4. Update some docs for javadoc8.

## How was this patch tested?
local test in spark-shell
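
For reference, a spark-shell sketch of the documented behavior (column name and data 
are illustrative):

```
// spark.implicits._ is already in scope in spark-shell.
val df = Seq(1.0, 2.0, 3.0, 4.0, Double.NaN).toDF("x")

// null and NaN rows are dropped before the computation.
val quartiles = df.stat.approxQuantile("x", Array(0.25, 0.5, 0.75), 0.01)

// Illegal input is rejected by the argument checks, e.g. a probability outside [0, 1]:
// df.stat.approxQuantile("x", Array(-0.1), 0.01)   // throws an exception

// relativeError greater than 1 is accepted and gives the same result as 1.
val coarse = df.stat.approxQuantile("x", Array(0.5), 1.5)
```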

Author: Zheng RuiFeng 
Author: Ruifeng Zheng 

Closes #16776 from zhengruifeng/fix_approxQuantile.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54a30c8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54a30c8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54a30c8a

Branch: refs/heads/master
Commit: 54a30c8a70c86294059e6eb6b30cb81978b47b54
Parents: 3b43768
Author: Zheng RuiFeng 
Authored: Thu Feb 16 09:42:13 2017 -0800
Committer: Xiao Li 
Committed: Thu Feb 16 09:42:13 2017 -0800

--
 .../spark/sql/DataFrameStatFunctions.scala  | 30 
 .../sql/execution/stat/StatFunctions.scala  |  4 +-
 .../apache/spark/sql/DataFrameStatSuite.scala   | 77 +---
 3 files changed, 88 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/54a30c8a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index 2b782fd..bdcdf0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -58,12 +58,13 @@ final class DataFrameStatFunctions private[sql](df: 
DataFrame) {
* @param probabilities a list of quantile probabilities
*   Each number must belong to [0, 1].
*   For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
-   * @param relativeError The relative target precision to achieve (greater or 
equal to 0).
+   * @param relativeError The relative target precision to achieve (greater 
than or equal to 0).
*   If set to zero, the exact quantiles are computed, which could be very 
expensive.
*   Note that values greater than 1 are accepted but give the same result 
as 1.
* @return the approximate quantiles at the given probabilities
*
-   * @note NaN values will be removed from the numerical column before 
calculation
+   * @note null and NaN values will be removed from the numerical column 
before calculation. If
+   *   the dataframe is empty or all rows contain null or NaN, null is 
returned.
*
* @since 2.0.0
*/
@@ -71,27 +72,25 @@ final class DataFrameStatFunctions private[sql](df: 
DataFrame) {
   col: String,
   probabilities: Array[Double],
   relativeError: Double): Array[Double] = {
-StatFunctions.multipleApproxQuantiles(df.select(col).na.drop(),
-  Seq(col), probabilities, relativeError).head.toArray
+val res = approxQuantile(Array(col), probabilities, relativeError)
+Option(res).map(_.head).orNull
   }
 
   /**
* Calculates the approximate quantiles of numerical columns of a DataFrame.
-   * @see [[DataFrameStatsFunctions.approxQuantile(col:Str* approxQuantile]] 
for
-   * detailed description.
+   * @see `approxQuantile(col:Str* approxQuantile)` for detailed description.
*
-   * Note that rows containing any null or NaN values values will be removed 
before
-   * calculation.
* @param cols the names of the numerical columns
* @param probabilities a list of quantile probabilities
*   Each number must belong to [0, 1].
*   For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
-   * @param relativeError The relative target precision to achieve (>= 0).
+   * @param relativeError The relative target precision to achieve (greater 
than or equal to 0).
*   If set to zero, the exact quantiles are computed, which could be very 
expensive.
*   Note that values greater than 1 are accepted but give the same result 
as 1.
* @return the approximate quantiles at the given probabilities of each 
column
*
-   * @note Rows containing any NaN values will be removed before calculation
+   * @note Rows containing any null or NaN values will be removed before 
calculation. If
+   *   the dataframe is empty or all rows contain null or NaN, null is 
returned.
*
* @since 2.2.0
*/
@@ -99,8 +98,13 @@ final class DataFrameStatFunctions private[sql](df: 
DataFrame) {
   cols: Array[String],
   probabilities: Array[Double],
   relativeError: 

[2/2] spark-website git commit: Remove references to Java 7 and Hadoop support before 2.5

2017-02-16 Thread srowen
Remove references to Java 7 and Hadoop support before 2.5


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/ae58782b
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/ae58782b
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/ae58782b

Branch: refs/heads/asf-site
Commit: ae58782baafff924a904e01197400104d85471f1
Parents: fe49ab1
Author: Sean Owen 
Authored: Tue Feb 14 21:33:56 2017 +
Committer: Sean Owen 
Committed: Tue Feb 14 21:35:47 2017 +

--
 developer-tools.md  |  7 ++--
 js/downloads.js | 18 +++--
 release-process.md  |  6 +--
 site/committers.html| 40 +++-
 site/community.html |  8 ++--
 site/contributing.html  | 20 +-
 site/developer-tools.html   | 21 +-
 site/documentation.html |  5 ++-
 site/js/downloads.js| 18 +++--
 site/news/index.html| 10 ++---
 site/news/spark-0-9-1-released.html |  2 +-
 site/news/spark-0-9-2-released.html |  2 +-
 site/news/spark-1-1-0-released.html |  2 +-
 site/news/spark-1-2-2-released.html |  2 +-
 site/news/spark-and-shark-in-the-news.html  |  2 +-
 .../spark-summit-east-2015-videos-posted.html   |  2 +-
 site/release-process.html   |  8 ++--
 site/releases/spark-release-0-8-0.html  |  4 +-
 site/releases/spark-release-0-9-1.html  | 20 +-
 site/releases/spark-release-1-0-1.html  |  8 ++--
 site/releases/spark-release-1-0-2.html  |  2 +-
 site/releases/spark-release-1-1-0.html  |  6 +--
 site/releases/spark-release-1-2-0.html  |  2 +-
 site/releases/spark-release-1-3-0.html  |  6 +--
 site/releases/spark-release-1-3-1.html  |  6 +--
 site/releases/spark-release-1-4-0.html  |  4 +-
 site/releases/spark-release-1-5-0.html  | 30 +++
 site/releases/spark-release-1-6-0.html  | 20 +-
 site/releases/spark-release-2-0-0.html  | 36 +-
 site/releases/spark-release-2-1-0.html  | 24 ++--
 site/sitemap.xml| 12 +++---
 site/third-party-projects.html  |  2 +-
 site/versioning-policy.html |  2 +-
 33 files changed, 174 insertions(+), 183 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/ae58782b/developer-tools.md
--
diff --git a/developer-tools.md b/developer-tools.md
index 77d225f..e8853b8 100644
--- a/developer-tools.md
+++ b/developer-tools.md
@@ -115,8 +115,7 @@ When running tests for a pull request on Jenkins, you can 
add special phrases to
 your pull request to change testing behavior. This includes:
 
 - `[test-maven]` - signals to test the pull request using maven
-- `[test-hadoop1.0]` - signals to test using Spark's Hadoop 1.0 profile (other 
options include 
-Hadoop 2.0, 2.2, and 2.3)
+- `[test-hadoop2.7]` - signals to test using Spark's Hadoop 2.7 profile
 
 Organizing Imports
 
@@ -143,8 +142,8 @@ automatically update the IntelliJ project.
 - As documented in http://spark.apache.org/docs/latest/building-spark.html;>Building 
Spark, 
 some build configurations require specific profiles to be 
 enabled. The same profiles that are enabled with `-P[profile name]` above may 
be enabled on the 
-Profiles screen in the Import wizard. For example, if developing for Hadoop 
2.4 with YARN support, 
-enable profiles yarn and hadoop-2.4. These selections can be changed later by 
accessing the 
+Profiles screen in the Import wizard. For example, if developing for Hadoop 
2.7 with YARN support, 
+enable profiles `yarn` and `hadoop-2.7`. These selections can be changed later 
by accessing the 
 "Maven Projects" tool window from the View menu, and expanding the Profiles 
section.
 
 Other tips:

http://git-wip-us.apache.org/repos/asf/spark-website/blob/ae58782b/js/downloads.js
--
diff --git a/js/downloads.js b/js/downloads.js
index 36a04c7..04a6b13 100644
--- a/js/downloads.js
+++ b/js/downloads.js
@@ -16,26 +16,18 @@ var hadoop2p3 = {pretty: "Pre-built for Hadoop 2.3", tag: 
"hadoop2.3"};
 var hadoop2p4 = {pretty: "Pre-built for Hadoop 2.4", tag: "hadoop2.4"};
 var hadoop2p6 = {pretty: "Pre-built for Hadoop 2.6", tag: "hadoop2.6"};
 var hadoop2p7 = {pretty: "Pre-built for Hadoop 2.7 and later", tag: 
"hadoop2.7"};
-//var mapr3 = {pretty: 

[1/2] spark-website git commit: Remove references to Java 7 and Hadoop support before 2.5

2017-02-16 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/asf-site fe49ab1ef -> ae58782ba


http://git-wip-us.apache.org/repos/asf/spark-website/blob/ae58782b/site/releases/spark-release-2-1-0.html
--
diff --git a/site/releases/spark-release-2-1-0.html 
b/site/releases/spark-release-2-1-0.html
index 26e65a2..7c7df63 100644
--- a/site/releases/spark-release-2-1-0.html
+++ b/site/releases/spark-release-2-1-0.html
@@ -200,15 +200,15 @@
 To download Apache Spark 2.1.0, visit the downloads page. You can consult JIRA for the https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315420version=12335644;>detailed
 changes. We have curated a list of high level changes here, grouped by 
major modules.
 
 
-  Core and Spark SQL
-  Structured Streaming
-  MLlib
-  SparkR
-  GraphX
-  Deprecations
-  Changes of behavior
-  Known Issues
-  Credits
+  Core 
and Spark SQL
+  Structured Streaming
+  MLlib
+  SparkR
+  GraphX
+  Deprecations
+  Changes of behavior
+  Known 
Issues
+  Credits
 
 
 Core and Spark SQL
@@ -216,7 +216,7 @@
 
   API updates
 
-  SPARK-17864: Data type APIs are stable APIs. 
+  SPARK-17864: Data type APIs are stable APIs.
   SPARK-18351: from_json and to_json for parsing JSON for string 
columns
   SPARK-16700: When creating a DataFrame in PySpark, Python 
dictionaries can be used as values of a StructType.
 
@@ -318,9 +318,9 @@
   SPARK-18377: spark.sql.warehouse.dir is a static configuration now. 
Users need to set it before the start of the first SparkSession and its value 
is shared by sessions in the same application.
   SPARK-14393: Values generated by non-deterministic functions will 
not change after coalesce or union.
   SPARK-18076: Fix default Locale used in DateFormat, NumberFormat to 
Locale.US
-  SPARK-16216: CSV and JSON data sources write timestamp and date 
values in https://www.w3.org/TR/NOTE-datetime;>ISO 8601 formatted 
string. Two options, timestampFormat and dateFormat, are added to these two 
data sources to let users control the format of timestamp and date value in 
string representation, respectively. Please refer to the API doc of DataFrameReader
 and DataFrameWriter
 for more details about these two configurations. 
+  SPARK-16216: CSV and JSON data sources write timestamp and date 
values in https://www.w3.org/TR/NOTE-datetime;>ISO 8601 formatted 
string. Two options, timestampFormat and dateFormat, are added to these two 
data sources to let users control the format of timestamp and date value in 
string representation, respectively. Please refer to the API doc of DataFrameReader
 and DataFrameWriter
 for more details about these two configurations.
   SPARK-17427: Function SIZE returns -1 when its input parameter is 
null.
-  SPARK-16498: LazyBinaryColumnarSerDe is fixed as the the SerDe for 
RCFile. 
+  SPARK-16498: LazyBinaryColumnarSerDe is fixed as the the SerDe for 
RCFile.
   SPARK-16552: If a user does not specify the schema to a table and 
relies on schema inference, the inferred schema will be stored in the 
metastore. The schema will be not inferred again when this table is used.
 
   

http://git-wip-us.apache.org/repos/asf/spark-website/blob/ae58782b/site/sitemap.xml
--
diff --git a/site/sitemap.xml b/site/sitemap.xml
index 1ed4c74..67a171b 100644
--- a/site/sitemap.xml
+++ b/site/sitemap.xml
@@ -632,27 +632,27 @@
   weekly
 
 
-  http://spark.apache.org/sql/
+  http://spark.apache.org/mllib/
   weekly
 
 
-  http://spark.apache.org/screencasts/
+  http://spark.apache.org/news/
   weekly
 
 
-  http://spark.apache.org/streaming/
+  http://spark.apache.org/screencasts/
   weekly
 
 
-  http://spark.apache.org/
+  http://spark.apache.org/sql/
   weekly
 
 
-  http://spark.apache.org/mllib/
+  http://spark.apache.org/streaming/
   weekly
 
 
-  http://spark.apache.org/news/
+  http://spark.apache.org/
   weekly
 
 

http://git-wip-us.apache.org/repos/asf/spark-website/blob/ae58782b/site/third-party-projects.html
--
diff --git a/site/third-party-projects.html b/site/third-party-projects.html
index 74db810..8cb21d1 100644
--- a/site/third-party-projects.html
+++ b/site/third-party-projects.html
@@ -212,7 +212,7 @@ for details)
   http://mesos.apache.org/;>Apache Mesos - Cluster management 
system that supports 
 running Spark
   http://alluxio.org/;>Alluxio (née Tachyon) - Memory speed 
virtual distributed 
-storage system that supports running Spark
+storage system that supports running Spark
   https://github.com/datastax/spark-cassandra-connector;>Spark 
Cassandra Connector - 
 Easily load your Cassandra data into Spark and Spark SQL; from Datastax
   http://github.com/tuplejump/FiloDB;>FiloDB - a Spark 
integrated analytical/columnar 


spark git commit: [MINOR][BUILD] Fix javadoc8 break

2017-02-16 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 0e2405490 -> 3b4376876


[MINOR][BUILD] Fix javadoc8 break

## What changes were proposed in this pull request?

The errors below seem to be caused by unidoc not understanding double-commented 
blocks.

```
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:69: 
error: class, interface, or enum expected
[error]  * MapGroupsWithStateFunctionString, Integer, Integer, String 
mappingFunction =
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:69: 
error: class, interface, or enum expected
[error]  * MapGroupsWithStateFunctionString, Integer, Integer, String 
mappingFunction =
[error]   ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:70: 
error: class, interface, or enum expected
[error]  *new MapGroupsWithStateFunctionString, Integer, Integer, 
String() {
[error] ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:70: 
error: class, interface, or enum expected
[error]  *new MapGroupsWithStateFunctionString, Integer, Integer, 
String() {
[error] 
^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:72: 
error: illegal character: '#'
[error]  *  Override
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:72: 
error: class, interface, or enum expected
[error]  *  Override
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: 
error: class, interface, or enum expected
[error]  *  public String call(String key, IteratorInteger value, 
KeyedStateInteger state) {
[error]^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: 
error: class, interface, or enum expected
[error]  *  public String call(String key, IteratorInteger value, 
KeyedStateInteger state) {
[error]^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: 
error: class, interface, or enum expected
[error]  *  public String call(String key, IteratorInteger value, 
KeyedStateInteger state) {
[error]^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: 
error: class, interface, or enum expected
[error]  *  public String call(String key, IteratorInteger value, 
KeyedStateInteger state) {
[error] 
^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: 
error: class, interface, or enum expected
[error]  *  public String call(String key, IteratorInteger value, 
KeyedStateInteger state) {
[error] 
^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:76: 
error: class, interface, or enum expected
[error]  *  boolean shouldRemove = ...; // Decide whether to remove the 
state
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:77: 
error: class, interface, or enum expected
[error]  *  if (shouldRemove) {
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:79: 
error: class, interface, or enum expected
[error]  *  } else {
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:81: 
error: class, interface, or enum expected
[error]  *state.update(newState); // Set the new state
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:82: 
error: class, interface, or enum expected
[error]  *  }
[error]  ^
[error] 
.../forked/spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:85: 
error: class, interface, or enum expected
[error]  *  state.update(initialState);
[error]  ^
[error] 
.../forked/spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:86: 
error: class, interface, or enum expected
[error]  *}
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:90: 
error: class, interface, or enum expected
[error]  * 
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:92: 
error: class, interface, or enum expected
[error]  * tparam S User-defined type of the state to be stored for each key. 
Must be encodable into
[error]^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:93: 
error: class, interface, or enum expected
[error]  *   Spark SQL types (see {link Encoder} for more 

[6/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-16 Thread srowen
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
--
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java 
b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
new file mode 100644
index 000..80aab10
--- /dev/null
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -0,0 +1,1842 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark;
+
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.Tuple4;
+import scala.collection.JavaConverters;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.base.Throwables;
+import com.google.common.io.Files;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.input.PortableDataStream;
+import org.apache.spark.partial.BoundedDouble;
+import org.apache.spark.partial.PartialResult;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.util.LongAccumulator;
+import org.apache.spark.util.StatCounter;
+
+// The test suite itself is Serializable so that anonymous Function 
implementations can be
+// serialized, as an alternative to converting these anonymous classes to 
static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+  private transient JavaSparkContext sc;
+  private transient File tempDir;
+
+  @Before
+  public void setUp() {
+sc = new JavaSparkContext("local", "JavaAPISuite");
+tempDir = Files.createTempDir();
+tempDir.deleteOnExit();
+  }
+
+  @After
+  public void tearDown() {
+sc.stop();
+sc = null;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void sparkContextUnion() {
+// Union of non-specialized JavaRDDs
+List strings = Arrays.asList("Hello", "World");
+JavaRDD s1 = sc.parallelize(strings);
+JavaRDD s2 = sc.parallelize(strings);
+// Varargs
+JavaRDD sUnion = sc.union(s1, s2);
+assertEquals(4, sUnion.count());
+// List
+List list = new ArrayList<>();
+list.add(s2);
+sUnion = sc.union(s1, list);
+assertEquals(4, sUnion.count());
+
+// Union of JavaDoubleRDDs
+List doubles = Arrays.asList(1.0, 2.0);
+JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
+JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
+JavaDoubleRDD dUnion = sc.union(d1, d2);
+assertEquals(4, dUnion.count());
+
+// Union of JavaPairRDDs
+List> pairs = new 

[2/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-16 Thread srowen
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
--
diff --git 
a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java 
b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
new file mode 100644
index 000..646cb97
--- /dev/null
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -0,0 +1,887 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.streaming;
+
+import java.io.Serializable;
+import java.util.*;
+
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.JavaTestUtils;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.StateSpec;
+import org.apache.spark.streaming.Time;
+import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
+
+/**
+ * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using 
java 8
+ * lambda syntax.
+ */
+@SuppressWarnings("unchecked")
+public class Java8APISuite extends LocalJavaStreamingContext implements 
Serializable {
+
+  @Test
+  public void testMap() {
+List inputData = Arrays.asList(
+  Arrays.asList("hello", "world"),
+  Arrays.asList("goodnight", "moon"));
+
+List expected = Arrays.asList(
+  Arrays.asList(5, 5),
+  Arrays.asList(9, 4));
+
+JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+JavaDStream letterCount = stream.map(String::length);
+JavaTestUtils.attachTestOutputStream(letterCount);
+List result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testFilter() {
+List inputData = Arrays.asList(
+  Arrays.asList("giants", "dodgers"),
+  Arrays.asList("yankees", "red sox"));
+
+List expected = Arrays.asList(
+  Arrays.asList("giants"),
+  Arrays.asList("yankees"));
+
+JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+JavaDStream filtered = stream.filter(s -> s.contains("a"));
+JavaTestUtils.attachTestOutputStream(filtered);
+List result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testMapPartitions() {
+List inputData = Arrays.asList(
+  Arrays.asList("giants", "dodgers"),
+  Arrays.asList("yankees", "red sox"));
+
+List expected = Arrays.asList(
+  Arrays.asList("GIANTSDODGERS"),
+  Arrays.asList("YANKEESRED SOX"));
+
+JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+JavaDStream mapped = stream.mapPartitions(in -> {
+  String out = "";
+  while (in.hasNext()) {
+out = out + in.next().toUpperCase();
+  }
+  return Lists.newArrayList(out).iterator();
+});
+JavaTestUtils.attachTestOutputStream(mapped);
+List result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testReduce() {
+List inputData = Arrays.asList(
+  Arrays.asList(1, 2, 3),
+  Arrays.asList(4, 5, 6),
+  Arrays.asList(7, 8, 9));
+
+List expected = Arrays.asList(
+  Arrays.asList(6),
+  Arrays.asList(15),
+  Arrays.asList(24));
+
+JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
+JavaDStream reduced = stream.reduce((x, y) -> x + y);
+

[7/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-16 Thread srowen
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/test/java/org/apache/spark/JavaAPISuite.java
--
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java 
b/core/src/test/java/org/apache/spark/JavaAPISuite.java
deleted file mode 100644
index 7bebe06..000
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ /dev/null
@@ -1,1836 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.*;
-import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.*;
-
-import scala.Tuple2;
-import scala.Tuple3;
-import scala.Tuple4;
-import scala.collection.JavaConverters;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.base.Throwables;
-import com.google.common.io.Files;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.After;
-import static org.junit.Assert.*;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaFutureAction;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.input.PortableDataStream;
-import org.apache.spark.partial.BoundedDouble;
-import org.apache.spark.partial.PartialResult;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.serializer.KryoSerializer;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.util.LongAccumulator;
-import org.apache.spark.util.StatCounter;
-
-// The test suite itself is Serializable so that anonymous Function 
implementations can be
-// serialized, as an alternative to converting these anonymous classes to 
static inner classes;
-// see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite implements Serializable {
-  private transient JavaSparkContext sc;
-  private transient File tempDir;
-
-  @Before
-  public void setUp() {
-sc = new JavaSparkContext("local", "JavaAPISuite");
-tempDir = Files.createTempDir();
-tempDir.deleteOnExit();
-  }
-
-  @After
-  public void tearDown() {
-sc.stop();
-sc = null;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void sparkContextUnion() {
-// Union of non-specialized JavaRDDs
-List strings = Arrays.asList("Hello", "World");
-JavaRDD s1 = sc.parallelize(strings);
-JavaRDD s2 = sc.parallelize(strings);
-// Varargs
-JavaRDD sUnion = sc.union(s1, s2);
-assertEquals(4, sUnion.count());
-// List
-List list = new ArrayList<>();
-list.add(s2);
-sUnion = sc.union(s1, list);
-assertEquals(4, sUnion.count());
-
-// Union of JavaDoubleRDDs
-List doubles = Arrays.asList(1.0, 2.0);
-JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
-JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
-JavaDoubleRDD dUnion = sc.union(d1, d2);
-assertEquals(4, dUnion.count());
-
-// Union of JavaPairRDDs
-List> pairs = new ArrayList<>();
-pairs.add(new Tuple2<>(1, 2));
-pairs.add(new Tuple2<>(3, 4));
-JavaPairRDD p1 = sc.parallelizePairs(pairs);
-JavaPairRDD p2 = sc.parallelizePairs(pairs);
-JavaPairRDD

[4/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-16 Thread srowen
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
--
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java 
b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
index c7959ae..ff80453 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
@@ -44,12 +44,7 @@ class OutputRedirector {
   OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
 this.active = true;
 this.reader = new BufferedReader(new InputStreamReader(in, 
StandardCharsets.UTF_8));
-this.thread = tf.newThread(new Runnable() {
-  @Override
-  public void run() {
-redirect();
-  }
-});
+this.thread = tf.newThread(this::redirect);
 this.sink = Logger.getLogger(loggerName);
 thread.start();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
--
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java 
b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index 0aa7bd1..cefb4d1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -91,9 +91,6 @@ public interface SparkAppHandle {
* Tries to kill the underlying application. Implies {@link #disconnect()}. 
This will not send
* a {@link #stop()} message to the application, so it's recommended that 
users first try to
* stop the application cleanly and only resort to this method if that fails.
-   * 
-   * Note that if the application is running as a child process, this method 
fail to kill the
-   * process when using Java 7. This may happen if, for example, the 
application is deadlocked.
*/
   void kill();
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
--
diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
 
b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index 82b593a..8178684 100644
--- 
a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ 
b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -49,35 +49,44 @@ class SparkClassCommandBuilder extends 
AbstractCommandBuilder {
 
 // Master, Worker, HistoryServer, ExternalShuffleService, 
MesosClusterDispatcher use
 // SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
-if (className.equals("org.apache.spark.deploy.master.Master")) {
-  javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
-  javaOptsKeys.add("SPARK_MASTER_OPTS");
-  memKey = "SPARK_DAEMON_MEMORY";
-} else if (className.equals("org.apache.spark.deploy.worker.Worker")) {
-  javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
-  javaOptsKeys.add("SPARK_WORKER_OPTS");
-  memKey = "SPARK_DAEMON_MEMORY";
-} else if 
(className.equals("org.apache.spark.deploy.history.HistoryServer")) {
-  javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
-  javaOptsKeys.add("SPARK_HISTORY_OPTS");
-  memKey = "SPARK_DAEMON_MEMORY";
-} else if 
(className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) {
-  javaOptsKeys.add("SPARK_JAVA_OPTS");
-  javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
-  memKey = "SPARK_EXECUTOR_MEMORY";
-} else if 
(className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
-  javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
-  memKey = "SPARK_EXECUTOR_MEMORY";
-} else if 
(className.equals("org.apache.spark.deploy.mesos.MesosClusterDispatcher")) {
-  javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
-} else if 
(className.equals("org.apache.spark.deploy.ExternalShuffleService") ||
-
className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) {
-  javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
-  javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
-  memKey = "SPARK_DAEMON_MEMORY";
-} else {
-  javaOptsKeys.add("SPARK_JAVA_OPTS");
-  memKey = "SPARK_DRIVER_MEMORY";
+switch (className) {
+  case "org.apache.spark.deploy.master.Master":
+javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+javaOptsKeys.add("SPARK_MASTER_OPTS");
+memKey = "SPARK_DAEMON_MEMORY";
+break;
+  case "org.apache.spark.deploy.worker.Worker":
+javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+javaOptsKeys.add("SPARK_WORKER_OPTS");
+memKey = "SPARK_DAEMON_MEMORY";
+ 

[5/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-16 Thread srowen
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/streaming-kafka-0-8-integration.md
--
diff --git a/docs/streaming-kafka-0-8-integration.md 
b/docs/streaming-kafka-0-8-integration.md
index 58b17aa..24a3e4c 100644
--- a/docs/streaming-kafka-0-8-integration.md
+++ b/docs/streaming-kafka-0-8-integration.md
@@ -155,33 +155,22 @@ Next, we discuss how to use this approach in your 
streaming application.


// Hold a reference to the current offset ranges, so it can be 
used downstream
-   final AtomicReference offsetRanges = new 
AtomicReference<>();
-
-   directKafkaStream.transformToPair(
- new Function, JavaPairRDD>() {
-   @Override
-   public JavaPairRDD call(JavaPairRDD rdd) throws Exception {
- OffsetRange[] offsets = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
- offsetRanges.set(offsets);
- return rdd;
-   }
- }
-   ).map(
+   AtomicReference offsetRanges = new 
AtomicReference<>();
+
+   directKafkaStream.transformToPair(rdd -> {
+  OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+  offsetRanges.set(offsets);
+  return rdd;
+   }).map(
  ...
-   ).foreachRDD(
- new Function, Void>() {
-   @Override
-   public Void call(JavaPairRDD rdd) throws 
IOException {
- for (OffsetRange o : offsetRanges.get()) {
-   System.out.println(
- o.topic() + " " + o.partition() + " " + 
o.fromOffset() + " " + o.untilOffset()
-   );
- }
- ...
- return null;
-   }
- }
-   );
+   ).foreachRDD(rdd -> {
+  for (OffsetRange o : offsetRanges.get()) {
+System.out.println(
+  o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + 
o.untilOffset()
+);
+  }
+  ...
+   });


offsetRanges = []

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/streaming-programming-guide.md
--
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index a878971..abd4ac9 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -163,12 +163,7 @@ space into words.
 
 {% highlight java %}
 // Split each line into words
-JavaDStream words = lines.flatMap(
-  new FlatMapFunction() {
-@Override public Iterator call(String x) {
-  return Arrays.asList(x.split(" ")).iterator();
-}
-  });
+JavaDStream words = lines.flatMap(x -> Arrays.asList(x.split(" 
")).iterator());
 {% endhighlight %}
 
 `flatMap` is a DStream operation that creates a new DStream by
@@ -183,18 +178,8 @@ Next, we want to count these words.
 
 {% highlight java %}
 // Count each word in each batch
-JavaPairDStream pairs = words.mapToPair(
-  new PairFunction() {
-@Override public Tuple2 call(String s) {
-  return new Tuple2<>(s, 1);
-}
-  });
-JavaPairDStream wordCounts = pairs.reduceByKey(
-  new Function2() {
-@Override public Integer call(Integer i1, Integer i2) {
-  return i1 + i2;
-}
-  });
+JavaPairDStream pairs = words.mapToPair(s -> new Tuple2<>(s, 
1));
+JavaPairDStream wordCounts = pairs.reduceByKey((i1, i2) -> i1 
+ i2);
 
 // Print the first ten elements of each RDD generated in this DStream to the 
console
 wordCounts.print();
@@ -836,11 +821,9 @@ the `(word, 1)` pairs) and the `runningCount` having the 
previous count.
 
 {% highlight java %}
 Function2 updateFunction =
-  new Function2() {
-@Override public Optional call(List values, 
Optional state) {
-  Integer newSum = ...  // add the new values with the previous running 
count to get the new count
-  return Optional.of(newSum);
-}
+  (values, state) -> {
+Integer newSum = ...  // add the new values with the previous running 
count to get the new count
+return Optional.of(newSum);
   };
 {% endhighlight %}
 
@@ -915,15 +898,12 @@ val cleanedDStream = wordCounts.transform { rdd =>
 {% highlight java %}
 import org.apache.spark.streaming.api.java.*;
 // RDD containing spam information
-final JavaPairRDD spamInfoRDD = 

[3/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-16 Thread srowen
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
--
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java 
b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
deleted file mode 100644
index 648a5ab..000
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ /dev/null
@@ -1,2000 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import scala.Tuple2;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.io.Files;
-import com.google.common.collect.Sets;
-
-import org.apache.spark.HashPartitioner;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.util.LongAccumulator;
-import org.apache.spark.util.Utils;
-
-// The test suite itself is Serializable so that anonymous Function 
implementations can be
-// serialized, as an alternative to converting these anonymous classes to 
static inner classes;
-// see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite extends LocalJavaStreamingContext implements 
Serializable {
-
-  public static void equalIterator(Iterator a, Iterator b) {
-while (a.hasNext() && b.hasNext()) {
-  Assert.assertEquals(a.next(), b.next());
-}
-Assert.assertEquals(a.hasNext(), b.hasNext());
-  }
-
-  public static void equalIterable(Iterable a, Iterable b) {
-  equalIterator(a.iterator(), b.iterator());
-  }
-
-  @Test
-  public void testInitialization() {
-Assert.assertNotNull(ssc.sparkContext());
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testContextState() {
-List inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
-Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
-JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
-JavaTestUtils.attachTestOutputStream(stream);
-Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
-ssc.start();
-Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState());
-ssc.stop();
-Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState());
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testCount() {
-List inputData = Arrays.asList(
-Arrays.asList(1,2,3,4),
-Arrays.asList(3,4,5),
-Arrays.asList(3));
-
-List expected = Arrays.asList(
-Arrays.asList(4L),
-Arrays.asList(3L),
-Arrays.asList(1L));
-
-JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
-JavaDStream count = stream.count();
-JavaTestUtils.attachTestOutputStream(count);
-List result = JavaTestUtils.runStreams(ssc, 3, 3);
-assertOrderInvariantEquals(expected, result);
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testMap() {
-List inputData = Arrays.asList(
-Arrays.asList("hello", "world"),
-Arrays.asList("goodnight", "moon"));
-
-List expected = Arrays.asList(
-Arrays.asList(5,5),
-Arrays.asList(9,4));
-
-JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, 
inputData, 1);
-JavaDStream letterCount = stream.map(new Function() {
-@Override
-public Integer call(String s) {
-  return s.length();
-}
-});
-

[8/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-16 Thread srowen
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

- Move external/java8-tests tests into core, streaming, sql and remove
- Remove MaxPermGen and related options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings

For the future:

- Update Java 8 examples to fully use Java 8
- Update Java tests to use lambdas for simplicity
- Update Java internal implementations to use lambdas

## How was this patch tested?

Existing tests

Author: Sean Owen 

Closes #16871 from srowen/SPARK-19493.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e240549
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e240549
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e240549

Branch: refs/heads/master
Commit: 0e2405490f2056728d1353abbac6f3ea177ae533
Parents: 3871d94
Author: Sean Owen 
Authored: Thu Feb 16 12:32:45 2017 +
Committer: Sean Owen 
Committed: Thu Feb 16 12:32:45 2017 +

--
 assembly/pom.xml|1 +
 build/mvn   |8 +-
 build/sbt-launch-lib.bash   |2 +-
 .../spark/network/client/TransportClient.java   |  111 +-
 .../network/crypto/AuthClientBootstrap.java |   16 +-
 .../spark/network/crypto/AuthRpcHandler.java|3 -
 .../network/server/TransportRequestHandler.java |   27 +-
 .../spark/network/crypto/AuthEngineSuite.java   |2 -
 .../shuffle/ExternalShuffleBlockHandler.java|8 +-
 .../shuffle/ExternalShuffleBlockResolver.java   |7 +-
 .../network/shuffle/ExternalShuffleClient.java  |   21 +-
 .../network/shuffle/RetryingBlockFetcher.java   |9 +-
 common/sketch/pom.xml   |2 +
 common/unsafe/pom.xml   |2 +
 .../java/org/apache/spark/unsafe/Platform.java  |9 +-
 .../spark/unsafe/types/CalendarInterval.java|   88 +-
 .../org/apache/spark/api/java/Optional.java |7 +-
 .../api/java/function/CoGroupFunction.java  |1 +
 .../java/function/DoubleFlatMapFunction.java|1 +
 .../spark/api/java/function/DoubleFunction.java |1 +
 .../spark/api/java/function/FilterFunction.java |1 +
 .../api/java/function/FlatMapFunction.java  |1 +
 .../api/java/function/FlatMapFunction2.java |1 +
 .../java/function/FlatMapGroupsFunction.java|1 +
 .../api/java/function/ForeachFunction.java  |1 +
 .../java/function/ForeachPartitionFunction.java |1 +
 .../spark/api/java/function/Function.java   |1 +
 .../spark/api/java/function/Function0.java  |1 +
 .../spark/api/java/function/Function2.java  |1 +
 .../spark/api/java/function/Function3.java  |1 +
 .../spark/api/java/function/Function4.java  |1 +
 .../spark/api/java/function/MapFunction.java|1 +
 .../api/java/function/MapGroupsFunction.java|1 +
 .../java/function/MapPartitionsFunction.java|1 +
 .../api/java/function/PairFlatMapFunction.java  |1 +
 .../spark/api/java/function/PairFunction.java   |1 +
 .../spark/api/java/function/ReduceFunction.java |1 +
 .../spark/api/java/function/VoidFunction.java   |1 +
 .../spark/api/java/function/VoidFunction2.java  |1 +
 .../unsafe/sort/UnsafeExternalSorter.java   |9 +-
 .../unsafe/sort/UnsafeSorterSpillMerger.java|   28 +-
 .../scala/org/apache/spark/SparkContext.scala   |3 -
 .../spark/launcher/WorkerCommandBuilder.scala   |1 -
 .../scala/org/apache/spark/util/Utils.scala |   44 +-
 .../java/org/apache/spark/JavaAPISuite.java | 1836 
 .../test/org/apache/spark/Java8RDDAPISuite.java |  356 
 .../test/org/apache/spark/JavaAPISuite.java | 1842 
 .../org/apache/spark/util/UtilsSuite.scala  |6 +-
 dev/appveyor-install-dependencies.ps1   |2 +-
 dev/create-release/release-build.sh |1 -
 dev/make-distribution.sh|2 +-
 dev/mima|1 -
 dev/run-tests.py|3 -
 dev/test-dependencies.sh|2 +-
 docs/building-spark.md  |   32 +-
 docs/index.md   |6 +-
 docs/mllib-linear-methods.md|2 +-
 docs/mllib-statistics.md|7 +-
 docs/programming-guide.md   |   11 +-
 docs/quick-start.md |9 +-
 docs/streaming-custom-receivers.md  |   10 +-
 docs/streaming-kafka-0-10-integration.md|   62 +-
 docs/streaming-kafka-0-8-integration.md |   41 +-
 

[1/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-16 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 3871d94a6 -> 0e2405490


http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
--
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
new file mode 100644
index 000..8d24104
--- /dev/null
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -0,0 +1,2008 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.streaming;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.JavaCheckpointTestUtils;
+import org.apache.spark.streaming.JavaTestUtils;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.Seconds;
+import org.apache.spark.streaming.StreamingContextState;
+import org.apache.spark.streaming.StreamingContextSuite;
+import org.apache.spark.streaming.Time;
+import scala.Tuple2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+import com.google.common.collect.Sets;
+
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.util.LongAccumulator;
+import org.apache.spark.util.Utils;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
+
+  public static void equalIterator(Iterator<?> a, Iterator<?> b) {
+    while (a.hasNext() && b.hasNext()) {
+      Assert.assertEquals(a.next(), b.next());
+    }
+    Assert.assertEquals(a.hasNext(), b.hasNext());
+  }
+
+  public static void equalIterable(Iterable<?> a, Iterable<?> b) {
+    equalIterator(a.iterator(), b.iterator());
+  }
+
+  @Test
+  public void testInitialization() {
+    Assert.assertNotNull(ssc.sparkContext());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testContextState() {
+    List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
+    Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaTestUtils.attachTestOutputStream(stream);
+    Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
+    ssc.start();
+    Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState());
+    ssc.stop();
+    Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCount() {
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,2,3,4),
+        Arrays.asList(3,4,5),
+        Arrays.asList(3));
+
+    List<List<Long>> expected = Arrays.asList(
+        Arrays.asList(4L),
+        Arrays.asList(3L),
+        Arrays.asList(1L));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaDStream<Long> count = stream.count();
+    JavaTestUtils.attachTestOutputStream(count);
+    List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMap() {
+

spark git commit: [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN subquery 3rd batch

2017-02-16 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master f041e55ee -> 3871d94a6


[SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN subquery 3rd batch

## What changes were proposed in this pull request?

This is the third batch of test cases for IN/NOT IN subqueries. This PR adds the following test files:

`in-having.sql`
`in-joins.sql`
`in-multiple-columns.sql`

These are the queries and results from running on DB2.
[in-having DB2 version](https://github.com/apache/spark/files/772668/in-having.sql.db2.txt)
[output of in-having](https://github.com/apache/spark/files/772670/in-having.sql.db2.out.txt)
[in-joins DB2 version](https://github.com/apache/spark/files/772672/in-joins.sql.db2.txt)
[output of in-joins](https://github.com/apache/spark/files/772673/in-joins.sql.db2.out.txt)
[in-multiple-columns DB2 version](https://github.com/apache/spark/files/772678/in-multiple-columns.sql.db2.txt)
[output of in-multiple-columns](https://github.com/apache/spark/files/772680/in-multiple-columns.sql.db2.out.txt)

## How was this patch tested?
This PR adds new test cases. We compare the results from Spark with the results from another RDBMS (DB2 LUW); if the results match, we take them to be correct.
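
To make the query shape concrete, here is a hedged sketch of the kind of statement `in-having.sql` exercises, an IN subquery inside a HAVING clause, run through the Java API. It is illustrative only: it assumes the t1/t2 temporary views from the test file are already registered in the session, and the `t2a` column is assumed by analogy with `t1a` (the t2 column list is cut off in the excerpt below).

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hedged sketch, not part of this patch: groups t1 and keeps only the groups
// whose key also appears in t2, using an IN subquery in the HAVING clause.
public class InHavingSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("in-having-sketch")
        .master("local[*]")
        .getOrCreate();

    // Assumes the t1/t2 temporary views from in-having.sql already exist.
    Dataset<Row> result = spark.sql(
        "SELECT t1a, count(*) AS cnt " +
        "FROM t1 " +
        "GROUP BY t1a " +
        "HAVING t1a IN (SELECT t2a FROM t2)");  // t2a is an assumed column name
    result.show();

    spark.stop();
  }
}
```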

Author: Kevin Yu 

Closes #16841 from kevinyu98/spark-18871-33.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3871d94a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3871d94a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3871d94a

Branch: refs/heads/master
Commit: 3871d94a695d47169720e877f77ff1e4bede43ee
Parents: f041e55
Author: Kevin Yu 
Authored: Thu Feb 16 00:02:15 2017 -0800
Committer: Xiao Li 
Committed: Thu Feb 16 00:02:15 2017 -0800

--
 .../inputs/subquery/in-subquery/in-having.sql   | 152 
 .../inputs/subquery/in-subquery/in-joins.sql| 270 ++
 .../in-subquery/in-multiple-columns.sql | 127 +++
 .../subquery/in-subquery/in-having.sql.out  | 217 
 .../subquery/in-subquery/in-joins.sql.out   | 353 +++
 .../in-subquery/in-multiple-columns.sql.out | 178 ++
 6 files changed, 1297 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3871d94a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-having.sql
--
diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-having.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-having.sql
new file mode 100644
index 000..8f98ae1
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-having.sql
@@ -0,0 +1,152 @@
+-- A test suite for IN HAVING in parent side, subquery, and both predicate subquery
+-- It includes correlated cases.
+
+create temporary view t1 as select * from values
+  ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:00:00.000', date '2014-04-04'),
+  ("val1b", 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'),
+  ("val1a", 16S, 12, 21L, float(15.0), 20D, 20E2, timestamp '2014-06-04 01:02:00.001', date '2014-06-04'),
+  ("val1a", 16S, 12, 10L, float(15.0), 20D, 20E2, timestamp '2014-07-04 01:01:00.000', date '2014-07-04'),
+  ("val1c", 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:02:00.001', date '2014-05-05'),
+  ("val1d", null, 16, 22L, float(17.0), 25D, 26E2, timestamp '2014-06-04 01:01:00.000', null),
+  ("val1d", null, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-07-04 01:02:00.001', null),
+  ("val1e", 10S, null, 25L, float(17.0), 25D, 26E2, timestamp '2014-08-04 01:01:00.000', date '2014-08-04'),
+  ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2, timestamp '2014-09-04 01:02:00.001', date '2014-09-04'),
+  ("val1d", 10S, null, 12L, float(17.0), 25D, 26E2, timestamp '2015-05-04 01:01:00.000', date '2015-05-04'),
+  ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:02:00.001', date '2014-04-04'),
+  ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04')
+  as t1(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i);
+
+create temporary view t2 as select * from values
+  ("val2a", 6S, 12, 14L, float(15), 20D, 20E2, timestamp '2014-04-04 01:01:00.000', date '2014-04-04'),
+  ("val1b", 10S, 12, 19L, float(17), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'),
+  ("val1b", 8S, 16, 119L, float(17), 25D, 26E2, timestamp '2015-05-04 01:01:00.000', date '2015-05-04'),
+  ("val1c", 12S, 16, 219L, float(17), 25D, 26E2, timestamp '2016-05-04 01:01:00.000', date '2016-05-04'),
+  ("val1b", null, 16, 319L, float(17),