spark git commit: [SPARK-21694][MESOS] Support Mesos CNI network labels

2017-08-24 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 43cbfad99 -> ce0d3bb37


[SPARK-21694][MESOS] Support Mesos CNI network labels

JIRA ticket: https://issues.apache.org/jira/browse/SPARK-21694

## What changes were proposed in this pull request?

Spark already supports launching containers attached to a given CNI network by 
specifying it via the config `spark.mesos.network.name`.

This PR adds support to pass in network labels to CNI plugins via a new config 
option `spark.mesos.network.labels`. These network labels are key-value pairs 
that are set in the `NetworkInfo` of both the driver and executor tasks. More 
details in the related Mesos documentation:  
http://mesos.apache.org/documentation/latest/cni/#mesos-meta-data-to-cni-plugins
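
For illustration, a minimal sketch of setting both network options programmatically (the network name and label values here are placeholders, not taken from this patch):

```scala
import org.apache.spark.SparkConf

// Placeholder values for illustration; only the config keys come from this patch
// and the existing spark.mesos.network.name support.
val conf = new SparkConf()
  .setAppName("cni-labels-example")
  .set("spark.mesos.network.name", "my-cni-network")        // attach containers to a named CNI network
  .set("spark.mesos.network.labels", "key1:val1,key2:val2") // key:value pairs passed to CNI plugins
```

The same keys can equally be passed with `--conf` on `spark-submit`.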

## How was this patch tested?

Unit tests, for both driver and executor tasks.
Manual integration test to submit a job with the `spark.mesos.network.labels` 
option, hit the mesos/state.json endpoint, and check that the labels are set in 
the driver and executor tasks.

ArtRand skonto

Author: Susan X. Huynh 

Closes #18910 from susanxhuynh/sh-mesos-cni-labels.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce0d3bb3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce0d3bb3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce0d3bb3

Branch: refs/heads/master
Commit: ce0d3bb377766bdf4df7852272557ae846408877
Parents: 43cbfad
Author: Susan X. Huynh 
Authored: Thu Aug 24 10:05:38 2017 +0100
Committer: Sean Owen 
Committed: Thu Aug 24 10:05:38 2017 +0100

--
 docs/running-on-mesos.md | 14 ++
 .../org/apache/spark/deploy/mesos/config.scala   | 19 +--
 .../MesosCoarseGrainedSchedulerBackend.scala |  2 +-
 .../mesos/MesosSchedulerBackendUtil.scala|  9 +++--
 .../mesos/MesosClusterSchedulerSuite.scala   |  9 +++--
 ...MesosCoarseGrainedSchedulerBackendSuite.scala |  9 +++--
 6 files changed, 53 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ce0d3bb3/docs/running-on-mesos.md
--
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ae38550..0e5a20c 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -538,6 +538,20 @@ See the [configuration page](configuration.html) for 
information on Spark config
   
 
 
+  spark.mesos.network.labels
+  (none)
+  
+Pass network labels to CNI plugins.  This is a comma-separated list
+of key-value pairs, where each key-value pair has the format key:value.
+Example:
+
+key1:val1,key2:val2
+See
+<a href="http://mesos.apache.org/documentation/latest/cni/#mesos-meta-data-to-cni-plugins">the Mesos CNI docs</a>
+for more details.
+  
+
+
   spark.mesos.fetcherCache.enable
   false
   

http://git-wip-us.apache.org/repos/asf/spark/blob/ce0d3bb3/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
--
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 6c8619e..a5015b9 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -56,7 +56,7 @@ package object config {
   .stringConf
   .createOptional
 
-  private [spark] val DRIVER_LABELS =
+  private[spark] val DRIVER_LABELS =
 ConfigBuilder("spark.mesos.driver.labels")
   .doc("Mesos labels to add to the driver.  Labels are free-form key-value 
pairs.  Key-value " +
 "pairs should be separated by a colon, and commas used to list more 
than one." +
@@ -64,10 +64,25 @@ package object config {
   .stringConf
   .createOptional
 
-  private [spark] val DRIVER_FAILOVER_TIMEOUT =
+  private[spark] val DRIVER_FAILOVER_TIMEOUT =
 ConfigBuilder("spark.mesos.driver.failoverTimeout")
   .doc("Amount of time in seconds that the master will wait to hear from 
the driver, " +
   "during a temporary disconnection, before tearing down all the 
executors.")
   .doubleConf
   .createWithDefault(0.0)
+
+  private[spark] val NETWORK_NAME =
+ConfigBuilder("spark.mesos.network.name")
+  .doc("Attach containers to the given named network. If this job is 
launched " +
+"in cluster mode, also launch the driver in the given named network.")
+  .stringConf
+  .createOptional
+
+  private[spark] val NETWORK_LABELS =
+ConfigBuilder("spark.mesos.network.labels")
+  .doc("Network labels to pass to CNI plugins.  This is a comma-separated 

spark git commit: [MINOR][SQL] The comment of Class ExchangeCoordinator exist a typing and context error

2017-08-24 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master ce0d3bb37 -> 846bc61cf


[MINOR][SQL] The comment of Class ExchangeCoordinator exist a typing and 
context error

## What changes were proposed in this pull request?

The example given in the comment of class ExchangeCoordinator yields four post-shuffle partitions, but the current comment says “three”. (Combining the quoted pre-shuffle sizes per partition gives 110 MB, 30 MB, 170 MB, 15 MB and 35 MB; coalescing adjacent partitions without exceeding the 128 MB target produces groups of 110, 30, 170 and 15 + 35 = 50 MB, i.e. four post-shuffle partitions.)

## How was this patch tested?

Author: lufei 

Closes #19028 from figo77/SPARK-21816.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/846bc61c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/846bc61c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/846bc61c

Branch: refs/heads/master
Commit: 846bc61cf5aa522dc755d50359ef3856ef2b17bf
Parents: ce0d3bb
Author: lufei 
Authored: Thu Aug 24 10:07:27 2017 +0100
Committer: Sean Owen 
Committed: Thu Aug 24 10:07:27 2017 +0100

--
 .../apache/spark/sql/execution/exchange/ExchangeCoordinator.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/846bc61c/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
index deb2c24..9fc4ffb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
@@ -75,7 +75,7 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, 
SparkPlan}
  * For example, we have two stages with the following pre-shuffle partition 
size statistics:
  * stage 1: [100 MB, 20 MB, 100 MB, 10MB, 30 MB]
  * stage 2: [10 MB,  10 MB, 70 MB,  5 MB, 5 MB]
- * assuming the target input size is 128 MB, we will have three post-shuffle 
partitions,
+ * assuming the target input size is 128 MB, we will have four post-shuffle 
partitions,
  * which are:
  *  - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MB)
  *  - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MB)





spark git commit: [SPARK-21804][SQL] json_tuple returns null values within repeated columns except the first one

2017-08-24 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 846bc61cf -> 95713eb4f


[SPARK-21804][SQL] json_tuple returns null values within repeated columns 
except the first one

## What changes were proposed in this pull request?

When json_tuple extracts values from JSON, it returns null values for repeated columns except the first one, as shown below:

``` scala
scala> spark.sql("""SELECT json_tuple('{"a":1, "b":2}', 'a', 'b', 
'a')""").show()
+---+---++
| c0| c1|  c2|
+---+---++
|  1|  2|null|
+---+---++
```

I think this should be consistent with Hive's implementation:
```
hive> SELECT json_tuple('{"a": 1, "b": 2}', 'a', 'a');
...
1    1
```

In this PR, we locate all the matched indices in `fieldNames` instead of only the first matched index (i.e., `indexOf`), so every repeated column receives the value.
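
For reference, with this change the example above is expected to repeat the value for the duplicated field (output sketched from the description and the added test, not copied from an actual run):

``` scala
scala> spark.sql("""SELECT json_tuple('{"a":1, "b":2}', 'a', 'b', 'a')""").show()
+---+---+---+
| c0| c1| c2|
+---+---+---+
|  1|  2|  1|
+---+---+---+
```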

## How was this patch tested?

Added test in JsonExpressionsSuite.

Author: Jen-Ming Chung 

Closes #19017 from jmchung/SPARK-21804.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95713eb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95713eb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95713eb4

Branch: refs/heads/master
Commit: 95713eb4f22de4e16617a605f74a1d6373ed270b
Parents: 846bc61
Author: Jen-Ming Chung 
Authored: Thu Aug 24 19:24:00 2017 +0900
Committer: hyukjinkwon 
Committed: Thu Aug 24 19:24:00 2017 +0900

--
 .../sql/catalyst/expressions/jsonExpressions.scala  | 12 ++--
 .../sql/catalyst/expressions/JsonExpressionsSuite.scala | 10 ++
 2 files changed, 20 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/95713eb4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c375737..ee5da1a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -436,7 +436,8 @@ case class JsonTuple(children: Seq[Expression])
 while (parser.nextToken() != JsonToken.END_OBJECT) {
   if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
 // check to see if this field is desired in the output
-val idx = fieldNames.indexOf(parser.getCurrentName)
+val jsonField = parser.getCurrentName
+var idx = fieldNames.indexOf(jsonField)
 if (idx >= 0) {
   // it is, copy the child tree to the correct location in the output 
row
   val output = new ByteArrayOutputStream()
@@ -447,7 +448,14 @@ case class JsonTuple(children: Seq[Expression])
   generator => copyCurrentStructure(generator, parser)
 }
 
-row(idx) = UTF8String.fromBytes(output.toByteArray)
+val jsonValue = UTF8String.fromBytes(output.toByteArray)
+
+// SPARK-21804: json_tuple returns null values within repeated 
columns
+// except the first one; so that we need to check the remaining 
fields.
+do {
+  row(idx) = jsonValue
+  idx = fieldNames.indexOf(jsonField, idx + 1)
+} while (idx >= 0)
   }
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/95713eb4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 1cd2b4f..9991bda 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -373,6 +373,16 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   InternalRow(UTF8String.fromString("1"), null, 
UTF8String.fromString("2")))
   }
 
+  test("SPARK-21804: json_tuple returns null values within repeated columns 
except the first one") {
+checkJsonTuple(
+  JsonTuple(Literal("""{"f1": 1, "f2": 2}""") ::
+NonFoldableLiteral("f1") ::
+NonFoldableLiteral("cast(NULL AS STRING)") ::
+NonFoldableLiteral("f1") ::
+Nil),
+  InternalRow(UTF8String.fromString("1"), null, 
UTF8String.fromString("1")))
+  }
+
   val gmtId = Option(DateTimeUtils.TimeZoneGMT.getID)
 
   test("from_json") {



spark git commit: [SPARK-19165][PYTHON][SQL] PySpark APIs using columns as arguments should validate input types for column

2017-08-24 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 95713eb4f -> dc5d34d8d


[SPARK-19165][PYTHON][SQL] PySpark APIs using columns as arguments should 
validate input types for column

## What changes were proposed in this pull request?

While preparing to take over https://github.com/apache/spark/pull/16537, I realised a (I think) better approach: handle the exception in a single place.

This PR proposes to fix `_to_java_column` in `pyspark.sql.column`, which most functions in `functions.py` and some other APIs use. This `_to_java_column` basically does not work with types other than `pyspark.sql.column.Column` or string (`str` and `unicode`).

If this is not a `Column`, it calls `_create_column_from_name`, which calls `functions.col` in the JVM:

https://github.com/apache/spark/blob/42b9eda80e975d970c3e8da4047b318b83dd269f/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L76

And it looks like we only have a `String` variant of `col`.
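
For reference, `col` in `functions.scala` is declared for a string column name only (paraphrased from the Spark source linked above):

```scala
// sql/core/src/main/scala/org/apache/spark/sql/functions.scala (paraphrased)
def col(colName: String): Column = Column(colName)
```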

So, these should work:

```python
>>> from pyspark.sql.column import _to_java_column, Column
>>> _to_java_column("a")
JavaObject id=o28
>>> _to_java_column(u"a")
JavaObject id=o29
>>> _to_java_column(spark.range(1).id)
JavaObject id=o33
```

whereas these do not:

```python
>>> _to_java_column(1)
```
```
...
py4j.protocol.Py4JError: An error occurred while calling 
z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist
...
```

```python
>>> _to_java_column([])
```
```
...
py4j.protocol.Py4JError: An error occurred while calling 
z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
...
```

```python
>>> class A(): pass
>>> _to_java_column(A())
```
```
...
AttributeError: 'A' object has no attribute '_get_object_id'
```

This means that most functions using `_to_java_column`, such as `udf`, `to_json` and some other APIs, throw an exception as below:

```python
>>> from pyspark.sql.functions import udf
>>> udf(lambda x: x)(None)
```

```
...
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.sql.functions.col.
: java.lang.NullPointerException
...
```

```python
>>> from pyspark.sql.functions import to_json
>>> to_json(None)
```

```
...
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.sql.functions.col.
: java.lang.NullPointerException
...
```

**After this PR**:

```python
>>> from pyspark.sql.functions import udf
>>> udf(lambda x: x)(None)
...
```

```
TypeError: Invalid argument, not a string or column: None of type <type 'NoneType'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' functions.
```

```python
>>> from pyspark.sql.functions import to_json
>>> to_json(None)
```

```
...
TypeError: Invalid argument, not a string or column: None of type <type 'NoneType'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' functions.
```

## How was this patch tested?

Unit tests added in `python/pyspark/sql/tests.py` and manual tests.

Author: hyukjinkwon 
Author: zero323 

Closes #19027 from HyukjinKwon/SPARK-19165.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc5d34d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc5d34d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc5d34d8

Branch: refs/heads/master
Commit: dc5d34d8dcd6526d1dfdac8606661561c7576a62
Parents: 95713eb
Author: hyukjinkwon 
Authored: Thu Aug 24 20:29:03 2017 +0900
Committer: hyukjinkwon 
Committed: Thu Aug 24 20:29:03 2017 +0900

--
 python/pyspark/sql/column.py |  8 +++-
 python/pyspark/sql/tests.py  | 25 +
 2 files changed, 32 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dc5d34d8/python/pyspark/sql/column.py
--
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index b172f38..43b38a2 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -44,8 +44,14 @@ def _create_column_from_name(name):
 def _to_java_column(col):
 if isinstance(col, Column):
 jcol = col._jc
-else:
+elif isinstance(col, basestring):
 jcol = _create_column_from_name(col)
+else:
+raise TypeError(
+"Invalid argument, not a string or column: "
+"{0} of type {1}. "
+"For column literals, use 'lit', 'array', 'struct' or 'create_map' 
"
+"function.".format(col, type(col)))
 return jcol
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dc5d34d8/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 45a3f

[2/2] spark git commit: [SPARK-21745][SQL] Refactor ColumnVector hierarchy to make ColumnVector read-only and to introduce WritableColumnVector.

2017-08-24 Thread wenchen
[SPARK-21745][SQL] Refactor ColumnVector hierarchy to make ColumnVector 
read-only and to introduce WritableColumnVector.

## What changes were proposed in this pull request?

This is a refactoring of the `ColumnVector` hierarchy and related classes; a rough sketch of the resulting shape follows the list below.

1. make `ColumnVector` read-only
2. introduce `WritableColumnVector` with write interface
3. remove `ReadOnlyColumnVector`
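
An illustrative sketch (in Scala, not the actual Java classes) of the shape of the hierarchy after this change:

```scala
// Read side: consumers of a batch only see getters.
abstract class ColumnVector {
  def isNullAt(rowId: Int): Boolean
  def getInt(rowId: Int): Int
  // ... getters for the other supported types
}

// Write side: producers such as the vectorized Parquet reader use the put APIs.
abstract class WritableColumnVector extends ColumnVector {
  def reserve(requiredCapacity: Int): Unit
  def putNull(rowId: Int): Unit
  def putInt(rowId: Int, value: Int): Unit
  // ... put APIs for the other supported types, plus batched variants
}
```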

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN 

Closes #18958 from ueshin/issues/SPARK-21745.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e33954d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e33954d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e33954d

Branch: refs/heads/master
Commit: 9e33954ddfe1148f69e523c89827feb76ba892c9
Parents: dc5d34d
Author: Takuya UESHIN 
Authored: Thu Aug 24 21:13:44 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Aug 24 21:13:44 2017 +0800

--
 .../expressions/codegen/CodeGenerator.scala |  28 +-
 .../parquet/VectorizedColumnReader.java |  31 +-
 .../parquet/VectorizedParquetRecordReader.java  |  23 +-
 .../parquet/VectorizedPlainValuesReader.java|  16 +-
 .../parquet/VectorizedRleValuesReader.java  |  87 ++-
 .../parquet/VectorizedValuesReader.java |  16 +-
 .../execution/vectorized/AggregateHashMap.java  |  10 +-
 .../execution/vectorized/ArrowColumnVector.java |  45 +-
 .../sql/execution/vectorized/ColumnVector.java  | 632 +
 .../execution/vectorized/ColumnVectorUtils.java |  18 +-
 .../sql/execution/vectorized/ColumnarBatch.java | 106 +--
 .../vectorized/OffHeapColumnVector.java |  34 +-
 .../vectorized/OnHeapColumnVector.java  |  35 +-
 .../vectorized/ReadOnlyColumnVector.java| 251 ---
 .../vectorized/WritableColumnVector.java| 674 +++
 .../aggregate/VectorizedHashMapGenerator.scala  |  39 +-
 .../vectorized/ColumnarBatchBenchmark.scala |  23 +-
 .../vectorized/ColumnarBatchSuite.scala | 109 +--
 18 files changed, 1078 insertions(+), 1099 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 807765c..3853863 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -464,14 +464,13 @@ class CodegenContext {
   /**
* Returns the specialized code to set a given value in a column vector for 
a given `DataType`.
*/
-  def setValue(batch: String, row: String, dataType: DataType, ordinal: Int,
-  value: String): String = {
+  def setValue(vector: String, rowId: String, dataType: DataType, value: 
String): String = {
 val jt = javaType(dataType)
 dataType match {
   case _ if isPrimitiveType(jt) =>
-s"$batch.column($ordinal).put${primitiveTypeName(jt)}($row, $value);"
-  case t: DecimalType => s"$batch.column($ordinal).putDecimal($row, 
$value, ${t.precision});"
-  case t: StringType => s"$batch.column($ordinal).putByteArray($row, 
$value.getBytes());"
+s"$vector.put${primitiveTypeName(jt)}($rowId, $value);"
+  case t: DecimalType => s"$vector.putDecimal($rowId, $value, 
${t.precision});"
+  case t: StringType => s"$vector.putByteArray($rowId, $value.getBytes());"
   case _ =>
 throw new IllegalArgumentException(s"cannot generate code for 
unsupported type: $dataType")
 }
@@ -482,37 +481,36 @@ class CodegenContext {
* that could potentially be nullable.
*/
   def updateColumn(
-  batch: String,
-  row: String,
+  vector: String,
+  rowId: String,
   dataType: DataType,
-  ordinal: Int,
   ev: ExprCode,
   nullable: Boolean): String = {
 if (nullable) {
   s"""
  if (!${ev.isNull}) {
-   ${setValue(batch, row, dataType, ordinal, ev.value)}
+   ${setValue(vector, rowId, dataType, ev.value)}
  } else {
-   $batch.column($ordinal).putNull($row);
+   $vector.putNull($rowId);
  }
"""
 } else {
-  s"""${setValue(batch, row, dataType, ordinal, ev.value)};"""
+  s"""${setValue(vector, rowId, dataType, ev.value)};"""
 }
   }
 
   /**
* Returns the specialized code to access a value from a column vector for a 
given `DataType`.
*/
-  def getValue(batch: String, row: String, dataType: DataType, o

[1/2] spark git commit: [SPARK-21745][SQL] Refactor ColumnVector hierarchy to make ColumnVector read-only and to introduce WritableColumnVector.

2017-08-24 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master dc5d34d8d -> 9e33954dd


http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
new file mode 100644
index 000..b4f753c
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * This class adds write APIs to ColumnVector.
+ * It supports all the types and contains put APIs as well as their batched 
versions.
+ * The batched versions are preferable whenever possible.
+ *
+ * Capacity: The data stored is dense but the arrays are not fixed capacity. 
It is the
+ * responsibility of the caller to call reserve() to ensure there is enough 
room before adding
+ * elements. This means that the put() APIs do not check as in common cases 
(i.e. flat schemas),
+ * the lengths are known up front.
+ *
+ * A ColumnVector should be considered immutable once originally created. In 
other words, it is not
+ * valid to call put APIs after reads until reset() is called.
+ */
+public abstract class WritableColumnVector extends ColumnVector {
+
+  /**
+   * Resets this column for writing. The currently stored values are no longer 
accessible.
+   */
+  public void reset() {
+if (isConstant) return;
+
+if (childColumns != null) {
+  for (ColumnVector c: childColumns) {
+((WritableColumnVector) c).reset();
+  }
+}
+numNulls = 0;
+elementsAppended = 0;
+if (anyNullsSet) {
+  putNotNulls(0, capacity);
+  anyNullsSet = false;
+}
+  }
+
+  public void reserve(int requiredCapacity) {
+if (requiredCapacity > capacity) {
+  int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+  if (requiredCapacity <= newCapacity) {
+try {
+  reserveInternal(newCapacity);
+} catch (OutOfMemoryError outOfMemoryError) {
+  throwUnsupportedException(requiredCapacity, outOfMemoryError);
+}
+  } else {
+throwUnsupportedException(requiredCapacity, null);
+  }
+}
+  }
+
+  private void throwUnsupportedException(int requiredCapacity, Throwable 
cause) {
+String message = "Cannot reserve additional contiguous bytes in the 
vectorized reader " +
+"(requested = " + requiredCapacity + " bytes). As a workaround, you 
can disable the " +
+"vectorized reader by setting " + 
SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
+" to false.";
+throw new RuntimeException(message, cause);
+  }
+
+  @Override
+  public int numNulls() { return numNulls; }
+
+  @Override
+  public boolean anyNullsSet() { return anyNullsSet; }
+
+  /**
+   * Ensures that there is enough storage to store capacity elements. That is, 
the put() APIs
+   * must work for all rowIds < capacity.
+   */
+  protected abstract void reserveInternal(int capacity);
+
+  /**
+   * Sets the value at rowId to null/not null.
+   */
+  public abstract void putNotNull(int rowId);
+  public abstract void putNull(int rowId);
+
+  /**
+   * Sets the values from [rowId, rowId + count) to null/not null.
+   */
+  public abstract void putNulls(int rowId, int count);
+  public abstract void putNotNulls(int rowId, int count);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putBoolean(int rowId, boolean value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putBooleans(int rowId, int count, boolean value);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */

spark git commit: [SPARK-21759][SQL] In.checkInputDataTypes should not wrongly report unresolved plans for IN correlated subquery

2017-08-24 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 9e33954dd -> 183d4cb71


[SPARK-21759][SQL] In.checkInputDataTypes should not wrongly report unresolved 
plans for IN correlated subquery

## What changes were proposed in this pull request?

With the check for structural integrity proposed in SPARK-21726, it is found 
that the optimization rule `PullupCorrelatedPredicates` can produce unresolved 
plans.

For a correlated IN query looks like:

SELECT t1.a FROM t1
WHERE
t1.a IN (SELECT t2.c
FROM t2
WHERE t1.b < t2.d);

The query plan might look like:

Project [a#0]
+- Filter a#0 IN (list#4 [b#1])
   :  +- Project [c#2]
   :     +- Filter (outer(b#1) < d#3)
   :        +- LocalRelation <empty>, [c#2, d#3]
   +- LocalRelation <empty>, [a#0, b#1]

After `PullupCorrelatedPredicates`, it produces a query plan like:

'Project [a#0]
+- 'Filter a#0 IN (list#4 [(b#1 < d#3)])
   :  +- Project [c#2, d#3]
   :     +- LocalRelation <empty>, [c#2, d#3]
   +- LocalRelation <empty>, [a#0, b#1]

Because the correlated predicate involves another attribute, `d#3`, from the subquery, it has been pulled out and added to the `Project` on top of the subquery.

When `list` in `In` contains just one `ListQuery`, `In.checkInputDataTypes` checks whether the number of `value` expressions matches the output size of the subquery. In the above example, there is only one `value` expression while the subquery output has two attributes, `c#2` and `d#3`, so it fails the check and `In.resolved` returns `false`.

We should not let `In.checkInputDataTypes` wrongly report unresolved plans to 
fail the structural integrity check.
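
A minimal sketch of reproducing the query shape from the description (table and column names come from the example above; the data values and the `spark` session are assumptions):

```scala
// Assumes an existing SparkSession named `spark`, e.g. in spark-shell.
import spark.implicits._

Seq((1, 2), (3, 4)).toDF("a", "b").createOrReplaceTempView("t1")
Seq((1, 5), (3, 0)).toDF("c", "d").createOrReplaceTempView("t2")

spark.sql(
  """SELECT t1.a FROM t1
    |WHERE t1.a IN (SELECT t2.c FROM t2 WHERE t1.b < t2.d)""".stripMargin).show()
```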

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh 

Closes #18968 from viirya/SPARK-21759.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/183d4cb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/183d4cb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/183d4cb7

Branch: refs/heads/master
Commit: 183d4cb71fbcbf484fc85d8621e1fe04cbbc8195
Parents: 9e33954
Author: Liang-Chi Hsieh 
Authored: Thu Aug 24 21:46:58 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Aug 24 21:46:58 2017 +0800

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  6 +-
 .../sql/catalyst/analysis/TypeCoercion.scala|  5 +-
 .../sql/catalyst/expressions/predicates.scala   | 65 +---
 .../sql/catalyst/expressions/subquery.scala | 13 +++-
 .../spark/sql/catalyst/optimizer/subquery.scala | 10 +--
 .../PullupCorrelatedPredicatesSuite.scala   | 52 
 .../negative-cases/subq-input-typecheck.sql.out |  6 +-
 7 files changed, 106 insertions(+), 51 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/183d4cb7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 70a3885..1e934d0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1286,8 +1286,10 @@ class Analyzer(
   resolveSubQuery(s, plans)(ScalarSubquery(_, _, exprId))
 case e @ Exists(sub, _, exprId) if !sub.resolved =>
   resolveSubQuery(e, plans)(Exists(_, _, exprId))
-case In(value, Seq(l @ ListQuery(sub, _, exprId))) if value.resolved 
&& !sub.resolved =>
-  val expr = resolveSubQuery(l, plans)(ListQuery(_, _, exprId))
+case In(value, Seq(l @ ListQuery(sub, _, exprId, _))) if 
value.resolved && !l.resolved =>
+  val expr = resolveSubQuery(l, plans)((plan, exprs) => {
+ListQuery(plan, exprs, exprId, plan.output)
+  })
   In(value, Seq(expr))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/183d4cb7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 06d8350..9ffe646 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -402,7 +402,7 @@ object TypeCoercion {
 
   // Handle type casting required between value expression and subquery 
output
   // in IN subquery.
-  case i @ In(a, Seq(ListQuery(sub, children, exprI

spark git commit: [SPARK-21826][SQL] outer broadcast hash join should not throw NPE

2017-08-24 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 183d4cb71 -> 2dd37d827


[SPARK-21826][SQL] outer broadcast hash join should not throw NPE

## What changes were proposed in this pull request?

This is a bug introduced by 
https://github.com/apache/spark/pull/11274/files#diff-7adb688cbfa583b5711801f196a074bbL274
 .

The non-equal join condition should only be applied when the equal-join condition matches; otherwise the generated code may evaluate it against a null build-side row and throw a NullPointerException.

## How was this patch tested?

regression test

Author: Wenchen Fan 

Closes #19036 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2dd37d82
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2dd37d82
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2dd37d82

Branch: refs/heads/master
Commit: 2dd37d827f2e443dcb3eaf8a95437d179130d55c
Parents: 183d4cb
Author: Wenchen Fan 
Authored: Thu Aug 24 16:44:12 2017 +0200
Committer: Herman van Hovell 
Committed: Thu Aug 24 16:44:12 2017 +0200

--
 .../execution/joins/BroadcastHashJoinExec.scala |  2 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 20 
 2 files changed, 21 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2dd37d82/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index bfa1e9d..2f52a08 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -283,8 +283,8 @@ case class BroadcastHashJoinExec(
   s"""
  |boolean $conditionPassed = true;
  |${eval.trim}
- |${ev.code}
  |if ($matched != null) {
+ |  ${ev.code}
  |  $conditionPassed = !${ev.isNull} && ${ev.value};
  |}
""".stripMargin

http://git-wip-us.apache.org/repos/asf/spark/blob/2dd37d82/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 86fe09b..453052a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.language.existentials
 
@@ -26,6 +27,7 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
 
 class JoinSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -767,4 +769,22 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   }
 }
   }
+
+  test("outer broadcast hash join should not throw NPE") {
+withTempView("v1", "v2") {
+  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+Seq(2 -> 2).toDF("x", "y").createTempView("v1")
+
+spark.createDataFrame(
+  Seq(Row(1, "a")).asJava,
+  new StructType().add("i", "int", nullable = false).add("j", 
"string", nullable = false)
+).createTempView("v2")
+
+checkAnswer(
+  sql("select x, y, i, j from v1 left join v2 on x = i and y < 
length(j)"),
+  Row(2, 2, null, null)
+)
+  }
+}
+  }
 }





spark git commit: [SPARK-21826][SQL] outer broadcast hash join should not throw NPE

2017-08-24 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 236b2f4d5 -> a58536741


[SPARK-21826][SQL] outer broadcast hash join should not throw NPE

This is a bug introduced by 
https://github.com/apache/spark/pull/11274/files#diff-7adb688cbfa583b5711801f196a074bbL274
 .

Non-equal join condition should only be applied when the equal-join condition 
matches.

regression test

Author: Wenchen Fan 

Closes #19036 from cloud-fan/bug.

(cherry picked from commit 2dd37d827f2e443dcb3eaf8a95437d179130d55c)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5853674
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5853674
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5853674

Branch: refs/heads/branch-2.2
Commit: a58536741f8365bb3fff01b588f3b42b219d11e5
Parents: 236b2f4
Author: Wenchen Fan 
Authored: Thu Aug 24 16:44:12 2017 +0200
Committer: Herman van Hovell 
Committed: Thu Aug 24 16:49:49 2017 +0200

--
 .../execution/joins/BroadcastHashJoinExec.scala |  2 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 20 
 2 files changed, 21 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a5853674/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 0bc261d..69715ab 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -257,8 +257,8 @@ case class BroadcastHashJoinExec(
   s"""
  |boolean $conditionPassed = true;
  |${eval.trim}
- |${ev.code}
  |if ($matched != null) {
+ |  ${ev.code}
  |  $conditionPassed = !${ev.isNull} && ${ev.value};
  |}
""".stripMargin

http://git-wip-us.apache.org/repos/asf/spark/blob/a5853674/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 40bc1e9..95dc147 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.language.existentials
 
@@ -25,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
 
 class JoinSuite extends QueryTest with SharedSQLContext {
@@ -739,4 +741,22 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   }
 }
   }
+
+  test("outer broadcast hash join should not throw NPE") {
+withTempView("v1", "v2") {
+  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+Seq(2 -> 2).toDF("x", "y").createTempView("v1")
+
+spark.createDataFrame(
+  Seq(Row(1, "a")).asJava,
+  new StructType().add("i", "int", nullable = false).add("j", 
"string", nullable = false)
+).createTempView("v2")
+
+checkAnswer(
+  sql("select x, y, i, j from v1 left join v2 on x = i and y < 
length(j)"),
+  Row(2, 2, null, null)
+)
+  }
+}
+  }
 }





spark-website git commit: Remove old sentence about 4-month minor releases

2017-08-24 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 1a2e57670 -> cca972e7f


Remove old sentence about 4-month minor releases


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/cca972e7
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/cca972e7
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/cca972e7

Branch: refs/heads/asf-site
Commit: cca972e7fbf866df211076dcd5afb5f93af51994
Parents: 1a2e576
Author: Sean Owen 
Authored: Thu Aug 24 17:58:57 2017 +0100
Committer: Sean Owen 
Committed: Thu Aug 24 17:58:57 2017 +0100

--
 site/versioning-policy.html | 1 -
 versioning-policy.md| 1 -
 2 files changed, 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/cca972e7/site/versioning-policy.html
--
diff --git a/site/versioning-policy.html b/site/versioning-policy.html
index e0afa7c..82e39d9 100644
--- a/site/versioning-policy.html
+++ b/site/versioning-policy.html
@@ -209,7 +209,6 @@ These small differences account for Spark’s nature as 
a multi-module proje
 Major version numbers will remain stable over long periods of time. For 
instance, 1.X.Y may last 
 1 year or more.
   FEATURE: Feature releases will typically contain new 
features, improvements, and bug fixes. 
-The target frequency for feature releases is every 4 months. 
 Each feature release will have a merge window where new patches can be merged, 
a QA window when 
 only fixes can be merged, then a final period where voting occurs on release 
candidates. These 
 windows will be announced immediately after the previous feature release to 
give people plenty 

http://git-wip-us.apache.org/repos/asf/spark-website/blob/cca972e7/versioning-policy.md
--
diff --git a/versioning-policy.md b/versioning-policy.md
index 509b595..a6c47d2 100644
--- a/versioning-policy.md
+++ b/versioning-policy.md
@@ -21,7 +21,6 @@ Each Spark release will be versioned: 
`[MAJOR].[FEATURE].[MAINTENANCE]`
 Major version numbers will remain stable over long periods of time. For 
instance, 1.X.Y may last 
 1 year or more.
 - **FEATURE**: Feature releases will typically contain new features, 
improvements, and bug fixes. 
-The target frequency for feature releases is every 4 months. 
 Each feature release will have a merge window where new patches can be merged, 
a QA window when 
 only fixes can be merged, then a final period where voting occurs on release 
candidates. These 
 windows will be announced immediately after the previous feature release to 
give people plenty 





spark git commit: [SPARK-21681][ML] fix bug of MLOR do not work correctly when featureStd contains zero (backport PR for 2.2)

2017-08-24 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 a58536741 -> 2b4bd7910


[SPARK-21681][ML] fix bug of MLOR do not work correctly when featureStd 
contains zero (backport PR for 2.2)

## What changes were proposed in this pull request?

This is a backport PR of https://github.com/apache/spark/pull/18896

It fixes a bug where MLOR (multinomial logistic regression) does not work correctly when featureStd contains zero.

We can reproduce the bug with a dataset like the one below (its features include a zero-variance column); training then produces a wrong result (all coefficients become 0):
```
val multinomialDatasetWithZeroVar = {
  val nPoints = 100
  val coefficients = Array(
-0.57997, 0.912083, -0.371077,
-0.16624, -0.84355, -0.048509)

  val xMean = Array(5.843, 3.0)
  val xVariance = Array(0.6856, 0.0)  // including zero variance

  val testData = generateMultinomialLogisticInput(
coefficients, xMean, xVariance, addIntercept = true, nPoints, seed)

  val df = sc.parallelize(testData, 4).toDF().withColumn("weight", lit(1.0))
  df.cache()
  df
}
```
## How was this patch tested?

Test case added.

Author: WeichenXu 

Closes #19026 from WeichenXu123/fix_mlor_zero_var_bug_2_2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b4bd791
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b4bd791
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b4bd791

Branch: refs/heads/branch-2.2
Commit: 2b4bd7910fecc8b7b41c7d4388d2a8204c1901e8
Parents: a585367
Author: Weichen Xu 
Authored: Thu Aug 24 10:18:56 2017 -0700
Committer: Joseph K. Bradley 
Committed: Thu Aug 24 10:18:56 2017 -0700

--
 .../ml/classification/LogisticRegression.scala  | 12 ++--
 .../LogisticRegressionSuite.scala   | 75 
 2 files changed, 82 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2b4bd791/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 567af04..1de2373 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -1727,11 +1727,13 @@ private class LogisticAggregator(
 
 val margins = new Array[Double](numClasses)
 features.foreachActive { (index, value) =>
-  val stdValue = value / localFeaturesStd(index)
-  var j = 0
-  while (j < numClasses) {
-margins(j) += localCoefficients(index * numClasses + j) * stdValue
-j += 1
+  if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+val stdValue = value / localFeaturesStd(index)
+var j = 0
+while (j < numClasses) {
+  margins(j) += localCoefficients(index * numClasses + j) * stdValue
+  j += 1
+}
   }
 }
 var i = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/2b4bd791/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 1ffd8dc..8461d64 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -45,6 +45,7 @@ class LogisticRegressionSuite
   @transient var smallMultinomialDataset: Dataset[_] = _
   @transient var binaryDataset: Dataset[_] = _
   @transient var multinomialDataset: Dataset[_] = _
+  @transient var multinomialDatasetWithZeroVar: Dataset[_] = _
   private val eps: Double = 1e-5
 
   override def beforeAll(): Unit = {
@@ -98,6 +99,23 @@ class LogisticRegressionSuite
   df.cache()
   df
 }
+
+multinomialDatasetWithZeroVar = {
+  val nPoints = 100
+  val coefficients = Array(
+-0.57997, 0.912083, -0.371077,
+-0.16624, -0.84355, -0.048509)
+
+  val xMean = Array(5.843, 3.0)
+  val xVariance = Array(0.6856, 0.0)
+
+  val testData = generateMultinomialLogisticInput(
+coefficients, xMean, xVariance, addIntercept = true, nPoints, seed)
+
+  val df = sc.parallelize(testData, 4).toDF().withColumn("weight", 
lit(1.0))
+  df.cache()
+  df
+}
   }
 
   /**
@@ -111,6 +129,11 @@ class LogisticRegressionSuite
 multinomialDataset.rdd.map { case Row(label: Double, features: Vector, 
weight: Double) =>
   l

spark git commit: [SPARK-21788][SS] Handle more exceptions when stopping a streaming query

2017-08-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 2dd37d827 -> d3abb3699


[SPARK-21788][SS] Handle more exceptions when stopping a streaming query

## What changes were proposed in this pull request?

Add more cases we should view as a normal query stop rather than a failure.

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu 

Closes #18997 from zsxwing/SPARK-21788.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3abb369
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3abb369
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3abb369

Branch: refs/heads/master
Commit: d3abb36990d928a8445a8c69ddebeabdfeb1484d
Parents: 2dd37d8
Author: Shixiong Zhu 
Authored: Thu Aug 24 10:23:59 2017 -0700
Committer: Tathagata Das 
Committed: Thu Aug 24 10:23:59 2017 -0700

--
 .../execution/streaming/StreamExecution.scala   | 34 ++-
 .../spark/sql/streaming/StreamSuite.scala   | 60 +++-
 2 files changed, 89 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d3abb369/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 432b2d4..c224f2f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{InterruptedIOException, IOException}
+import java.io.{InterruptedIOException, IOException, UncheckedIOException}
+import java.nio.channels.ClosedByInterruptException
 import java.util.UUID
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantLock
 
@@ -27,6 +28,7 @@ import scala.collection.mutable.{Map => MutableMap}
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
+import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -335,7 +337,7 @@ class StreamExecution(
 // `stop()` is already called. Let `finally` finish the cleanup.
   }
 } catch {
-  case _: InterruptedException | _: InterruptedIOException if state.get == 
TERMINATED =>
+  case e if isInterruptedByStop(e) =>
 // interrupted by stop()
 updateStatusMessage("Stopped")
   case e: IOException if e.getMessage != null
@@ -407,6 +409,32 @@ class StreamExecution(
 }
   }
 
+  private def isInterruptedByStop(e: Throwable): Boolean = {
+if (state.get == TERMINATED) {
+  e match {
+// InterruptedIOException - thrown when an I/O operation is interrupted
+// ClosedByInterruptException - thrown when an I/O operation upon a 
channel is interrupted
+case _: InterruptedException | _: InterruptedIOException | _: 
ClosedByInterruptException =>
+  true
+// The cause of the following exceptions may be one of the above 
exceptions:
+//
+// UncheckedIOException - thrown by codes that cannot throw a checked 
IOException, such as
+//BiFunction.apply
+// ExecutionException - thrown by codes running in a thread pool and 
these codes throw an
+//  exception
+// UncheckedExecutionException - thrown by codes that cannot throw a 
checked
+//   ExecutionException, such as 
BiFunction.apply
+case e2 @ (_: UncheckedIOException | _: ExecutionException | _: 
UncheckedExecutionException)
+  if e2.getCause != null =>
+  isInterruptedByStop(e2.getCause)
+case _ =>
+  false
+  }
+} else {
+  false
+}
+  }
+
   /**
* Populate the start offsets to start the execution at the current offsets 
stored in the sink
* (i.e. avoid reprocessing data that we have already processed). This 
function must be called

http://git-wip-us.apache.org/repos/asf/spark/blob/d3abb369/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 012cccf..d0b2041 100644
--- a/sql/core/sr

spark git commit: [SPARK-21826][SQL][2.1][2.0] outer broadcast hash join should not throw NPE

2017-08-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3d3be4dca -> 576975356


[SPARK-21826][SQL][2.1][2.0] outer broadcast hash join should not throw NPE

backport https://github.com/apache/spark/pull/19036 to branch 2.1 and 2.0

Author: Wenchen Fan 

Closes #19040 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57697535
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57697535
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57697535

Branch: refs/heads/branch-2.1
Commit: 576975356357ead203e452d0d794794349ba4578
Parents: 3d3be4d
Author: Wenchen Fan 
Authored: Thu Aug 24 10:36:37 2017 -0700
Committer: gatorsmile 
Committed: Thu Aug 24 10:36:37 2017 -0700

--
 .../execution/joins/BroadcastHashJoinExec.scala |  2 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 20 
 2 files changed, 21 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/57697535/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 0bc261d..69715ab 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -257,8 +257,8 @@ case class BroadcastHashJoinExec(
   s"""
  |boolean $conditionPassed = true;
  |${eval.trim}
- |${ev.code}
  |if ($matched != null) {
+ |  ${ev.code}
  |  $conditionPassed = !${ev.isNull} && ${ev.value};
  |}
""".stripMargin

http://git-wip-us.apache.org/repos/asf/spark/blob/57697535/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 913b2ae..3416532 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
 import scala.language.existentials
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -24,6 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
 
 
 class JoinSuite extends QueryTest with SharedSQLContext {
@@ -604,4 +606,22 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 
 cartesianQueries.foreach(checkCartesianDetection)
   }
+
+  test("outer broadcast hash join should not throw NPE") {
+withTempView("v1", "v2") {
+  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+Seq(2 -> 2).toDF("x", "y").createTempView("v1")
+
+spark.createDataFrame(
+  Seq(Row(1, "a")).asJava,
+  new StructType().add("i", "int", nullable = false).add("j", 
"string", nullable = false)
+).createTempView("v2")
+
+checkAnswer(
+  sql("select x, y, i, j from v1 left join v2 on x = i and y < 
length(j)"),
+  Row(2, 2, null, null)
+)
+  }
+}
+  }
 }





spark git commit: [SPARK-21826][SQL][2.1][2.0] outer broadcast hash join should not throw NPE

2017-08-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 9f670ce5d -> bf1f30d7d


[SPARK-21826][SQL][2.1][2.0] outer broadcast hash join should not throw NPE

backport https://github.com/apache/spark/pull/19036 to branch 2.1 and 2.0

Author: Wenchen Fan 

Closes #19040 from cloud-fan/bug.

(cherry picked from commit 576975356357ead203e452d0d794794349ba4578)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf1f30d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf1f30d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf1f30d7

Branch: refs/heads/branch-2.0
Commit: bf1f30d7dcce81df6826d2630decfc8a93f5fa01
Parents: 9f670ce
Author: Wenchen Fan 
Authored: Thu Aug 24 10:36:37 2017 -0700
Committer: gatorsmile 
Committed: Thu Aug 24 10:39:40 2017 -0700

--
 .../execution/joins/BroadcastHashJoinExec.scala |  2 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 20 
 2 files changed, 21 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bf1f30d7/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 0f24baa..20b531b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -257,8 +257,8 @@ case class BroadcastHashJoinExec(
   s"""
  |boolean $conditionPassed = true;
  |${eval.trim}
- |${ev.code}
  |if ($matched != null) {
+ |  ${ev.code}
  |  $conditionPassed = !${ev.isNull} && ${ev.value};
  |}
""".stripMargin

http://git-wip-us.apache.org/repos/asf/spark/blob/bf1f30d7/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 44889d9..5e7c9f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
 import scala.language.existentials
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -24,6 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
 
 
 class JoinSuite extends QueryTest with SharedSQLContext {
@@ -573,4 +575,22 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 Row(3, 1) ::
 Row(3, 2) :: Nil)
   }
+
+  test("outer broadcast hash join should not throw NPE") {
+withTempView("v1", "v2") {
+  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+Seq(2 -> 2).toDF("x", "y").createTempView("v1")
+
+spark.createDataFrame(
+  Seq(Row(1, "a")).asJava,
+  new StructType().add("i", "int", nullable = false).add("j", "string", nullable = false)
+).createTempView("v2")
+
+checkAnswer(
+  sql("select x, y, i, j from v1 left join v2 on x = i and y < length(j)"),
+  Row(2, 2, null, null)
+)
+  }
+}
+  }
 }





spark git commit: [SPARK-21701][CORE] Enable RPC client to use `SO_RCVBUF` and `SO_SNDBUF` in SparkConf.

2017-08-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master d3abb3699 -> 763b83ee8


[SPARK-21701][CORE] Enable RPC client to use `SO_RCVBUF` and `SO_SNDBUF` in SparkConf.

## What changes were proposed in this pull request?

TCP parameters like SO_RCVBUF and SO_SNDBUF can be set in SparkConf, and
`org.apache.spark.network.server.TransportServer` uses them when building the
server with Netty. For `TransportClientFactory`, however, there was no way to
pick up those parameters from SparkConf, so the server and client sides could
end up configured inconsistently. This PR enables the RPC client to use those
TCP parameters as well.
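
As a rough illustration (not part of this patch), the buffers could then be set from the
application side. This sketch assumes the per-module keys read by `TransportConf`
(`spark.<module>.io.receiveBuffer` / `spark.<module>.io.sendBuffer`, in bytes, with -1
meaning "keep the OS default"); the `shuffle` module is used purely as an example:

```scala
import org.apache.spark.SparkConf

// Hypothetical settings: 1 MiB socket buffers for the shuffle transport module.
// TransportConf maps these to Netty's ChannelOption.SO_RCVBUF / SO_SNDBUF; with
// this change they are applied to the client bootstrap as well as the server.
val conf = new SparkConf()
  .set("spark.shuffle.io.receiveBuffer", (1024 * 1024).toString)
  .set("spark.shuffle.io.sendBuffer", (1024 * 1024).toString)
```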

## How was this patch tested?

Existing tests.

Author: xu.zhang 

Closes #18964 from neoremind/add_client_param.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/763b83ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/763b83ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/763b83ee

Branch: refs/heads/master
Commit: 763b83ee84cbb6f263218c471dd9198dd6bee411
Parents: d3abb36
Author: xu.zhang 
Authored: Thu Aug 24 14:27:52 2017 -0700
Committer: Shixiong Zhu 
Committed: Thu Aug 24 14:27:52 2017 -0700

--
 .../apache/spark/network/client/TransportClientFactory.java  | 8 
 1 file changed, 8 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/763b83ee/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
--
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index b50e043..8add4e1 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -210,6 +210,14 @@ public class TransportClientFactory implements Closeable {
   .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
   .option(ChannelOption.ALLOCATOR, pooledAllocator);
 
+if (conf.receiveBuf() > 0) {
+  bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
+}
+
+if (conf.sendBuf() > 0) {
+  bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
+}
+
  final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
  final AtomicReference<Channel> channelRef = new AtomicReference<>();
 





spark git commit: [SPARK-21830][SQL] Bump ANTLR version and fix a few issues.

2017-08-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 763b83ee8 -> 05af2de0f


[SPARK-21830][SQL] Bump ANTLR version and fix a few issues.

## What changes were proposed in this pull request?
This PR bumps the ANTLR version to 4.7 and fixes a number of small
parser-related issues uncovered by the bump.

The main reason for upgrading is that in some cases the current version of
ANTLR (4.5) can exhibit exponential slowdowns when parsing boolean predicates.
For example, the following query takes effectively forever to parse:
```sql
SELECT *
FROM RANGE(1000)
WHERE
TRUE
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
AND NOT upper(DESCRIPTION) LIKE '%FOO%'
```

This is caused by a known bug in ANTLR
(https://github.com/antlr/antlr4/issues/994), which was fixed in version 4.6.
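
A quick way to see the effect is to time the parser directly. This is only a sketch, and
assumes the `CatalystSqlParser` object in `org.apache.spark.sql.catalyst.parser` is on the
classpath; it simply rebuilds the query above programmatically:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Rebuild the pathological query: the same predicate repeated 18 times.
val predicates = " AND NOT upper(DESCRIPTION) LIKE '%FOO%'" * 18
val query = "SELECT * FROM RANGE(1000) WHERE TRUE" + predicates

val start = System.nanoTime()
CatalystSqlParser.parsePlan(query)  // exponentially slow on ANTLR 4.5, fast on 4.6+
println(s"parsed in ${(System.nanoTime() - start) / 1e6} ms")
```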

## How was this patch tested?
Existing tests.

Author: Herman van Hovell 

Closes #19042 from hvanhovell/SPARK-21830.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05af2de0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05af2de0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05af2de0

Branch: refs/heads/master
Commit: 05af2de0fdce625041b99908adc320c576bac116
Parents: 763b83e
Author: Herman van Hovell 
Authored: Thu Aug 24 16:33:55 2017 -0700
Committer: gatorsmile 
Committed: Thu Aug 24 16:33:55 2017 -0700

--
 dev/deps/spark-deps-hadoop-2.6|  2 +-
 dev/deps/spark-deps-hadoop-2.7|  2 +-
 pom.xml   |  2 +-
 project/SparkBuild.scala  |  1 +
 .../org/apache/spark/sql/catalyst/parser/SqlBase.g4   |  6 +-
 .../apache/spark/sql/catalyst/parser/AstBuilder.scala |  4 
 .../spark/sql/catalyst/parser/ParseDriver.scala   |  2 +-
 .../sql/catalyst/parser/TableSchemaParserSuite.scala  | 14 --
 .../resources/sql-tests/results/show-tables.sql.out   |  4 ++--
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala|  2 +-
 10 files changed, 25 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/05af2de0/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 01af2c7..de17507 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -5,7 +5,7 @@ activation-1.1.1.jar
 aircompressor-0.3.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
-antlr4-runtime-4.5.3.jar
+antlr4-runtime-4.7.jar
 aopalliance-1.0.jar
 aopalliance-repackaged-2.4.0-b34.jar
 apache-log4j-extras-1.2.17.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/05af2de0/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 69f3a4b..da826a7 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -5,7 +5,7 @@ activation-1.1.1.jar
 aircompressor-0.3.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
-antlr4-runtime-4.5.3.jar
+antlr4-runtime-4.7.jar
 aopalliance-1.0.jar
 aopalliance-repackaged-2.4.0-b34.jar
 apache-log4j-extras-1.2.17.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/05af2de0/pom.xml
--
diff --git a/pom.xml b/pom.xml
index c0df3ef..8b4a6c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,7 +178,7 @@
 3.5.2
 1.3.9
 0.9.3
-4.5.3
+4.7
 1.1
 2.52.0
 2.6

http://git-wip-us.apache.org/repos/asf/spark/blob/05af2de0/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 7565e14..18059ad 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -474,6 +474,7 @@ object OldDeps {
 
 object Catalyst {
   lazy val settings = antlr4Settings ++ Seq(
+antlr4Version in Antlr4 := "4.7",
  antlr4PackageName in Antlr4 := Some("org.apache.spark.sql.catalyst.parser"),
 antlr4GenListener in Antlr4 := true,
 antlr4GenVisitor in Antlr4 := true

http://g

spark git commit: [SPARK-21108][ML] convert LinearSVC to aggregator framework

2017-08-24 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 05af2de0f -> f3676d639


[SPARK-21108][ML] convert LinearSVC to aggregator framework

## What changes were proposed in this pull request?

Convert LinearSVC to the new aggregator framework.
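
For readers new to the aggregator framework: it factors the per-partition loss/gradient
accumulation (an object with `add` and `merge`, combined via `treeAggregate`) out of each
algorithm, so `LinearSVC` only has to supply a `HingeAggregator`. Below is a self-contained
sketch of that pattern on a toy one-dimensional hinge loss; `HingePoint` and `HingeAgg` are
illustrative stand-ins, not the Spark ML classes:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative stand-ins, not the Spark ML classes.
case class HingePoint(label: Double, feature: Double)

class HingeAgg(coef: Double) extends Serializable {
  var loss = 0.0
  var gradient = 0.0
  // Accumulate hinge loss max(0, 1 - y * (w * x)) and its gradient for one point.
  def add(p: HingePoint): this.type = {
    val margin = p.label * coef * p.feature
    if (margin < 1.0) {
      loss += 1.0 - margin
      gradient += -p.label * p.feature
    }
    this
  }
  // Combine two partial aggregators (what treeAggregate does between partitions).
  def merge(other: HingeAgg): this.type = {
    loss += other.loss
    gradient += other.gradient
    this
  }
}

object HingeAggSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("hinge-agg-sketch").getOrCreate()
    val data = spark.sparkContext.parallelize(Seq(HingePoint(1.0, 2.0), HingePoint(-1.0, 0.5)))
    val agg = data.treeAggregate(new HingeAgg(coef = 0.3))(
      seqOp = (a, p) => a.add(p),
      combOp = (a1, a2) => a1.merge(a2),
      depth = 2)
    println(s"loss = ${agg.loss}, gradient = ${agg.gradient}")
    spark.stop()
  }
}
```

The removed `LinearSVCCostFun` in the diff below shows the same seqOp/combOp/treeAggregate
wiring that `RDDLossFunction` now provides generically.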

## How was this patch tested?

existing unit test.

Author: Yuhao Yang 

Closes #18315 from hhbyyh/svcAggregator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3676d63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3676d63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3676d63

Branch: refs/heads/master
Commit: f3676d63913e0706e071b71e1742b8d57b102fba
Parents: 05af2de
Author: Yuhao Yang 
Authored: Fri Aug 25 10:22:27 2017 +0800
Committer: Yanbo Liang 
Committed: Fri Aug 25 10:22:27 2017 +0800

--
 .../spark/ml/classification/LinearSVC.scala | 204 ++-
 .../ml/optim/aggregator/HingeAggregator.scala   | 105 ++
 .../ml/classification/LinearSVCSuite.scala  |   7 +-
 .../optim/aggregator/HingeAggregatorSuite.scala | 163 +++
 .../aggregator/LogisticAggregatorSuite.scala|   2 -
 5 files changed, 286 insertions(+), 195 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 8d556de..3b0666c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg._
-import org.apache.spark.ml.linalg.BLAS._
+import org.apache.spark.ml.optim.aggregator.HingeAggregator
+import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -214,10 +214,20 @@ class LinearSVC @Since("2.2.0") (
   }
 
   val featuresStd = summarizer.variance.toArray.map(math.sqrt)
+  val getFeaturesStd = (j: Int) => featuresStd(j)
   val regParamL2 = $(regParam)
   val bcFeaturesStd = instances.context.broadcast(featuresStd)
-  val costFun = new LinearSVCCostFun(instances, $(fitIntercept),
-$(standardization), bcFeaturesStd, regParamL2, $(aggregationDepth))
+  val regularization = if (regParamL2 != 0.0) {
+val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
+Some(new L2Regularization(regParamL2, shouldApply,
+  if ($(standardization)) None else Some(getFeaturesStd)))
+  } else {
+None
+  }
+
+  val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
+  val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
+$(aggregationDepth))
 
   def regParamL1Fun = (index: Int) => 0D
  val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
@@ -372,189 +382,3 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] {
 }
   }
 }
-
-/**
- * LinearSVCCostFun implements Breeze's DiffFunction[T] for hinge loss function
- */
-private class LinearSVCCostFun(
-instances: RDD[Instance],
-fitIntercept: Boolean,
-standardization: Boolean,
-bcFeaturesStd: Broadcast[Array[Double]],
-regParamL2: Double,
-aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
-
-  override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
-val coeffs = Vectors.fromBreeze(coefficients)
-val bcCoeffs = instances.context.broadcast(coeffs)
-val featuresStd = bcFeaturesStd.value
-val numFeatures = featuresStd.length
-
-val svmAggregator = {
-  val seqOp = (c: LinearSVCAggregator, instance: Instance) => c.add(instance)
-  val combOp = (c1: LinearSVCAggregator, c2: LinearSVCAggregator) => c1.merge(c2)
-
-  instances.treeAggregate(
-new LinearSVCAggregator(bcCoeffs, bcFeaturesStd, fitIntercept)
-  )(seqOp, combOp, aggregationDepth)
-}
-
-val totalGradientArray = svmAggregator.gradient.toArray
-// regVal is the sum of coefficients squares excluding intercept for L2 regularization.
-val regVal = if (regParamL2 == 0.0) {
-  0.0
-} else {
-  var sum = 0.0
-  coeffs.foreachActive { case (index, value) =>
-// We

spark git commit: [SPARK-21255][SQL][WIP] Fixed NPE when creating encoder for enum

2017-08-24 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master f3676d639 -> 7d16776d2


[SPARK-21255][SQL][WIP] Fixed NPE when creating encoder for enum

## What changes were proposed in this pull request?

Fixed NPE when creating encoder for enum.

When you try to create an encoder for an enum type (or a bean with an enum
property) via Encoders.bean(...), it fails with a NullPointerException at
TypeToken:495. I did a little research, and it turns out that in
JavaTypeInference the following code
```
  def getJavaBeanReadableProperties(beanClass: Class[_]): Array[PropertyDescriptor] = {
val beanInfo = Introspector.getBeanInfo(beanClass)
beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
  .filter(_.getReadMethod != null)
  }
```
filters out properties named "class", because we wouldn't want to serialize
that. But enum types have another property of type Class named
"declaringClass", which we end up inspecting recursively. Eventually we try to
inspect the ClassLoader class, which has a property "defaultAssertionStatus"
with no read method, and that leads to the NPE at TypeToken:495.

I added the property name "declaringClass" to the filter to resolve this.
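
A minimal Scala sketch of the failure mode described above (the actual new test lives in
JavaDatasetSuite and is written in Java); `TimeoutBean` is an illustrative bean, not part of
the patch:

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.Encoders

// A bean whose only property is a JDK enum. Bean inspection recurses into TimeUnit,
// whose "declaringClass" property used to lead to the NPE described above.
class TimeoutBean extends Serializable {
  private var unit: TimeUnit = TimeUnit.SECONDS
  def getUnit: TimeUnit = unit
  def setUnit(u: TimeUnit): Unit = { unit = u }
}

// Threw the NPE before this change; yields a usable encoder with it.
val enc = Encoders.bean(classOf[TimeoutBean])
```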

## How was this patch tested?
A unit test in JavaDatasetSuite that creates an encoder for an enum.

Author: mike 
Author: Mikhail Sveshnikov 

Closes #18488 from mike0sv/enum-support.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d16776d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d16776d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d16776d

Branch: refs/heads/master
Commit: 7d16776d28da5bcf656f0d8556b15ed3a5edca44
Parents: f3676d6
Author: mike 
Authored: Fri Aug 25 07:22:34 2017 +0100
Committer: Sean Owen 
Committed: Fri Aug 25 07:22:34 2017 +0100

--
 .../spark/sql/catalyst/JavaTypeInference.scala  | 40 ++
 .../catalyst/encoders/ExpressionEncoder.scala   | 14 +++-
 .../catalyst/expressions/objects/objects.scala  |  4 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 77 
 4 files changed, 131 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7d16776d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 21363d3..33f6ce0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
 
 /**
  * Type-inference utilities for POJOs and Java collections.
@@ -118,6 +119,10 @@ object JavaTypeInference {
 val (valueDataType, nullable) = inferDataType(valueType, seenTypeSet)
 (MapType(keyDataType, valueDataType, nullable), true)
 
+  case other if other.isEnum =>
+(StructType(Seq(StructField(typeToken.getRawType.getSimpleName,
+  StringType, nullable = false))), true)
+
   case other =>
 if (seenTypeSet.contains(other)) {
   throw new UnsupportedOperationException(
@@ -140,6 +145,7 @@ object JavaTypeInference {
   def getJavaBeanReadableProperties(beanClass: Class[_]): Array[PropertyDescriptor] = {
 val beanInfo = Introspector.getBeanInfo(beanClass)
 beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+  .filterNot(_.getName == "declaringClass")
   .filter(_.getReadMethod != null)
   }
 
@@ -303,6 +309,11 @@ object JavaTypeInference {
   keyData :: valueData :: Nil,
   returnNullable = false)
 
+  case other if other.isEnum =>
+StaticInvoke(JavaTypeInference.getClass, ObjectType(other), "deserializeEnumName",
+  expressions.Literal.create(other.getEnumConstants.apply(0), ObjectType(other))
+:: getPath :: Nil)
+
   case other =>
 val properties = getJavaBeanReadableAndWritableProperties(other)
 val setters = properties.map { p =>
@@ -345,6 +356,30 @@ object JavaTypeInference {
 }
   }
 
+  /** Returns a mapping from enum value to int for given enum type */
+  def enumSerializer[T <: Enum[T]](enum: Class[T]): T => UTF8String = {
+assert(enum.isEnum)
+inputObject: T =>
+  UTF8String.fromString(inputObject.name())
+  }
+
+  /** Returns value index for given enum type and value */
+  def serializeEnumName[T <: Enum[T]](