svn commit: r26387 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_18_00_01-cce4694-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-18 Thread pwendell
Author: pwendell
Date: Wed Apr 18 07:17:58 2018
New Revision: 26387

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_18_00_01-cce4694 docs


[This commit notification would consist of 1457 parts, which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-23926][SQL] Extending reverse function to support ArrayType arguments

2018-04-18 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master cce469435 -> f81fa478f


[SPARK-23926][SQL] Extending reverse function to support ArrayType arguments

## What changes were proposed in this pull request?

This PR extends the `reverse` function so that it can also operate on array columns, and covers:
- Introduction of a `Reverse` expression that implements the reversal logic for both arrays and strings
- Removal of the `StringReverse` expression
- A wrapper for PySpark

## How was this patch tested?

New tests were added to:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples
### Primitive type
```
val df = Seq(
  Seq(1, 3, 4, 2),
  null
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(reverse($"i")).debugCodegen
```
Result:
```
/* 032 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */ ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */ null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */ boolean filter_value = true;
/* 037 */
/* 038 */ if (!(!inputadapter_isNull)) {
/* 039 */   filter_value = inputadapter_isNull;
/* 040 */ }
/* 041 */ if (!filter_value) continue;
/* 042 */
/* 043 */ ((org.apache.spark.sql.execution.metric.SQLMetric) 
references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */ boolean project_isNull = inputadapter_isNull;
/* 046 */ ArrayData project_value = null;
/* 047 */
/* 048 */ if (!inputadapter_isNull) {
/* 049 */   final int project_length = inputadapter_value.numElements();
/* 050 */   project_value = inputadapter_value.copy();
/* 051 */   for(int k = 0; k < project_length / 2; k++) {
/* 052 */ int l = project_length - k - 1;
/* 053 */ boolean isNullAtK = project_value.isNullAt(k);
/* 054 */ boolean isNullAtL = project_value.isNullAt(l);
/* 055 */ if(!isNullAtK) {
/* 056 */   int el = project_value.getInt(k);
/* 057 */   if(!isNullAtL) {
/* 058 */ project_value.setInt(k, project_value.getInt(l));
/* 059 */   } else {
/* 060 */ project_value.setNullAt(k);
/* 061 */   }
/* 062 */   project_value.setInt(l, el);
/* 063 */ } else if (!isNullAtL) {
/* 064 */   project_value.setInt(k, project_value.getInt(l));
/* 065 */   project_value.setNullAt(l);
/* 066 */ }
/* 067 */   }
/* 068 */
/* 069 */ }
```
### Non-primitive type
```
val df = Seq(
  Seq("a", "c", "d", "b"),
  null
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(reverse($"s")).debugCodegen
```
Result:
```
/* 032 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 033 */ ArrayData inputadapter_value = inputadapter_isNull ?
/* 034 */ null : (inputadapter_row.getArray(0));
/* 035 */
/* 036 */ boolean filter_value = true;
/* 037 */
/* 038 */ if (!(!inputadapter_isNull)) {
/* 039 */   filter_value = inputadapter_isNull;
/* 040 */ }
/* 041 */ if (!filter_value) continue;
/* 042 */
/* 043 */ ((org.apache.spark.sql.execution.metric.SQLMetric) 
references[0] /* numOutputRows */).add(1);
/* 044 */
/* 045 */ boolean project_isNull = inputadapter_isNull;
/* 046 */ ArrayData project_value = null;
/* 047 */
/* 048 */ if (!inputadapter_isNull) {
/* 049 */   final int project_length = inputadapter_value.numElements();
/* 050 */   project_value = new 
org.apache.spark.sql.catalyst.util.GenericArrayData(new Object[project_length]);
/* 051 */   for(int k = 0; k < project_length; k++) {
/* 052 */ int l = project_length - k - 1;
/* 053 */ project_value.update(k, 
inputadapter_value.getUTF8String(l));
/* 054 */   }
/* 055 */
/* 056 */ }
```
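
For reference, a minimal usage sketch of the extended function (assuming Spark 2.4+ and a SparkSession named `spark` with its implicits imported; data and column names are illustrative):
```scala
import org.apache.spark.sql.functions.reverse
import spark.implicits._

// String columns keep the existing behavior; array columns are now supported too.
val strings = Seq("Spark").toDF("s")
strings.select(reverse($"s")).show()   // "krapS"

val arrays = Seq(Seq(1, 3, 4, 2)).toDF("i")
arrays.select(reverse($"i")).show()    // [2, 4, 3, 1]
```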

Author: mn-mikke 

Closes #21034 from mn-mikke/feature/array-api-reverse-to-master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f81fa478
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f81fa478
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f81fa478

Branch: refs/heads/master
Commit: f81fa478ff990146e2a8e463ac252271448d96f5
Parents: cce4694
Author: mn-mikke 
Authored: Wed Apr 18 18:41:55 2018 +0900
Committer: Takuya UESHIN 
Committed: Wed Apr 18 18:41:55 2018 +0900

--
 python/pyspark/sql/functions.py | 20 -
 .../catalyst/analysis/FunctionRegistry.scala|  2 +-
 .../expressions/collectionOperations.scala  | 88 ++
 .../expressions/stringExpressions.scala | 20 -
 .../CollectionExpressionsSuite.scala| 44 +
 .../expressions/StringExpressionsSuite.scala|  6 +-
 .../scala/org/apache/spark/sql/functions.scala  | 15 ++--

svn commit: r26391 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_18_04_03-f81fa47-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-18 Thread pwendell
Author: pwendell
Date: Wed Apr 18 11:22:58 2018
New Revision: 26391

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_18_04_03-f81fa47 docs


[This commit notification would consist of 1457 parts, which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.

2018-04-18 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master f81fa478f -> f09a9e941


[SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a 
wrong result by codegen.

## What changes were proposed in this pull request?

`EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result 
by codegen.

```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+++
|  _1|  _2|
+++
|-1.0|null|
|null|-1.0|
+++

scala> df.filter("_1 <=> _2").show()
+++
|  _1|  _2|
+++
|-1.0|null|
|null|-1.0|
+++
```

The result should be empty, but two rows remain.
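
The root cause is Java operator precedence: `genEqual` emitted `(isNaN(c1) && isNaN(c2)) || c1 == c2` without outer parentheses, so when `EqualNullSafe` splices that fragment into its null guard, the `c1 == c2` clause escapes the guard because `&&` binds tighter than `||` (and codegen substitutes a default value, `-1.0` for doubles, into a null slot's value variable, which is why the example above reproduces the issue). A simplified Scala stand-in for the generated expression (variable names and the exact guard shape are illustrative, not the actual codegen output):
```scala
// Second operand is NULL; its value variable still holds the codegen default (-1.0).
val isNull1 = false; val isNull2 = true
val value1 = -1.0;   val value2 = -1.0

// Before the fix: the "value1 == value2" clause falls outside the null guard.
val buggy = (isNull1 && isNull2) ||
  (!isNull1 && !isNull2 &&
    (java.lang.Double.isNaN(value1) && java.lang.Double.isNaN(value2)) || value1 == value2)
// buggy == true, so the row is wrongly kept by the filter.

// After the fix: the whole comparison is wrapped in parentheses and stays guarded.
val fixed = (isNull1 && isNull2) ||
  (!isNull1 && !isNull2 &&
    ((java.lang.Double.isNaN(value1) && java.lang.Double.isNaN(value2)) || value1 == value2))
// fixed == false, so the row is filtered out as expected.
```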

## How was this patch tested?

Added a test.

Author: Takuya UESHIN 

Closes #21094 from ueshin/issues/SPARK-24007/equalnullsafe.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f09a9e94
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f09a9e94
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f09a9e94

Branch: refs/heads/master
Commit: f09a9e9418c1697d198de18f340b1288f5eb025c
Parents: f81fa47
Author: Takuya UESHIN 
Authored: Wed Apr 18 08:22:05 2018 -0700
Committer: gatorsmile 
Committed: Wed Apr 18 08:22:05 2018 -0700

--
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala  | 6 --
 .../spark/sql/catalyst/expressions/PredicateSuite.scala   | 7 +++
 2 files changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f09a9e94/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index f6b6775..cf0a91f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -582,8 +582,10 @@ class CodegenContext {
*/
   def genEqual(dataType: DataType, c1: String, c2: String): String = dataType 
match {
 case BinaryType => s"java.util.Arrays.equals($c1, $c2)"
-case FloatType => s"(java.lang.Float.isNaN($c1) && 
java.lang.Float.isNaN($c2)) || $c1 == $c2"
-case DoubleType => s"(java.lang.Double.isNaN($c1) && 
java.lang.Double.isNaN($c2)) || $c1 == $c2"
+case FloatType =>
+  s"((java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == 
$c2)"
+case DoubleType =>
+  s"((java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 
== $c2)"
 case dt: DataType if isPrimitiveType(dt) => s"$c1 == $c2"
 case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)"
 case array: ArrayType => genComp(array, c1, c2) + " == 0"

http://git-wip-us.apache.org/repos/asf/spark/blob/f09a9e94/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
index 8a8f8e1..1bfd180 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
@@ -442,4 +442,11 @@ class PredicateSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 InSet(Literal(1), Set(1, 2, 3, 4)).genCode(ctx)
 assert(ctx.inlinedMutableStates.isEmpty)
   }
+
+  test("SPARK-24007: EqualNullSafe for FloatType and DoubleType might generate 
a wrong result") {
+checkEvaluation(EqualNullSafe(Literal(null, FloatType), Literal(-1.0f)), 
false)
+checkEvaluation(EqualNullSafe(Literal(-1.0f), Literal(null, FloatType)), 
false)
+checkEvaluation(EqualNullSafe(Literal(null, DoubleType), Literal(-1.0d)), 
false)
+checkEvaluation(EqualNullSafe(Literal(-1.0d), Literal(null, DoubleType)), 
false)
+  }
 }





spark git commit: [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.

2018-04-18 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 6b99d5bc3 -> a1c56b669


[SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a 
wrong result by codegen.

## What changes were proposed in this pull request?

`EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result 
by codegen.

```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+++
|  _1|  _2|
+++
|-1.0|null|
|null|-1.0|
+++

scala> df.filter("_1 <=> _2").show()
+++
|  _1|  _2|
+++
|-1.0|null|
|null|-1.0|
+++
```

The result should be empty, but two rows remain.

## How was this patch tested?

Added a test.

Author: Takuya UESHIN 

Closes #21094 from ueshin/issues/SPARK-24007/equalnullsafe.

(cherry picked from commit f09a9e9418c1697d198de18f340b1288f5eb025c)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1c56b66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1c56b66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1c56b66

Branch: refs/heads/branch-2.3
Commit: a1c56b66970a683e458e3f44fd6788110e869093
Parents: 6b99d5b
Author: Takuya UESHIN 
Authored: Wed Apr 18 08:22:05 2018 -0700
Committer: gatorsmile 
Committed: Wed Apr 18 08:22:16 2018 -0700

--
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala  | 6 --
 .../spark/sql/catalyst/expressions/PredicateSuite.scala   | 7 +++
 2 files changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a1c56b66/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 504e851..9cf5839 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -764,8 +764,10 @@ class CodegenContext {
*/
   def genEqual(dataType: DataType, c1: String, c2: String): String = dataType 
match {
 case BinaryType => s"java.util.Arrays.equals($c1, $c2)"
-case FloatType => s"(java.lang.Float.isNaN($c1) && 
java.lang.Float.isNaN($c2)) || $c1 == $c2"
-case DoubleType => s"(java.lang.Double.isNaN($c1) && 
java.lang.Double.isNaN($c2)) || $c1 == $c2"
+case FloatType =>
+  s"((java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == 
$c2)"
+case DoubleType =>
+  s"((java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 
== $c2)"
 case dt: DataType if isPrimitiveType(dt) => s"$c1 == $c2"
 case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)"
 case array: ArrayType => genComp(array, c1, c2) + " == 0"

http://git-wip-us.apache.org/repos/asf/spark/blob/a1c56b66/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
index 8a8f8e1..1bfd180 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
@@ -442,4 +442,11 @@ class PredicateSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 InSet(Literal(1), Set(1, 2, 3, 4)).genCode(ctx)
 assert(ctx.inlinedMutableStates.isEmpty)
   }
+
+  test("SPARK-24007: EqualNullSafe for FloatType and DoubleType might generate 
a wrong result") {
+checkEvaluation(EqualNullSafe(Literal(null, FloatType), Literal(-1.0f)), 
false)
+checkEvaluation(EqualNullSafe(Literal(-1.0f), Literal(null, FloatType)), 
false)
+checkEvaluation(EqualNullSafe(Literal(null, DoubleType), Literal(-1.0d)), 
false)
+checkEvaluation(EqualNullSafe(Literal(-1.0d), Literal(null, DoubleType)), 
false)
+  }
 }





spark git commit: [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.

2018-04-18 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 e957c4e88 -> a902323fb


[SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a 
wrong result by codegen.

`EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result 
by codegen.

```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+++
|  _1|  _2|
+++
|-1.0|null|
|null|-1.0|
+++

scala> df.filter("_1 <=> _2").show()
+++
|  _1|  _2|
+++
|-1.0|null|
|null|-1.0|
+++
```

The result should be empty, but two rows remain.

Added a test.

Author: Takuya UESHIN 

Closes #21094 from ueshin/issues/SPARK-24007/equalnullsafe.

(cherry picked from commit f09a9e9418c1697d198de18f340b1288f5eb025c)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a902323f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a902323f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a902323f

Branch: refs/heads/branch-2.2
Commit: a902323fbf7be27a7ca747105eedd61b1d57b9d4
Parents: e957c4e
Author: Takuya UESHIN 
Authored: Wed Apr 18 08:22:05 2018 -0700
Committer: gatorsmile 
Committed: Wed Apr 18 08:23:46 2018 -0700

--
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala  | 6 --
 .../spark/sql/catalyst/expressions/PredicateSuite.scala   | 7 +++
 2 files changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a902323f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 3964471..9e5eaf6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -482,8 +482,10 @@ class CodegenContext {
*/
   def genEqual(dataType: DataType, c1: String, c2: String): String = dataType 
match {
 case BinaryType => s"java.util.Arrays.equals($c1, $c2)"
-case FloatType => s"(java.lang.Float.isNaN($c1) && 
java.lang.Float.isNaN($c2)) || $c1 == $c2"
-case DoubleType => s"(java.lang.Double.isNaN($c1) && 
java.lang.Double.isNaN($c2)) || $c1 == $c2"
+case FloatType =>
+  s"((java.lang.Float.isNaN($c1) && java.lang.Float.isNaN($c2)) || $c1 == 
$c2)"
+case DoubleType =>
+  s"((java.lang.Double.isNaN($c1) && java.lang.Double.isNaN($c2)) || $c1 
== $c2)"
 case dt: DataType if isPrimitiveType(dt) => s"$c1 == $c2"
 case dt: DataType if dt.isInstanceOf[AtomicType] => s"$c1.equals($c2)"
 case array: ArrayType => genComp(array, c1, c2) + " == 0"

http://git-wip-us.apache.org/repos/asf/spark/blob/a902323f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
index bf3b184..15ae624 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala
@@ -340,4 +340,11 @@ class PredicateSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 val infinity = Literal(Double.PositiveInfinity)
 checkEvaluation(EqualTo(infinity, infinity), true)
   }
+
+  test("SPARK-24007: EqualNullSafe for FloatType and DoubleType might generate 
a wrong result") {
+checkEvaluation(EqualNullSafe(Literal(null, FloatType), Literal(-1.0f)), 
false)
+checkEvaluation(EqualNullSafe(Literal(-1.0f), Literal(null, FloatType)), 
false)
+checkEvaluation(EqualNullSafe(Literal(null, DoubleType), Literal(-1.0d)), 
false)
+checkEvaluation(EqualNullSafe(Literal(-1.0d), Literal(null, DoubleType)), 
false)
+  }
 }





spark git commit: [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table

2018-04-18 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 a1c56b669 -> 5bcb7bdcc


[SPARK-23963][SQL] Properly handle large number of columns in query on 
text-based Hive table

## What changes were proposed in this pull request?

TableReader would get disproportionately slower as the number of columns in the 
query increased.

I fixed the way TableReader looks up metadata for each column in the row. Previously, it looked up this data in linked lists, accessing each list by index (column number), which is linear in the column position. Now it looks up this data in arrays, where indexing by column number is constant-time.
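
A minimal sketch of the access-pattern change, under the assumption that the metadata pairs were backed by a linked `List` (the `FieldRef` type and sizes below are hypothetical, not the real `TableReader` code):
```scala
// Hypothetical stand-in for the per-column metadata pairs built by TableReader.
case class FieldRef(name: String)
val nonPartitionKeyAttrs: List[(FieldRef, Int)] =
  (0 until 1000).map(i => FieldRef(s"col$i") -> i).toList

// Before: unzip on a List yields Lists, so fieldRefs(ordinal) in the per-row loop
// walks the list from its head -- O(n) per column, O(n^2) per row.
val (slowRefs, slowOrdinals) = nonPartitionKeyAttrs.unzip

// After: converting to an Array first makes positional lookup constant-time.
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.toArray.unzip

// The row-materialization loop can then index by column number cheaply.
var i = 0
while (i < fieldRefs.length) {
  val ref = fieldRefs(i)   // O(1) on Array
  i += 1
}
```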

## How was this patch tested?

Manual testing
All sbt unit tests
python sql tests

Author: Bruce Robbins 

Closes #21043 from bersprockets/tabreadfix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5bcb7bdc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5bcb7bdc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5bcb7bdc

Branch: refs/heads/branch-2.3
Commit: 5bcb7bdccf967ff5ad3d8c76f4ad8c9c4031e7c2
Parents: a1c56b6
Author: Bruce Robbins 
Authored: Fri Apr 13 14:05:04 2018 -0700
Committer: gatorsmile 
Committed: Wed Apr 18 09:48:49 2018 -0700

--
 .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5bcb7bdc/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index cc8907a..b5444a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -381,7 +381,7 @@ private[hive] object HadoopTableReader extends 
HiveInspectors with Logging {
 
 val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, 
ordinal) =>
   soi.getStructFieldRef(attr.name) -> ordinal
-}.unzip
+}.toArray.unzip
 
 /**
  * Builds specific unwrappers ahead of time according to object inspector





spark git commit: [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table

2018-04-18 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 a902323fb -> 041aec4e1


[SPARK-23963][SQL] Properly handle large number of columns in query on 
text-based Hive table

## What changes were proposed in this pull request?

TableReader would get disproportionately slower as the number of columns in the 
query increased.

I fixed the way TableReader looks up metadata for each column in the row. Previously, it looked up this data in linked lists, accessing each list by index (column number), which is linear in the column position. Now it looks up this data in arrays, where indexing by column number is constant-time.

## How was this patch tested?

Manual testing
All sbt unit tests
python sql tests

Author: Bruce Robbins 

Closes #21043 from bersprockets/tabreadfix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/041aec4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/041aec4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/041aec4e

Branch: refs/heads/branch-2.2
Commit: 041aec4e1bfb4f3c2d4db6761486f3523102c75e
Parents: a902323
Author: Bruce Robbins 
Authored: Fri Apr 13 14:05:04 2018 -0700
Committer: gatorsmile 
Committed: Wed Apr 18 09:50:13 2018 -0700

--
 .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/041aec4e/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index a0e379f..11795ff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -381,7 +381,7 @@ private[hive] object HadoopTableReader extends 
HiveInspectors with Logging {
 
 val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, 
ordinal) =>
   soi.getStructFieldRef(attr.name) -> ordinal
-}.unzip
+}.toArray.unzip
 
 /**
  * Builds specific unwrappers ahead of time according to object inspector





svn commit: r26400 - in /dev/spark/2.3.1-SNAPSHOT-2018_04_18_10_01-5bcb7bd-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-18 Thread pwendell
Author: pwendell
Date: Wed Apr 18 17:16:04 2018
New Revision: 26400

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_04_18_10_01-5bcb7bd docs


[This commit notification would consist of 1443 parts, which exceeds the limit of 50, so it was shortened to this summary.]




svn commit: r26402 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_18_12_02-f09a9e9-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-18 Thread pwendell
Author: pwendell
Date: Wed Apr 18 19:16:09 2018
New Revision: 26402

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_18_12_02-f09a9e9 docs


[This commit notification would consist of 1457 parts, which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-23875][SQL][FOLLOWUP] Add IndexedSeq wrapper for ArrayData

2018-04-18 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master f09a9e941 -> a9066478f


[SPARK-23875][SQL][FOLLOWUP] Add IndexedSeq wrapper for ArrayData

## What changes were proposed in this pull request?

Use the type-specialized accessor in `ArrayData.foreach` and `toArray`: resolve it once via `InternalRow.getAccessor(elementType)` and reuse it for every element instead of calling `get(i, elementType)` per element.
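
A toy sketch of the hoisted-dispatch pattern (the types below are stand-ins, not the real `InternalRow`/`ArrayData` API):
```scala
// Toy stand-ins for DataType and the element container.
sealed trait DataType
case object IntType extends DataType
case object LongType extends DataType

trait Getters {
  def getInt(i: Int): Int
  def getLong(i: Int): Long
  def numElements(): Int
}

// Resolve the per-type access function once...
def getAccessor(dt: DataType): (Getters, Int) => Any = dt match {
  case IntType  => (g, i) => g.getInt(i)
  case LongType => (g, i) => g.getLong(i)
}

// ...and reuse it inside the loop instead of re-dispatching on the type per element.
def toArray(data: Getters, elementType: DataType): Array[Any] = {
  val accessor = getAccessor(elementType)
  Array.tabulate(data.numElements())(i => accessor(data, i))
}
```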

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh 

Closes #21099 from viirya/SPARK-23875-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9066478
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9066478
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9066478

Branch: refs/heads/master
Commit: a9066478f6d98c3ae634c3bb9b09ee20bd60e111
Parents: f09a9e9
Author: Liang-Chi Hsieh 
Authored: Thu Apr 19 00:05:47 2018 +0200
Committer: Herman van Hovell 
Committed: Thu Apr 19 00:05:47 2018 +0200

--
 .../scala/org/apache/spark/sql/catalyst/util/ArrayData.scala  | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a9066478/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala
index 2cf59d5..104b428 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala
@@ -141,28 +141,29 @@ abstract class ArrayData extends SpecializedGetters with 
Serializable {
 
   def toArray[T: ClassTag](elementType: DataType): Array[T] = {
 val size = numElements()
+val accessor = InternalRow.getAccessor(elementType)
 val values = new Array[T](size)
 var i = 0
 while (i < size) {
   if (isNullAt(i)) {
 values(i) = null.asInstanceOf[T]
   } else {
-values(i) = get(i, elementType).asInstanceOf[T]
+values(i) = accessor(this, i).asInstanceOf[T]
   }
   i += 1
 }
 values
   }
 
-  // todo: specialize this.
   def foreach(elementType: DataType, f: (Int, Any) => Unit): Unit = {
 val size = numElements()
+val accessor = InternalRow.getAccessor(elementType)
 var i = 0
 while (i < size) {
   if (isNullAt(i)) {
 f(i, null)
   } else {
-f(i, get(i, elementType))
+f(i, accessor(this, i))
   }
   i += 1
 }





svn commit: r26405 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_18_16_01-a906647-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-18 Thread pwendell
Author: pwendell
Date: Wed Apr 18 23:16:22 2018
New Revision: 26405

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_18_16_01-a906647 docs


[This commit notification would consist of 1457 parts, which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

2018-04-18 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master a9066478f -> 0c94e48bc


[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") sometimes gets stuck in an infinite loop and times out the build.

There were multiple issues with the test:

1. The first valid stageId is zero when the test is run alone rather than as part of a suite, so the following code waits until it times out:

```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```

2. `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset, which ended up cancelling the same stage twice. This caused the infinite wait.

This PR fixes the flakiness by removing the shared `DataFrameRangeSuite.stageToKill` and instead using `wait` and a `CountDownLatch` for synchronization, as sketched below.
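
A minimal sketch of the new synchronization pattern, assuming a `sparkContext` in scope as in the suite (names and the latch count are illustrative, not the exact test code):
```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

// One count per stage expected to start (the suite uses 2).
val latch = new CountDownLatch(2)

val listener = new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    // Cancel the stage as soon as its first task starts, then record the event.
    sparkContext.cancelStage(taskStart.stageId)
    latch.countDown()
  }
}
sparkContext.addSparkListener(listener)

// ... run the query whose tasks block in wait(), expecting a SparkException ...

// Instead of polling a shared variable, the test simply waits for the listener.
assert(latch.await(10, TimeUnit.SECONDS))
```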

## How was this patch tested?

Existing unit test.

Author: Gabor Somogyi 

Closes #20888 from gaborgsomogyi/SPARK-23775.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c94e48b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c94e48b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c94e48b

Branch: refs/heads/master
Commit: 0c94e48bc50717e1627c0d2acd5382d9adc73c97
Parents: a906647
Author: Gabor Somogyi 
Authored: Wed Apr 18 16:37:41 2018 -0700
Committer: Marcelo Vanzin 
Committed: Wed Apr 18 16:37:41 2018 -0700

--
 .../apache/spark/sql/DataFrameRangeSuite.scala  | 78 +++-
 1 file changed, 45 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c94e48b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 57a930d..a0fd740 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.sql
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
 import scala.concurrent.duration._
 import scala.math.abs
 import scala.util.Random
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkException, TaskContext}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -152,39 +154,53 @@ class DataFrameRangeSuite extends QueryTest with 
SharedSQLContext with Eventuall
   }
 
   test("Cancelling stage in a query with Range.") {
-val listener = new SparkListener {
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-eventually(timeout(10.seconds), interval(1.millis)) {
-  assert(DataFrameRangeSuite.stageToKill > 0)
+// Save and restore the value because SparkContext is shared
+val savedInterruptOnCancel = sparkContext
+  .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+
+try {
+  
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, 
"true")
+
+  for (codegen <- Seq(true, false)) {
+// This countdown latch used to make sure with all the stages 
cancelStage called in listener
+val latch = new CountDownLatch(2)
+
+val listener = new SparkListener {
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+sparkContext.cancelStage(taskStart.stageId)
+latch.countDown()
+  }
 }
-sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
-  }
-}
 
-sparkContext.addSparkListener(listener)
-for (codegen <- Seq(true, false)) {
-  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> 
codegen.toString()) {
-DataFrameRangeSuite.stageToKill = -1
-val ex = intercept[SparkException] {
-  spark.range(0, 1000L, 1, 1).map { x =>
-DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-x
-  }.toDF("id").agg(sum("id")).collect()
+sparkContext.addSparkListener(listener)
+withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> 
codegen.toString()) {
+  val ex = intercept[SparkException] {
+sparkContext.range(0, 1L, numSlices = 10).mapPartitions { x =>
+  x.synchronized {
+x.wait()
+  

spark git commit: [SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

2018-04-18 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 5bcb7bdcc -> 130641102


[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky

## What changes were proposed in this pull request?

DataFrameRangeSuite.test("Cancelling stage in a query with Range.") sometimes gets stuck in an infinite loop and times out the build.

There were multiple issues with the test:

1. The first valid stageId is zero when the test is run alone rather than as part of a suite, so the following code waits until it times out:

```
eventually(timeout(10.seconds), interval(1.millis)) {
  assert(DataFrameRangeSuite.stageToKill > 0)
}
```

2. `DataFrameRangeSuite.stageToKill` was overwritten by the task's thread after the reset, which ended up cancelling the same stage twice. This caused the infinite wait.

This PR fixes the flakiness by removing the shared `DataFrameRangeSuite.stageToKill` and instead using `wait` and a `CountDownLatch` for synchronization.

## How was this patch tested?

Existing unit test.

Author: Gabor Somogyi 

Closes #20888 from gaborgsomogyi/SPARK-23775.

(cherry picked from commit 0c94e48bc50717e1627c0d2acd5382d9adc73c97)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13064110
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13064110
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13064110

Branch: refs/heads/branch-2.3
Commit: 130641102ceecf2a795d7f0dc6412c7e56eb03a8
Parents: 5bcb7bd
Author: Gabor Somogyi 
Authored: Wed Apr 18 16:37:41 2018 -0700
Committer: Marcelo Vanzin 
Committed: Wed Apr 18 16:37:52 2018 -0700

--
 .../apache/spark/sql/DataFrameRangeSuite.scala  | 78 +++-
 1 file changed, 45 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/13064110/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 57a930d..a0fd740 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.sql
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
 import scala.concurrent.duration._
 import scala.math.abs
 import scala.util.Random
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkException, TaskContext}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -152,39 +154,53 @@ class DataFrameRangeSuite extends QueryTest with 
SharedSQLContext with Eventuall
   }
 
   test("Cancelling stage in a query with Range.") {
-val listener = new SparkListener {
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-eventually(timeout(10.seconds), interval(1.millis)) {
-  assert(DataFrameRangeSuite.stageToKill > 0)
+// Save and restore the value because SparkContext is shared
+val savedInterruptOnCancel = sparkContext
+  .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+
+try {
+  
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, 
"true")
+
+  for (codegen <- Seq(true, false)) {
+// This countdown latch used to make sure with all the stages 
cancelStage called in listener
+val latch = new CountDownLatch(2)
+
+val listener = new SparkListener {
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+sparkContext.cancelStage(taskStart.stageId)
+latch.countDown()
+  }
 }
-sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
-  }
-}
 
-sparkContext.addSparkListener(listener)
-for (codegen <- Seq(true, false)) {
-  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> 
codegen.toString()) {
-DataFrameRangeSuite.stageToKill = -1
-val ex = intercept[SparkException] {
-  spark.range(0, 1000L, 1, 1).map { x =>
-DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-x
-  }.toDF("id").agg(sum("id")).collect()
+sparkContext.addSparkListener(listener)
+withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> 
codegen.toString()) {
+  val ex = intercept[SparkException] {
+sparkContext.range(0, 1L, n

spark git commit: [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener

2018-04-18 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 130641102 -> 32bec6ca3


[SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener

## What changes were proposed in this pull request?

The PySpark `StreamingListener` lacks the `onStreamingStarted` method. This patch adds it, along with a test.

This patch also includes a trivial doc improvement for `createDirectStream`.

Original PR is #21057.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh 

Closes #21098 from viirya/SPARK-24014.

(cherry picked from commit 8bb0df2c65355dfdcd28e362ff661c6c7ebc99c0)
Signed-off-by: jerryshao 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32bec6ca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32bec6ca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32bec6ca

Branch: refs/heads/branch-2.3
Commit: 32bec6ca3d9e47587c84f928d4166475fe29f596
Parents: 1306411
Author: Liang-Chi Hsieh 
Authored: Thu Apr 19 10:00:57 2018 +0800
Committer: jerryshao 
Committed: Thu Apr 19 10:01:13 2018 +0800

--
 python/pyspark/streaming/kafka.py| 3 ++-
 python/pyspark/streaming/listener.py | 6 ++
 python/pyspark/streaming/tests.py| 7 +++
 3 files changed, 15 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/kafka.py
--
diff --git a/python/pyspark/streaming/kafka.py 
b/python/pyspark/streaming/kafka.py
index fdb9308..ed2e0e7 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -104,7 +104,8 @@ class KafkaUtils(object):
 :param topics:  list of topic_name to consume.
 :param kafkaParams: Additional params for Kafka.
 :param fromOffsets: Per-topic/partition Kafka offsets defining the 
(inclusive) starting
-point of the stream.
+point of the stream (a dictionary mapping 
`TopicAndPartition` to
+integers).
 :param keyDecoder:  A function used to decode key (default is 
utf8_decoder).
 :param valueDecoder:  A function used to decode value (default is 
utf8_decoder).
 :param messageHandler: A function used to convert 
KafkaMessageAndMetadata. You can assess

http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/listener.py
--
diff --git a/python/pyspark/streaming/listener.py 
b/python/pyspark/streaming/listener.py
index b830797..d4ecc21 100644
--- a/python/pyspark/streaming/listener.py
+++ b/python/pyspark/streaming/listener.py
@@ -23,6 +23,12 @@ class StreamingListener(object):
 def __init__(self):
 pass
 
+def onStreamingStarted(self, streamingStarted):
+"""
+Called when the streaming has been started.
+"""
+pass
+
 def onReceiverStarted(self, receiverStarted):
 """
 Called when a receiver has been started

http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index ca28c9b..1ec418a 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -507,6 +507,10 @@ class StreamingListenerTests(PySparkStreamingTestCase):
 self.batchInfosCompleted = []
 self.batchInfosStarted = []
 self.batchInfosSubmitted = []
+self.streamingStartedTime = []
+
+def onStreamingStarted(self, streamingStarted):
+self.streamingStartedTime.append(streamingStarted.time)
 
 def onBatchSubmitted(self, batchSubmitted):
 self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
@@ -530,9 +534,12 @@ class StreamingListenerTests(PySparkStreamingTestCase):
 batchInfosSubmitted = batch_collector.batchInfosSubmitted
 batchInfosStarted = batch_collector.batchInfosStarted
 batchInfosCompleted = batch_collector.batchInfosCompleted
+streamingStartedTime = batch_collector.streamingStartedTime
 
 self.wait_for(batchInfosCompleted, 4)
 
+self.assertEqual(len(streamingStartedTime), 1)
+
 self.assertGreaterEqual(len(batchInfosSubmitted), 4)
 for info in batchInfosSubmitted:
 self.assertGreaterEqual(info.batchTime().milliseconds(), 0)





spark git commit: [SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener

2018-04-18 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master 0c94e48bc -> 8bb0df2c6


[SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener

## What changes were proposed in this pull request?

The PySpark `StreamingListener` lacks the `onStreamingStarted` method. This patch adds it, along with a test.

This patch also includes a trivial doc improvement for `createDirectStream`.

Original PR is #21057.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh 

Closes #21098 from viirya/SPARK-24014.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bb0df2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bb0df2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bb0df2c

Branch: refs/heads/master
Commit: 8bb0df2c65355dfdcd28e362ff661c6c7ebc99c0
Parents: 0c94e48
Author: Liang-Chi Hsieh 
Authored: Thu Apr 19 10:00:57 2018 +0800
Committer: jerryshao 
Committed: Thu Apr 19 10:00:57 2018 +0800

--
 python/pyspark/streaming/kafka.py| 3 ++-
 python/pyspark/streaming/listener.py | 6 ++
 python/pyspark/streaming/tests.py| 7 +++
 3 files changed, 15 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/kafka.py
--
diff --git a/python/pyspark/streaming/kafka.py 
b/python/pyspark/streaming/kafka.py
index fdb9308..ed2e0e7 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -104,7 +104,8 @@ class KafkaUtils(object):
 :param topics:  list of topic_name to consume.
 :param kafkaParams: Additional params for Kafka.
 :param fromOffsets: Per-topic/partition Kafka offsets defining the 
(inclusive) starting
-point of the stream.
+point of the stream (a dictionary mapping 
`TopicAndPartition` to
+integers).
 :param keyDecoder:  A function used to decode key (default is 
utf8_decoder).
 :param valueDecoder:  A function used to decode value (default is 
utf8_decoder).
 :param messageHandler: A function used to convert 
KafkaMessageAndMetadata. You can assess

http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/listener.py
--
diff --git a/python/pyspark/streaming/listener.py 
b/python/pyspark/streaming/listener.py
index b830797..d4ecc21 100644
--- a/python/pyspark/streaming/listener.py
+++ b/python/pyspark/streaming/listener.py
@@ -23,6 +23,12 @@ class StreamingListener(object):
 def __init__(self):
 pass
 
+def onStreamingStarted(self, streamingStarted):
+"""
+Called when the streaming has been started.
+"""
+pass
+
 def onReceiverStarted(self, receiverStarted):
 """
 Called when a receiver has been started

http://git-wip-us.apache.org/repos/asf/spark/blob/8bb0df2c/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index 7dde7c0..1039409 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -507,6 +507,10 @@ class StreamingListenerTests(PySparkStreamingTestCase):
 self.batchInfosCompleted = []
 self.batchInfosStarted = []
 self.batchInfosSubmitted = []
+self.streamingStartedTime = []
+
+def onStreamingStarted(self, streamingStarted):
+self.streamingStartedTime.append(streamingStarted.time)
 
 def onBatchSubmitted(self, batchSubmitted):
 self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
@@ -530,9 +534,12 @@ class StreamingListenerTests(PySparkStreamingTestCase):
 batchInfosSubmitted = batch_collector.batchInfosSubmitted
 batchInfosStarted = batch_collector.batchInfosStarted
 batchInfosCompleted = batch_collector.batchInfosCompleted
+streamingStartedTime = batch_collector.streamingStartedTime
 
 self.wait_for(batchInfosCompleted, 4)
 
+self.assertEqual(len(streamingStartedTime), 1)
+
 self.assertGreaterEqual(len(batchInfosSubmitted), 4)
 for info in batchInfosSubmitted:
 self.assertGreaterEqual(info.batchTime().milliseconds(), 0)





spark git commit: [SPARK-23919][SQL] Add array_position function

2018-04-18 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master 8bb0df2c6 -> d5bec48b9


[SPARK-23919][SQL] Add array_position function

## What changes were proposed in this pull request?

This PR adds the SQL function `array_position`. The behavior of the function is based on Presto's.

The function returns the position of the first occurrence of the element in the array (or 0 if not found), as a 1-based index of type BigInt.
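
For reference, a usage sketch of the Scala API (assuming Spark 2.4+ and a SparkSession named `spark` with its implicits imported; the column name is illustrative):
```scala
import org.apache.spark.sql.functions.array_position
import spark.implicits._

val df = Seq(Seq("c", "b", "a"), Seq.empty[String]).toDF("data")
df.select(array_position($"data", "a")).collect()
// expected: Array([3], [0]) -- 1-based position of the first match, 0 when not found
```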

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki 

Closes #21037 from kiszk/SPARK-23919.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5bec48b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5bec48b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5bec48b

Branch: refs/heads/master
Commit: d5bec48b9cb225c19b43935c07b24090c51cacce
Parents: 8bb0df2
Author: Kazuaki Ishizaki 
Authored: Thu Apr 19 11:59:17 2018 +0900
Committer: Takuya UESHIN 
Committed: Thu Apr 19 11:59:17 2018 +0900

--
 python/pyspark/sql/functions.py | 17 ++
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../expressions/collectionOperations.scala  | 56 
 .../CollectionExpressionsSuite.scala| 22 
 .../scala/org/apache/spark/sql/functions.scala  | 14 +
 .../spark/sql/DataFrameFunctionsSuite.scala | 34 
 6 files changed, 144 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d5bec48b/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index d3bb0a5..36dcabc 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1845,6 +1845,23 @@ def array_contains(col, value):
 return Column(sc._jvm.functions.array_contains(_to_java_column(col), 
value))
 
 
+@since(2.4)
+def array_position(col, value):
+"""
+Collection function: Locates the position of the first occurrence of the 
given value
+in the given array. Returns null if either of the arguments are null.
+
+.. note:: The position is not zero based, but 1 based index. Returns 0 if 
the given
+value could not be found in the array.
+
+>>> df = spark.createDataFrame([(["c", "b", "a"],), ([],)], ['data'])
+>>> df.select(array_position(df.data, "a")).collect()
+[Row(array_position(data, a)=3), Row(array_position(data, a)=0)]
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.array_position(_to_java_column(col), 
value))
+
+
 @since(1.4)
 def explode(col):
 """Returns a new row for each element in the given array or map.

http://git-wip-us.apache.org/repos/asf/spark/blob/d5bec48b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 38c874a..74095fe 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -402,6 +402,7 @@ object FunctionRegistry {
 // collection functions
 expression[CreateArray]("array"),
 expression[ArrayContains]("array_contains"),
+expression[ArrayPosition]("array_position"),
 expression[CreateMap]("map"),
 expression[CreateNamedStruct]("named_struct"),
 expression[MapKeys]("map_keys"),

http://git-wip-us.apache.org/repos/asf/spark/blob/d5bec48b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 76b71f5..e6a05f5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -505,3 +505,59 @@ case class ArrayMax(child: Expression) extends 
UnaryExpression with ImplicitCast
 
   override def prettyName: String = "array_max"
 }
+
+
+/**
+ * Returns the position of the first occurrence of element in the given array 
as long.
+ * Returns 0 if the given value could not be found in the array. Returns null 
if either of
+ * the arguments are null
+ *
+ * NOTE: that this is not zero based, but 1-based index. The first element in 

svn commit: r26412 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_18_20_01-d5bec48-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-18 Thread pwendell
Author: pwendell
Date: Thu Apr 19 03:16:17 2018
New Revision: 26412

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_18_20_01-d5bec48 docs


[This commit notification would consist of 1457 parts, which exceeds the limit of 50, so it was shortened to this summary.]




svn commit: r26415 - in /dev/spark/2.3.1-SNAPSHOT-2018_04_18_22_01-32bec6c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-18 Thread pwendell
Author: pwendell
Date: Thu Apr 19 05:17:07 2018
New Revision: 26415

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_04_18_22_01-32bec6c docs


[This commit notification would consist of 1443 parts, which exceeds the limit of 50, so it was shortened to this summary.]
