spark git commit: [SPARK-23918][SQL] Add array_min function

2018-04-17 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master fd990a908 -> 14844a62c


[SPARK-23918][SQL] Add array_min function

## What changes were proposed in this pull request?

The PR adds the SQL function `array_min`. It takes an array as its argument and 
returns the minimum value in it.
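
A minimal usage sketch from the Scala side (assuming a Spark 2.4 build that includes this 
patch; the DataFrame and column names are illustrative):

```scala
// Assumes an active SparkSession named `spark`, e.g. in spark-shell.
import org.apache.spark.sql.functions.array_min
import spark.implicits._

val df = Seq(Seq(2, 1, 3), Seq(10, -1)).toDF("data")
df.select(array_min($"data").as("min")).show()
// +---+
// |min|
// +---+
// |  1|
// | -1|
// +---+
```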

## How was this patch tested?

added UTs

Author: Marco Gaido 

Closes #21025 from mgaido91/SPARK-23918.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14844a62
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14844a62
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14844a62

Branch: refs/heads/master
Commit: 14844a62c025e7299029d7452b8c4003bc221ac8
Parents: fd990a9
Author: Marco Gaido 
Authored: Tue Apr 17 17:55:35 2018 +0900
Committer: Takuya UESHIN 
Committed: Tue Apr 17 17:55:35 2018 +0900

--
 python/pyspark/sql/functions.py | 17 +-
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../sql/catalyst/expressions/arithmetic.scala   |  6 +-
 .../expressions/codegen/CodeGenerator.scala | 17 ++
 .../expressions/collectionOperations.scala  | 64 
 .../CollectionExpressionsSuite.scala| 10 +++
 .../scala/org/apache/spark/sql/functions.scala  |  8 +++
 .../spark/sql/DataFrameFunctionsSuite.scala | 14 +
 8 files changed, 131 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14844a62/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index f3492ae..6ca22b6 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2081,6 +2081,21 @@ def size(col):
 
 
 @since(2.4)
+def array_min(col):
+"""
+Collection function: returns the minimum value of the array.
+
+:param col: name of column or expression
+
+>>> df = spark.createDataFrame([([2, 1, 3],), ([None, 10, -1],)], ['data'])
+>>> df.select(array_min(df.data).alias('min')).collect()
+[Row(min=1), Row(min=-1)]
+"""
+sc = SparkContext._active_spark_context
+return Column(sc._jvm.functions.array_min(_to_java_column(col)))
+
+
+@since(2.4)
 def array_max(col):
 """
 Collection function: returns the maximum value of the array.
@@ -2108,7 +2123,7 @@ def sort_array(col, asc=True):
 [Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])]
 >>> df.select(sort_array(df.data, asc=False).alias('r')).collect()
 [Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]
- """
+"""
 sc = SparkContext._active_spark_context
 return Column(sc._jvm.functions.sort_array(_to_java_column(col), asc))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14844a62/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 05bfa2d..4dd1ca5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -409,6 +409,7 @@ object FunctionRegistry {
 expression[MapValues]("map_values"),
 expression[Size]("size"),
 expression[SortArray]("sort_array"),
+expression[ArrayMin]("array_min"),
 expression[ArrayMax]("array_max"),
 CreateStruct.registryEntry,
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14844a62/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 942dfd4..d4e322d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -595,11 +595,7 @@ case class Least(children: Seq[Expression]) extends 
Expression {
 val evals = evalChildren.map(eval =>
   s"""
  |${eval.code}
- |if (!${eval.isNull} && (${ev.isNull} ||
- |  ${ctx.genGreater(dataType, ev.value, eval.value)})) {
- |  ${ev.isNull} = false;
- |  ${ev.value} = ${eval.value};
- |}
+ |${ctx.reassignIfSmaller(dataType, ev, eval)}
   """.stripMargin
 )
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14844a62/sql/catalyst/src/main/scala/org/apache/spark/sql/cat

spark git commit: [SPARK-23687][SS] Add a memory source for continuous processing.

2018-04-17 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 14844a62c -> 1cc66a072


[SPARK-23687][SS] Add a memory source for continuous processing.

## What changes were proposed in this pull request?

Add a memory source for continuous processing.

Note that only one of the ContinuousSuite tests is migrated to minimize the 
diff here. I'll submit a second PR for SPARK-23688 to change the rest and get 
rid of waitForRateSourceTriggers.

## How was this patch tested?

unit test

Author: Jose Torres 

Closes #20828 from jose-torres/continuousMemory.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cc66a07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cc66a07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cc66a07

Branch: refs/heads/master
Commit: 1cc66a072b7fd3bf140fa41596f6b18f8d1bd7b9
Parents: 14844a6
Author: Jose Torres 
Authored: Tue Apr 17 01:59:38 2018 -0700
Committer: Tathagata Das 
Committed: Tue Apr 17 01:59:38 2018 -0700

--
 .../continuous/ContinuousExecution.scala|   5 +-
 .../spark/sql/execution/streaming/memory.scala  |  59 --
 .../sources/ContinuousMemoryStream.scala| 211 +++
 .../apache/spark/sql/streaming/StreamTest.scala |   4 +-
 .../streaming/continuous/ContinuousSuite.scala  |  31 ++-
 5 files changed, 266 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1cc66a07/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 1758b38..951d694 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
 import 
org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, 
WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, 
PartitionOffset}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -317,8 +318,10 @@ class ContinuousExecution(
 synchronized {
   if (queryExecutionThread.isAlive) {
 commitLog.add(epoch)
-val offset = offsetLog.get(epoch).get.offsets(0).get
+val offset =
+  
continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
 committedOffsets ++= Seq(continuousSources(0) -> offset)
+
continuousSources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset])
   } else {
 return
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1cc66a07/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 352d4ce..628923d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -24,17 +24,19 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, 
SupportsScanUnsafeRow}
 import org.apache.spark.sql.sources.v2.reader.streaming.{

spark git commit: [SPARK-23747][STRUCTURED STREAMING] Add EpochCoordinator unit tests

2018-04-17 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 1cc66a072 -> 05ae74778


[SPARK-23747][STRUCTURED STREAMING] Add EpochCoordinator unit tests

## What changes were proposed in this pull request?

Unit tests for EpochCoordinator that verify the correct sequencing of committed 
epochs. Several tests are ignored since they test functionality implemented in 
SPARK-23503, which is not yet merged; otherwise they would fail.

Author: Efim Poberezkin 

Closes #20983 from efimpoberezkin/pr/EpochCoordinator-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05ae7477
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05ae7477
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05ae7477

Branch: refs/heads/master
Commit: 05ae74778a10fbdd7f2cbf7742de7855966b7d35
Parents: 1cc66a0
Author: Efim Poberezkin 
Authored: Tue Apr 17 04:13:17 2018 -0700
Committer: Tathagata Das 
Committed: Tue Apr 17 04:13:17 2018 -0700

--
 .../continuous/EpochCoordinatorSuite.scala  | 224 +++
 1 file changed, 224 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/05ae7477/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
new file mode 100644
index 000..99e3056
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.continuous
+
+import org.mockito.InOrder
+import org.mockito.Matchers.{any, eq => eqTo}
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.LocalSparkSession
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, 
PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.test.TestSparkSession
+
+class EpochCoordinatorSuite
+  extends SparkFunSuite
+with LocalSparkSession
+with MockitoSugar
+with BeforeAndAfterEach {
+
+  private var epochCoordinator: RpcEndpointRef = _
+
+  private var writer: StreamWriter = _
+  private var query: ContinuousExecution = _
+  private var orderVerifier: InOrder = _
+
+  override def beforeEach(): Unit = {
+val reader = mock[ContinuousReader]
+writer = mock[StreamWriter]
+query = mock[ContinuousExecution]
+orderVerifier = inOrder(writer, query)
+
+spark = new TestSparkSession()
+
+epochCoordinator
+  = EpochCoordinatorRef.create(writer, reader, query, "test", 1, spark, 
SparkEnv.get)
+  }
+
+  test("single epoch") {
+setWriterPartitions(3)
+setReaderPartitions(2)
+
+commitPartitionEpoch(0, 1)
+commitPartitionEpoch(1, 1)
+commitPartitionEpoch(2, 1)
+reportPartitionOffset(0, 1)
+reportPartitionOffset(1, 1)
+
+// Here and in subsequent tests this is called to make a synchronous call 
to EpochCoordinator
+// so that mocks would have been acted upon by the time verification 
happens
+makeSynchronousCall()
+
+verifyCommit(1)
+  }
+
+  test("single epoch, all but one writer partition has committed") {
+setWriterPartitions(3)
+setReaderPartitions(2)
+
+commitPartitionEpoch(0, 1)
+commitPartitionEpoch(1, 1)
+reportPartitionOffset(0, 1)
+reportPartitionOffset(1, 1)
+
+makeSynchronousCall()
+
+verifyNoCommitFor(1)
+  }
+
+  test("single epoch, all but one reader partition has reported an offset") {
+setWriterPartitions(3)
+setReaderPartition

svn commit: r26374 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_17_04_04-1cc66a0-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-17 Thread pwendell
Author: pwendell
Date: Tue Apr 17 11:22:26 2018
New Revision: 26374

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_17_04_04-1cc66a0 docs


[This commit notification would consist of 1457 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-23875][SQL] Add IndexedSeq wrapper for ArrayData

2018-04-17 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 05ae74778 -> 30ffb53ca


[SPARK-23875][SQL] Add IndexedSeq wrapper for ArrayData

## What changes were proposed in this pull request?

We don't have a good way to sequentially access `UnsafeArrayData` through a common 
interface such as `Seq`. An example is `MapObjects`, where we need to handle several 
sequence collection types through one interface. But `UnsafeArrayData` doesn't 
implement `ArrayData.array`, and calling `toArray` copies the entire array. We can 
provide an `IndexedSeq` wrapper for `ArrayData` so that we avoid copying the entire 
array.
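
A minimal sketch of the new wrapper (internal Catalyst API, so this is meant to run 
inside Spark's own source tree, e.g. from a Catalyst test; values are illustrative):

```scala
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.IntegerType

// Wrap ArrayData as an IndexedSeq without materializing a copy of the array.
val data: ArrayData = ArrayData.toArrayData(Array(1, 2, 3))
val seq: IndexedSeq[Int] = data.toSeq[Int](IntegerType)  // backed by ArrayDataIndexedSeq

seq.foreach(println)  // sequential access through the common Seq interface
println(seq.max)      // any IndexedSeq operation works on the wrapper directly
```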

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh 

Closes #20984 from viirya/SPARK-23875.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30ffb53c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30ffb53c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30ffb53c

Branch: refs/heads/master
Commit: 30ffb53cad84283b4f7694bfd60bdd7e1101b04e
Parents: 05ae747
Author: Liang-Chi Hsieh 
Authored: Tue Apr 17 15:09:36 2018 +0200
Committer: Herman van Hovell 
Committed: Tue Apr 17 15:09:36 2018 +0200

--
 .../catalyst/expressions/objects/objects.scala  |   2 +-
 .../spark/sql/catalyst/util/ArrayData.scala |  30 +-
 .../util/ArrayDataIndexedSeqSuite.scala | 100 +++
 3 files changed, 130 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/30ffb53c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 77802e8..72b202b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -708,7 +708,7 @@ case class MapObjects private(
 }
   }
 case ArrayType(et, _) =>
-  _.asInstanceOf[ArrayData].array
+  _.asInstanceOf[ArrayData].toSeq[Any](et)
   }
 
   private lazy val mapElements: Seq[_] => Any = customCollectionCls match {

http://git-wip-us.apache.org/repos/asf/spark/blob/30ffb53c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala
index 9beef41..2cf59d5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.reflect.ClassTag
 
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, 
UnsafeArrayData}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types._
 
 object ArrayData {
   def toArrayData(input: Any): ArrayData = input match {
@@ -42,6 +43,9 @@ abstract class ArrayData extends SpecializedGetters with 
Serializable {
 
   def array: Array[Any]
 
+  def toSeq[T](dataType: DataType): IndexedSeq[T] =
+new ArrayDataIndexedSeq[T](this, dataType)
+
   def setNullAt(i: Int): Unit
 
   def update(i: Int, value: Any): Unit
@@ -164,3 +168,27 @@ abstract class ArrayData extends SpecializedGetters with 
Serializable {
 }
   }
 }
+
+/**
+ * Implements an `IndexedSeq` interface for `ArrayData`. Notice that if the 
original `ArrayData`
+ * is a primitive array and contains null elements, it is better to ask for 
`IndexedSeq[Any]`,
+ * instead of `IndexedSeq[Int]`, in order to keep the null elements.
+ */
+class ArrayDataIndexedSeq[T](arrayData: ArrayData, dataType: DataType) extends 
IndexedSeq[T] {
+
+  private val accessor: (SpecializedGetters, Int) => Any = 
InternalRow.getAccessor(dataType)
+
+  override def apply(idx: Int): T =
+if (0 <= idx && idx < arrayData.numElements()) {
+  if (arrayData.isNullAt(idx)) {
+null.asInstanceOf[T]
+  } else {
+accessor(arrayData, idx).asInstanceOf[T]
+  }
+} else {
+  throw new IndexOutOfBoundsException(
+s"Index $idx must be between 0 and the length of the ArrayData.")
+}
+
+  override def length: Int = arrayData.numElements()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/30ffb53c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayDataIndexedSeqSuite.scala
--

spark git commit: [SPARK-23835][SQL] Add not-null check to Tuples' arguments deserialization

2018-04-17 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 30ffb53ca -> 0a9172a05


[SPARK-23835][SQL] Add not-null check to Tuples' arguments deserialization

## What changes were proposed in this pull request?

There was no nullability check on the arguments of `Tuple`s. This could lead to odd 
behavior when a null value had to be deserialized into a non-nullable Scala object: 
in those cases, the `null` was silently transformed into a valid value (like `-1` for 
`Int`), corresponding to the default value used in the SQL codebase. This was very 
likely to happen when deserializing to a Tuple of primitive Scala types (like Double, 
Int, ...).

The PR adds `AssertNotNull` to tuple arguments that are asked to be converted to 
non-nullable types.
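
A minimal sketch of the behavior change (assuming a spark-shell session named `spark` 
built with this patch; data and column names are illustrative):

```scala
import spark.implicits._

// A key column that can contain nulls, read back as a tuple.
val df = Seq((Some(1), 10), (None, 20)).toDF("key", "value")

df.as[(Option[Int], Int)].collect()  // keeps the missing key as None
df.as[(Int, Int)].collect()          // before this patch: the null key silently became a
                                     // default value; with it: fails with a null-check error
```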

## How was this patch tested?

added UT

Author: Marco Gaido 

Closes #20976 from mgaido91/SPARK-23835.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a9172a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a9172a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a9172a0

Branch: refs/heads/master
Commit: 0a9172a05e604a4a94adbb9208c8c02362afca00
Parents: 30ffb53
Author: Marco Gaido 
Authored: Tue Apr 17 21:45:20 2018 +0800
Committer: Wenchen Fan 
Committed: Tue Apr 17 21:45:20 2018 +0800

--
 .../spark/sql/kafka010/KafkaContinuousSinkSuite.scala |  6 +++---
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala|  2 +-
 .../apache/spark/sql/catalyst/ScalaReflection.scala   | 14 +++---
 .../spark/sql/catalyst/ScalaReflectionSuite.scala | 12 +++-
 .../scala/org/apache/spark/sql/DatasetSuite.scala |  5 +
 5 files changed, 27 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a9172a0/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index fc890a0..ddfc0c1 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -79,7 +79,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 val reader = createKafkaReader(topic)
   .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
   .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
-  .as[(Int, Int)]
+  .as[(Option[Int], Int)]
   .map(_._2)
 
 try {
@@ -119,7 +119,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 val reader = createKafkaReader(topic)
   .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
   .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
-  .as[(Int, Int)]
+  .as[(Option[Int], Int)]
   .map(_._2)
 
 try {
@@ -167,7 +167,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 val reader = createKafkaReader(topic)
   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .selectExpr("CAST(key AS INT)", "CAST(value AS INT)")
-  .as[(Int, Int)]
+  .as[(Option[Int], Int)]
   .map(_._2)
 
 try {

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9172a0/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 42f8b4c..7079ac6 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -138,7 +138,7 @@ class KafkaSinkSuite extends StreamTest with 
SharedSQLContext {
 val reader = createKafkaReader(topic)
   .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
   .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
-  .as[(Int, Int)]
+  .as[(Option[Int], Int)]
   .map(_._2)
 
 try {

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9172a0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/

spark git commit: [SPARK-23835][SQL] Add not-null check to Tuples' arguments deserialization

2018-04-17 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 d4f204c53 -> 9857e249c


[SPARK-23835][SQL] Add not-null check to Tuples' arguments deserialization

## What changes were proposed in this pull request?

There was no nullability check on the arguments of `Tuple`s. This could lead to odd 
behavior when a null value had to be deserialized into a non-nullable Scala object: 
in those cases, the `null` was silently transformed into a valid value (like `-1` for 
`Int`), corresponding to the default value used in the SQL codebase. This was very 
likely to happen when deserializing to a Tuple of primitive Scala types (like Double, 
Int, ...).

The PR adds `AssertNotNull` to tuple arguments that are asked to be converted to 
non-nullable types.

## How was this patch tested?

added UT

Author: Marco Gaido 

Closes #20976 from mgaido91/SPARK-23835.

(cherry picked from commit 0a9172a05e604a4a94adbb9208c8c02362afca00)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9857e249
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9857e249
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9857e249

Branch: refs/heads/branch-2.3
Commit: 9857e249c20f842868cb4681ea374b8e316c3ead
Parents: d4f204c
Author: Marco Gaido 
Authored: Tue Apr 17 21:45:20 2018 +0800
Committer: Wenchen Fan 
Committed: Tue Apr 17 21:46:00 2018 +0800

--
 .../spark/sql/kafka010/KafkaContinuousSinkSuite.scala |  6 +++---
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala|  2 +-
 .../apache/spark/sql/catalyst/ScalaReflection.scala   | 14 +++---
 .../spark/sql/catalyst/ScalaReflectionSuite.scala | 12 +++-
 .../scala/org/apache/spark/sql/DatasetSuite.scala |  5 +
 5 files changed, 27 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9857e249/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index fc890a0..ddfc0c1 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -79,7 +79,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 val reader = createKafkaReader(topic)
   .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
   .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
-  .as[(Int, Int)]
+  .as[(Option[Int], Int)]
   .map(_._2)
 
 try {
@@ -119,7 +119,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 val reader = createKafkaReader(topic)
   .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
   .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
-  .as[(Int, Int)]
+  .as[(Option[Int], Int)]
   .map(_._2)
 
 try {
@@ -167,7 +167,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 val reader = createKafkaReader(topic)
   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .selectExpr("CAST(key AS INT)", "CAST(value AS INT)")
-  .as[(Int, Int)]
+  .as[(Option[Int], Int)]
   .map(_._2)
 
 try {

http://git-wip-us.apache.org/repos/asf/spark/blob/9857e249/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 42f8b4c..7079ac6 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -138,7 +138,7 @@ class KafkaSinkSuite extends StreamTest with 
SharedSQLContext {
 val reader = createKafkaReader(topic)
   .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
   .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
-  .as[(Int, Int)]
+  .as[(Option[Int], Int)]
   .map(_._2)
 
 try {

http://git-wip-us.apache.org/repos/asf/spark/blob/9857e249/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
--
diff --git 
a/sql/ca

spark git commit: [SPARK-22676] Avoid iterating all partition paths when spark.sql.hive.verifyPartitionPath=true

2018-04-17 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 0a9172a05 -> ed4101d29


[SPARK-22676] Avoid iterating all partition paths when 
spark.sql.hive.verifyPartitionPath=true

## What changes were proposed in this pull request?

In the current code, Spark scans all partition paths when 
`spark.sql.hive.verifyPartitionPath=true`.
For example, with a table like the one below:
```
CREATE TABLE `test`(
`id` int,
`age` int,
`name` string)
PARTITIONED BY (
`A` string,
`B` string)
load data local inpath '/tmp/data0' into table test partition(A='00', B='00')
load data local inpath '/tmp/data1' into table test partition(A='01', B='01')
load data local inpath '/tmp/data2' into table test partition(A='10', B='10')
load data local inpath '/tmp/data3' into table test partition(A='11', B='11')
```
If I run the query `select * from test where A='00' and B='01'`, the current code 
scans all partition paths: '/data/A=00/B=00', '/data/A=01/B=01', '/data/A=10/B=10', 
'/data/A=11/B=11'. This costs a lot of time and memory.

This PR proposes to avoid iterating over all partition paths. It adds a config 
`spark.files.ignoreMissingFiles` and ignores `file not found` errors during 
`getPartitions`/`compute` (for Hive table scans). This is much like the logic 
provided by `spark.sql.files.ignoreMissingFiles` (which applies to datasource scans).
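
A minimal sketch of how the new flag would be enabled alongside partition-path 
verification (app name and query are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ignore-missing-partition-paths")
  .config("spark.files.ignoreMissingFiles", "true")      // core-level flag added by this PR
  .config("spark.sql.hive.verifyPartitionPath", "true")
  .enableHiveSupport()
  .getOrCreate()

// With the flag on, a scan whose partition directory has been removed from the
// file system skips the missing files instead of failing the job.
spark.sql("select * from test where A='00' and B='01'").show()
```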

## How was this patch tested?
UT

Author: jinxing 

Closes #19868 from jinxing64/SPARK-22676.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed4101d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed4101d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed4101d2

Branch: refs/heads/master
Commit: ed4101d29f50d54fd7846421e4c00e9ecd3599d0
Parents: 0a9172a
Author: jinxing 
Authored: Tue Apr 17 21:52:33 2018 +0800
Committer: Wenchen Fan 
Committed: Tue Apr 17 21:52:33 2018 +0800

--
 .../apache/spark/internal/config/package.scala  |  6 ++
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 43 
 .../org/apache/spark/rdd/NewHadoopRDD.scala | 45 +
 .../test/scala/org/apache/spark/FileSuite.scala | 69 +++-
 .../org/apache/spark/sql/internal/SQLConf.scala |  3 +-
 .../spark/sql/hive/QueryPartitionSuite.scala| 40 
 6 files changed, 181 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 407545a..99d779f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -301,6 +301,12 @@ package object config {
 .booleanConf
 .createWithDefault(false)
 
+  private[spark] val IGNORE_MISSING_FILES = 
ConfigBuilder("spark.files.ignoreMissingFiles")
+.doc("Whether to ignore missing files. If true, the Spark jobs will 
continue to run when " +
+"encountering missing files and the contents that have been read will 
still be returned.")
+.booleanConf
+.createWithDefault(false)
+
   private[spark] val APP_CALLER_CONTEXT = 
ConfigBuilder("spark.log.callerContext")
 .stringConf
 .createOptional

http://git-wip-us.apache.org/repos/asf/spark/blob/ed4101d2/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2480559..44895ab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale}
 
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -134,6 +135,8 @@ class HadoopRDD[K, V](
 
   private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
 
+  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
+
   private val ignoreEmptySplits = 
sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
 
   // Returns a JobConf that will be used on slaves to obtain input splits for 
Hadoop reads.

spark git commit: [SPARK-23948] Trigger mapstage's job listener in submitMissingTasks

2018-04-17 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master ed4101d29 -> 3990daaf3


[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks

## What changes were proposed in this pull request?

When SparkContext submits a map stage via `submitMapStage` to the `DAGScheduler`,
`markMapStageJobAsFinished` is called only in
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L933
 and
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1314).

But consider the following scenario:
1. stage0 and stage1 are both `ShuffleMapStage`s, and stage1 depends on stage0;
2. we submit stage1 via `submitMapStage`;
3. while stage1 is running, a `FetchFailed` occurs, and stage0 and stage1 are 
resubmitted as stage0_1 and stage1_1;
4. while stage0_1 is running, speculative tasks from the old stage1 complete 
successfully, but stage1 is no longer in `runningStages`; so even though all splits 
in stage1 (including the speculative tasks) have succeeded, stage1's job listener is 
not called;
5. stage0_1 finishes and stage1_1 starts running. In `submitMissingTasks` there are 
no missing tasks, but in the current code the job listener is still not triggered.

We should call the job listener for the map stage in step 5.

## How was this patch tested?

Not added yet.

Author: jinxing 

Closes #21019 from jinxing64/SPARK-23948.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3990daaf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3990daaf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3990daaf

Branch: refs/heads/master
Commit: 3990daaf3b6ca2c5a9f7790030096262efb12cb2
Parents: ed4101d
Author: jinxing 
Authored: Tue Apr 17 08:55:01 2018 -0500
Committer: Imran Rashid 
Committed: Tue Apr 17 08:55:01 2018 -0500

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 33 +++--
 .../spark/scheduler/DAGSchedulerSuite.scala | 52 
 2 files changed, 70 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3990daaf/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c46a84..78b6b34 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1092,17 +1092,16 @@ class DAGScheduler(
   // the stage as completed here in case there are no tasks to run
   markStageAsFinished(stage, None)
 
-  val debugString = stage match {
+  stage match {
 case stage: ShuffleMapStage =>
-  s"Stage ${stage} is actually done; " +
-s"(available: ${stage.isAvailable}," +
-s"available outputs: ${stage.numAvailableOutputs}," +
-s"partitions: ${stage.numPartitions})"
+  logDebug(s"Stage ${stage} is actually done; " +
+  s"(available: ${stage.isAvailable}," +
+  s"available outputs: ${stage.numAvailableOutputs}," +
+  s"partitions: ${stage.numPartitions})")
+  markMapStageJobsAsFinished(stage)
 case stage : ResultStage =>
-  s"Stage ${stage} is actually done; (partitions: 
${stage.numPartitions})"
+  logDebug(s"Stage ${stage} is actually done; (partitions: 
${stage.numPartitions})")
   }
-  logDebug(debugString)
-
   submitWaitingChildStages(stage)
 }
   }
@@ -1307,13 +1306,7 @@ class DAGScheduler(
   shuffleStage.findMissingPartitions().mkString(", "))
 submitStage(shuffleStage)
   } else {
-// Mark any map-stage jobs waiting on this stage as finished
-if (shuffleStage.mapStageJobs.nonEmpty) {
-  val stats = 
mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
-  for (job <- shuffleStage.mapStageJobs) {
-markMapStageJobAsFinished(job, stats)
-  }
-}
+markMapStageJobsAsFinished(shuffleStage)
 submitWaitingChildStages(shuffleStage)
   }
 }
@@ -1433,6 +1426,16 @@ class DAGScheduler(
 }
   }
 
+  private[scheduler] def markMapStageJobsAsFinished(shuffleStage: 
ShuffleMapStage): Unit = {
+// Mark any map-stage jobs waiting on this stage as finished
+if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
+  val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
+  for (job <- shuffleStage.mapStageJobs) {
+markMapStageJobAsFinished(job, stats)
+  }
+}
+  }
+
   /**
  

svn commit: r26377 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_17_08_01-3990daa-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-17 Thread pwendell
Author: pwendell
Date: Tue Apr 17 15:19:23 2018
New Revision: 26377

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_17_08_01-3990daa docs


[This commit notification would consist of 1457 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-23986][SQL] freshName can generate non-unique names

2018-04-17 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 3990daaf3 -> f39e82ce1


[SPARK-23986][SQL] freshName can generate non-unique names

## What changes were proposed in this pull request?

We use `CodegenContext.freshName` to get a unique name for any new variable we add. 
Unfortunately, this method currently fails to create unique names when we request 
more than one variable with the base name `name1` together with a variable with the 
base name `name11`.

The PR changes the way `CodegenContext.freshName` generates a new name so that unique 
names are produced in this scenario too.
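
A minimal sketch of the collision and of the naming scheme after this patch (internal 
Catalyst API; expected results shown as comments):

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

val ctx = new CodegenContext
// Old scheme: freshName("name1") -> "name1", the second call -> "name11",
// which collides with freshName("name11") -> "name11".
// New scheme: every request gets a numeric suffix, so the three results stay distinct.
ctx.freshName("name1")   // "name1_0"
ctx.freshName("name1")   // "name1_1"
ctx.freshName("name11")  // "name11_0"
```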

## How was this patch tested?

added UT

Author: Marco Gaido 

Closes #21080 from mgaido91/SPARK-23986.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f39e82ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f39e82ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f39e82ce

Branch: refs/heads/master
Commit: f39e82ce150b6a7ea038e6858ba7adbaba3cad88
Parents: 3990daa
Author: Marco Gaido 
Authored: Wed Apr 18 00:35:44 2018 +0800
Committer: Wenchen Fan 
Committed: Wed Apr 18 00:35:44 2018 +0800

--
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 11 +++
 .../sql/catalyst/expressions/CodeGenerationSuite.scala   | 10 ++
 2 files changed, 13 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f39e82ce/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index d97611c..f6b6775 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -572,14 +572,9 @@ class CodegenContext {
 } else {
   s"${freshNamePrefix}_$name"
 }
-if (freshNameIds.contains(fullName)) {
-  val id = freshNameIds(fullName)
-  freshNameIds(fullName) = id + 1
-  s"$fullName$id"
-} else {
-  freshNameIds += fullName -> 1
-  fullName
-}
+val id = freshNameIds.getOrElse(fullName, 0)
+freshNameIds(fullName) = id + 1
+s"${fullName}_$id"
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f39e82ce/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index f7c0231..5b71bec 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -489,4 +489,14 @@ class CodeGenerationSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   assert(!ctx.subExprEliminationExprs.contains(ref))
 }
   }
+
+  test("SPARK-23986: freshName can generate duplicated names") {
+val ctx = new CodegenContext
+val names1 = ctx.freshName("myName1") :: ctx.freshName("myName1") ::
+  ctx.freshName("myName11") :: Nil
+assert(names1.distinct.length == 3)
+val names2 = ctx.freshName("a") :: ctx.freshName("a") ::
+  ctx.freshName("a_1") :: ctx.freshName("a_0") :: Nil
+assert(names2.distinct.length == 4)
+  }
 }





spark git commit: [SPARK-23986][SQL] freshName can generate non-unique names

2018-04-17 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 9857e249c -> 564019b92


[SPARK-23986][SQL] freshName can generate non-unique names

## What changes were proposed in this pull request?

We use `CodegenContext.freshName` to get a unique name for any new variable we add. 
Unfortunately, this method currently fails to create unique names when we request 
more than one variable with the base name `name1` together with a variable with the 
base name `name11`.

The PR changes the way `CodegenContext.freshName` generates a new name so that unique 
names are produced in this scenario too.

## How was this patch tested?

added UT

Author: Marco Gaido 

Closes #21080 from mgaido91/SPARK-23986.

(cherry picked from commit f39e82ce150b6a7ea038e6858ba7adbaba3cad88)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/564019b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/564019b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/564019b9

Branch: refs/heads/branch-2.3
Commit: 564019b926eddec39d0485217e693a1e6b4b8e14
Parents: 9857e24
Author: Marco Gaido 
Authored: Wed Apr 18 00:35:44 2018 +0800
Committer: Wenchen Fan 
Committed: Wed Apr 18 00:37:38 2018 +0800

--
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 11 +++
 .../sql/catalyst/expressions/CodeGenerationSuite.scala   | 10 ++
 2 files changed, 13 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/564019b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 2631e7e..504e851 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -564,14 +564,9 @@ class CodegenContext {
 } else {
   s"${freshNamePrefix}_$name"
 }
-if (freshNameIds.contains(fullName)) {
-  val id = freshNameIds(fullName)
-  freshNameIds(fullName) = id + 1
-  s"$fullName$id"
-} else {
-  freshNameIds += fullName -> 1
-  fullName
-}
+val id = freshNameIds.getOrElse(fullName, 0)
+freshNameIds(fullName) = id + 1
+s"${fullName}_$id"
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/564019b9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index e3ce5af..d0d6318 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -486,4 +486,14 @@ class CodeGenerationSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   assert(!ctx.subExprEliminationExprs.contains(ref))
 }
   }
+
+  test("SPARK-23986: freshName can generate duplicated names") {
+val ctx = new CodegenContext
+val names1 = ctx.freshName("myName1") :: ctx.freshName("myName1") ::
+  ctx.freshName("myName11") :: Nil
+assert(names1.distinct.length == 3)
+val names2 = ctx.freshName("a") :: ctx.freshName("a") ::
+  ctx.freshName("a_1") :: ctx.freshName("a_0") :: Nil
+assert(names2.distinct.length == 4)
+  }
 }





spark git commit: [SPARK-21741][ML][PYSPARK] Python API for DataFrame-based multivariate summarizer

2018-04-17 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master f39e82ce1 -> 1ca3c50fe


[SPARK-21741][ML][PYSPARK] Python API for DataFrame-based multivariate 
summarizer

## What changes were proposed in this pull request?

Python API for DataFrame-based multivariate summarizer.

## How was this patch tested?

doctest added.

Author: WeichenXu 

Closes #20695 from WeichenXu123/py_summarizer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ca3c50f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ca3c50f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ca3c50f

Branch: refs/heads/master
Commit: 1ca3c50fefb34532c78427fa74872db3ecbf7ba2
Parents: f39e82c
Author: WeichenXu 
Authored: Tue Apr 17 10:11:08 2018 -0700
Committer: Joseph K. Bradley 
Committed: Tue Apr 17 10:11:08 2018 -0700

--
 python/pyspark/ml/stat.py | 193 -
 1 file changed, 192 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1ca3c50f/python/pyspark/ml/stat.py
--
diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py
index 93d0f4f..a06ab31 100644
--- a/python/pyspark/ml/stat.py
+++ b/python/pyspark/ml/stat.py
@@ -19,7 +19,9 @@ import sys
 
 from pyspark import since, SparkContext
 from pyspark.ml.common import _java2py, _py2java
-from pyspark.ml.wrapper import _jvm
+from pyspark.ml.wrapper import JavaWrapper, _jvm
+from pyspark.sql.column import Column, _to_seq
+from pyspark.sql.functions import lit
 
 
 class ChiSquareTest(object):
@@ -195,6 +197,195 @@ class KolmogorovSmirnovTest(object):
  _jvm().PythonUtils.toSeq(params)))
 
 
+class Summarizer(object):
+"""
+.. note:: Experimental
+
+Tools for vectorized statistics on MLlib Vectors.
+The methods in this package provide various statistics for Vectors 
contained inside DataFrames.
+This class lets users pick the statistics they would like to extract for a 
given column.
+
+>>> from pyspark.ml.stat import Summarizer
+>>> from pyspark.sql import Row
+>>> from pyspark.ml.linalg import Vectors
+>>> summarizer = Summarizer.metrics("mean", "count")
+>>> df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 
1.0)),
+...  Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 
3.0))]).toDF()
+>>> df.select(summarizer.summary(df.features, 
df.weight)).show(truncate=False)
++---+
+|aggregate_metrics(features, weight)|
++---+
+|[[1.0,1.0,1.0], 1] |
++---+
+
+>>> df.select(summarizer.summary(df.features)).show(truncate=False)
+++
+|aggregate_metrics(features, 1.0)|
+++
+|[[1.0,1.5,2.0], 2]  |
+++
+
+>>> df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)
++--+
+|mean(features)|
++--+
+|[1.0,1.0,1.0] |
++--+
+
+>>> df.select(Summarizer.mean(df.features)).show(truncate=False)
++--+
+|mean(features)|
++--+
+|[1.0,1.5,2.0] |
++--+
+
+
+.. versionadded:: 2.4.0
+
+"""
+@staticmethod
+@since("2.4.0")
+def mean(col, weightCol=None):
+"""
+return a column of mean summary
+"""
+return Summarizer._get_single_metric(col, weightCol, "mean")
+
+@staticmethod
+@since("2.4.0")
+def variance(col, weightCol=None):
+"""
+return a column of variance summary
+"""
+return Summarizer._get_single_metric(col, weightCol, "variance")
+
+@staticmethod
+@since("2.4.0")
+def count(col, weightCol=None):
+"""
+return a column of count summary
+"""
+return Summarizer._get_single_metric(col, weightCol, "count")
+
+@staticmethod
+@since("2.4.0")
+def numNonZeros(col, weightCol=None):
+"""
+return a column of numNonZero summary
+"""
+return Summarizer._get_single_metric(col, weightCol, "numNonZeros")
+
+@staticmethod
+@since("2.4.0")
+def max(col, weightCol=None):
+"""
+return a column of max summary
+"""
+return Summarizer._get_single_metric(col, weightCol, "max")
+
+@staticmethod
+@since("2.4.0")
+def min(col, weightCol=None):
+"""
+return a column of min summary
+"""
+return Summarizer._get_single_metric(col, weightCol, "min")
+
+@staticmethod
+@since("2.4.0")
+  

svn commit: r26378 - in /dev/spark/2.3.1-SNAPSHOT-2018_04_17_10_01-564019b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-17 Thread pwendell
Author: pwendell
Date: Tue Apr 17 17:17:05 2018
New Revision: 26378

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_04_17_10_01-564019b docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark-website git commit: Sync ASF git repo and Github

2018-04-17 Thread sameerag
Repository: spark-website
Updated Branches:
  refs/heads/asf-site f050f7e3d -> 0f049fd2e


Sync ASF git repo and Github


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/0f049fd2
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/0f049fd2
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/0f049fd2

Branch: refs/heads/asf-site
Commit: 0f049fd2e0d5d5f21f8f0f6a2b9584f1666f5cee
Parents: f050f7e
Author: Sameer Agarwal 
Authored: Tue Apr 17 11:00:08 2018 -0700
Committer: Sameer Agarwal 
Committed: Tue Apr 17 11:00:08 2018 -0700

--

--






svn commit: r26380 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_17_12_04-1ca3c50-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-17 Thread pwendell
Author: pwendell
Date: Tue Apr 17 19:18:37 2018
New Revision: 26380

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_17_12_04-1ca3c50 docs


[This commit notification would consist of 1457 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-23948] Trigger mapstage's job listener in submitMissingTasks

2018-04-17 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 564019b92 -> 6b99d5bc3


[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks

## What changes were proposed in this pull request?

When SparkContext submits a map stage via `submitMapStage` to the `DAGScheduler`,
`markMapStageJobAsFinished` is called only in
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L933
 and
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1314).

But consider the following scenario:
1. stage0 and stage1 are both `ShuffleMapStage`s, and stage1 depends on stage0;
2. we submit stage1 via `submitMapStage`;
3. while stage1 is running, a `FetchFailed` occurs, and stage0 and stage1 are 
resubmitted as stage0_1 and stage1_1;
4. while stage0_1 is running, speculative tasks from the old stage1 complete 
successfully, but stage1 is no longer in `runningStages`; so even though all splits 
in stage1 (including the speculative tasks) have succeeded, stage1's job listener is 
not called;
5. stage0_1 finishes and stage1_1 starts running. In `submitMissingTasks` there are 
no missing tasks, but in the current code the job listener is still not triggered.

We should call the job listener for the map stage in step 5.

## How was this patch tested?

Not added yet.

Author: jinxing 

(cherry picked from commit 3990daaf3b6ca2c5a9f7790030096262efb12cb2)

Author: jinxing 

Closes #21085 from squito/cp.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b99d5bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b99d5bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b99d5bc

Branch: refs/heads/branch-2.3
Commit: 6b99d5bc3f3898a0aff30468a623a3f64bb20b62
Parents: 564019b
Author: jinxing 
Authored: Tue Apr 17 15:53:29 2018 -0500
Committer: Imran Rashid 
Committed: Tue Apr 17 15:53:29 2018 -0500

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 33 +++--
 .../spark/scheduler/DAGSchedulerSuite.scala | 52 
 2 files changed, 70 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6b99d5bc/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c46a84..78b6b34 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1092,17 +1092,16 @@ class DAGScheduler(
   // the stage as completed here in case there are no tasks to run
   markStageAsFinished(stage, None)
 
-  val debugString = stage match {
+  stage match {
 case stage: ShuffleMapStage =>
-  s"Stage ${stage} is actually done; " +
-s"(available: ${stage.isAvailable}," +
-s"available outputs: ${stage.numAvailableOutputs}," +
-s"partitions: ${stage.numPartitions})"
+  logDebug(s"Stage ${stage} is actually done; " +
+  s"(available: ${stage.isAvailable}," +
+  s"available outputs: ${stage.numAvailableOutputs}," +
+  s"partitions: ${stage.numPartitions})")
+  markMapStageJobsAsFinished(stage)
 case stage : ResultStage =>
-  s"Stage ${stage} is actually done; (partitions: 
${stage.numPartitions})"
+  logDebug(s"Stage ${stage} is actually done; (partitions: 
${stage.numPartitions})")
   }
-  logDebug(debugString)
-
   submitWaitingChildStages(stage)
 }
   }
@@ -1307,13 +1306,7 @@ class DAGScheduler(
   shuffleStage.findMissingPartitions().mkString(", "))
 submitStage(shuffleStage)
   } else {
-// Mark any map-stage jobs waiting on this stage as finished
-if (shuffleStage.mapStageJobs.nonEmpty) {
-  val stats = 
mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
-  for (job <- shuffleStage.mapStageJobs) {
-markMapStageJobAsFinished(job, stats)
-  }
-}
+markMapStageJobsAsFinished(shuffleStage)
 submitWaitingChildStages(shuffleStage)
   }
 }
@@ -1433,6 +1426,16 @@ class DAGScheduler(
 }
   }
 
+  private[scheduler] def markMapStageJobsAsFinished(shuffleStage: 
ShuffleMapStage): Unit = {
+// Mark any map-stage jobs waiting on this stage as finished
+if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
+  val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
+  for (job <- shuffleStage.mapStageJob

svn commit: r26383 - in /dev/spark/2.3.1-SNAPSHOT-2018_04_17_14_01-6b99d5b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-17 Thread pwendell
Author: pwendell
Date: Tue Apr 17 21:16:26 2018
New Revision: 26383

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_04_17_14_01-6b99d5b docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-22968][DSTREAM] Throw an exception on partition revoking issue

2018-04-17 Thread koeninger
Repository: spark
Updated Branches:
  refs/heads/master 1ca3c50fe -> 5fccdae18


[SPARK-22968][DSTREAM] Throw an exception on partition revoking issue

## What changes were proposed in this pull request?

Kafka partitions can be revoked when new consumers join the consumer group and the 
partitions are rebalanced. However, the current Spark Kafka connector code assumes 
partition revoking never happens, so trying to get the latest offset from a revoked 
partition throws the exception mentioned in the JIRA.

Partition revoking happens when a new consumer joins the consumer group, which 
means different streaming apps are trying to use the same group id. This is 
fundamentally incorrect: different apps should use different consumer groups. 
So instead of surfacing a confusing exception from Kafka, this PR improves the 
error message by identifying the revoked partitions and directly throwing a 
meaningful exception when a partition is revoked.
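
For illustration, a minimal sketch of this kind of check (simplified, not the 
actual `DirectKafkaInputDStream` change; `currentOffsets` stands in for the 
partitions the stream expects to own):

```scala
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

object RevocationCheckSketch {
  // Sketch only: fail fast with a clear message if Kafka has revoked partitions
  // that this stream still expects to own (usually a sign that two apps share
  // the same group.id).
  def assertNoPartitionsRevoked(
      consumer: Consumer[_, _],
      currentOffsets: Map[TopicPartition, Long],
      groupId: String): Unit = {
    val assigned = consumer.assignment().asScala.toSet
    val revoked = currentOffsets.keySet.diff(assigned)
    if (revoked.nonEmpty) {
      throw new IllegalStateException(
        s"Previously tracked partitions ${revoked.mkString("[", ", ", "]")} were revoked " +
          s"because another consumer joined group '$groupId'. Make sure each streaming " +
          "application uses its own consumer group.")
    }
  }
}
```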

Besides, this PR also fixes bugs in `DirectKafkaWordCount`; the example simply 
cannot work without the fix.

```
18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Revoking previously 
assigned partitions [kssh-7, kssh-4, kssh-3, kssh-6, kssh-5, kssh-0, kssh-2, 
kssh-1] for group use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: (Re-)joining group 
use_a_separate_group_id_for_each_stream
18/01/05 09:48:27 INFO internals.AbstractCoordinator: Successfully joined group 
use_a_separate_group_id_for_each_stream with generation 4
18/01/05 09:48:27 INFO internals.ConsumerCoordinator: Setting newly assigned 
partitions [kssh-7, kssh-4, kssh-6, kssh-5] for group 
use_a_separate_group_id_for_each_stream
```

## How was this patch tested?

This was manually verified in a local cluster; unfortunately I'm not sure how to 
simulate it in a UT, so I propose the PR without a UT added.

Author: jerryshao 

Closes #21038 from jerryshao/SPARK-22968.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fccdae1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fccdae1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fccdae1

Branch: refs/heads/master
Commit: 5fccdae18911793967b315c02c058eb737e46174
Parents: 1ca3c50
Author: jerryshao 
Authored: Tue Apr 17 21:08:42 2018 -0500
Committer: cody koeninger 
Committed: Tue Apr 17 21:08:42 2018 -0500

--
 .../examples/streaming/DirectKafkaWordCount.scala  | 17 +
 .../kafka010/DirectKafkaInputDStream.scala | 12 
 2 files changed, 25 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5fccdae1/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
index def0602..2082fb7 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
@@ -18,6 +18,9 @@
 // scalastyle:off println
 package org.apache.spark.examples.streaming
 
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka010._
@@ -26,18 +29,20 @@ import org.apache.spark.streaming.kafka010._
  * Consumes messages from one or more topics in Kafka and does wordcount.
  * Usage: DirectKafkaWordCount  
  *is a list of one or more Kafka brokers
+ *is a consumer group name to consume from topics
  *is a list of one or more kafka topics to consume from
  *
  * Example:
 *$ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
- *topic1,topic2
+ *consumer-group topic1,topic2
  */
 object DirectKafkaWordCount {
   def main(args: Array[String]) {
-if (args.length < 2) {
+if (args.length < 3) {
   System.err.println(s"""
 |Usage: DirectKafkaWordCount  
 |   is a list of one or more Kafka brokers
+|   is a consumer group name to consume from topics
 |   is a list of one or more kafka topics to consume from
 |
 """.stripMargin)
@@ -46,7 +51,7 @@ object DirectKafkaWordCount {
 
 StreamingExamples.setStreamingLogLevels()
 
-val Array(brokers, topics) = args
+val Array(brokers, groupId, topics) = args
 
 // Create context with 2 second batch interval
 val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
@@ -54,7 +59,11 @@ object DirectKafkaWordCount {
 
 // Creat

spark git commit: [SPARK-21479][SQL] Outer join filter pushdown in null supplying table when condition is on one of the joined columns

2018-04-17 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 5fccdae18 -> 1e3b8762a


[SPARK-21479][SQL] Outer join filter pushdown in null supplying table when 
condition is on one of the joined columns

## What changes were proposed in this pull request?

Added a `TransitPredicateInOuterJoin` optimization rule that propagates constraints 
from the preserved side of an outer join to the null-supplying side. The 
constraints of the join operator itself remain unchanged.
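
For illustration, a small self-contained example of the kind of plan this targets 
(table and column names are made up):

```scala
import org.apache.spark.sql.SparkSession

object OuterJoinInferenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Made-up data: 'a' lives on the preserved (left) side, 'b' on the
    // null-supplying (right) side of a left outer join.
    val preserved = Seq((6, "x"), (3, "y")).toDF("a", "p")
    val nullSupplying = Seq((6, "u"), (2, "v")).toDF("b", "q")

    // The constraint 'a > 5' on the preserved side plus the join condition
    // 'a = b' lets the optimizer infer 'b > 5' for the null-supplying side.
    val joined = preserved.where($"a" > 5)
      .join(nullSupplying, $"a" === $"b", "left_outer")

    // With this rule in place, the optimized plan should show a Filter on 'b'
    // below the null-supplying side.
    joined.explain(true)

    spark.stop()
  }
}
```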

## How was this patch tested?

Added 3 tests in `InferFiltersFromConstraintsSuite`.

Author: maryannxue 

Closes #20816 from maryannxue/spark-21479.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e3b8762
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e3b8762
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e3b8762

Branch: refs/heads/master
Commit: 1e3b8762a854a07c317f69fba7fa1a7bcdc58ff3
Parents: 5fccdae
Author: maryannxue 
Authored: Wed Apr 18 10:36:41 2018 +0800
Committer: Wenchen Fan 
Committed: Wed Apr 18 10:36:41 2018 +0800

--
 .../sql/catalyst/optimizer/Optimizer.scala  | 42 ++--
 .../plans/logical/QueryPlanConstraints.scala| 25 ++--
 .../InferFiltersFromConstraintsSuite.scala  | 36 +
 3 files changed, 96 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1e3b8762/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5fb59ef..913354e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -637,8 +637,11 @@ object CollapseWindow extends Rule[LogicalPlan] {
  * constraints. These filters are currently inserted to the existing 
conditions in the Filter
  * operators and on either side of Join operators.
  *
- * Note: While this optimization is applicable to all types of join, it 
primarily benefits Inner and
- * LeftSemi joins.
+ * In addition, for left/right outer joins, infer predicate from the preserved 
side of the Join
+ * operator and push the inferred filter over to the null-supplying side. For 
example, if the
+ * preserved side has constraints of the form 'a > 5' and the join condition 
is 'a = b', in
+ * which 'b' is an attribute from the null-supplying side, a [[Filter]] 
operator of 'b > 5' will
+ * be applied to the null-supplying side.
  */
 object InferFiltersFromConstraints extends Rule[LogicalPlan] with 
PredicateHelper {
 
@@ -671,11 +674,42 @@ object InferFiltersFromConstraints extends 
Rule[LogicalPlan] with PredicateHelpe
   val newConditionOpt = conditionOpt match {
 case Some(condition) =>
   val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
+  if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
 case None =>
   additionalConstraints.reduceOption(And)
   }
-  if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
+  // Infer filter for left/right outer joins
+  val newLeftOpt = joinType match {
+case RightOuter if newConditionOpt.isDefined =>
+  val inferredConstraints = left.getRelevantConstraints(
+left.constraints
+  .union(right.constraints)
+  .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+  val newFilters = inferredConstraints
+.filterNot(left.constraints.contains)
+.reduceLeftOption(And)
+  newFilters.map(Filter(_, left))
+case _ => None
+  }
+  val newRightOpt = joinType match {
+case LeftOuter if newConditionOpt.isDefined =>
+  val inferredConstraints = right.getRelevantConstraints(
+right.constraints
+  .union(left.constraints)
+  .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
+  val newFilters = inferredConstraints
+.filterNot(right.constraints.contains)
+.reduceLeftOption(And)
+  newFilters.map(Filter(_, right))
+case _ => None
+  }
+
+  if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
+|| newLeftOpt.isDefined || newRightOpt.isDefined) {
+Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt)
+  } else {
+  

svn commit: r26386 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_17_20_01-1e3b876-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-17 Thread pwendell
Author: pwendell
Date: Wed Apr 18 03:16:32 2018
New Revision: 26386

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_17_20_01-1e3b876 docs


[This commit notification would consist of 1457 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-23341][SQL] define some standard options for data source v2

2018-04-17 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 1e3b8762a -> 310a8cd06


[SPARK-23341][SQL] define some standard options for data source v2

## What changes were proposed in this pull request?

Each data source implementation can define its own options and teach its users 
how to set them. Spark doesn't place any restrictions on which options a data 
source should or should not have. Some options are very common and many data 
sources use them, but different data sources may define these common options 
(key and meaning) differently, which is quite confusing to end users.

This PR defines some standard options that data sources can optionally adopt: 
path, table and database.
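
For illustration, a minimal sketch of how an implementation might read these 
standard options through `DataSourceOptions` (the option values here are made up; 
`get` and `PATH_KEY` appear in the diff below):

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.DataSourceOptions

object StandardOptionsSketch {
  def main(args: Array[String]): Unit = {
    // Keys are case-insensitive; the map below is only an example.
    val options = new DataSourceOptions(Map(
      "Path"     -> "/data/events",   // standard 'path' option
      "table"    -> "db.tbl",         // literally a table named "db.tbl"
      "database" -> "analytics"
    ).asJava)

    // `get` returns java.util.Optional[String].
    println(options.get(DataSourceOptions.PATH_KEY).orElse("<none>"))
    println(options.get("table").orElse("<none>"))
    println(options.get("database").orElse("<none>"))
  }
}
```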

## How was this patch tested?

A new test case.

Author: Wenchen Fan 

Closes #20535 from cloud-fan/options.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/310a8cd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/310a8cd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/310a8cd0

Branch: refs/heads/master
Commit: 310a8cd06299e434d94a1e391a6eb62944112446
Parents: 1e3b876
Author: Wenchen Fan 
Authored: Wed Apr 18 11:51:10 2018 +0800
Committer: Wenchen Fan 
Committed: Wed Apr 18 11:51:10 2018 +0800

--
 .../spark/sql/sources/v2/DataSourceOptions.java | 100 +++
 .../org/apache/spark/sql/DataFrameReader.scala  |  14 ++-
 .../sql/sources/v2/DataSourceOptionsSuite.scala |  25 +
 3 files changed, 135 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/310a8cd0/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
index c320535..83df3be 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
@@ -17,16 +17,61 @@
 
 package org.apache.spark.sql.sources.v2;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Stream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
  * An immutable string-to-string map in which keys are case-insensitive. This 
is used to represent
  * data source options.
+ *
+ * Each data source implementation can define its own options and teach its 
users how to set them.
+ * Spark doesn't have any restrictions about what options a data source should 
or should not have.
+ * Instead Spark defines some standard options that data sources can 
optionally adopt. It's possible
+ * that some options are very common and many data sources use them. However 
different data
+ * sources may define the common options(key and meaning) differently, which 
is quite confusing to
+ * end users.
+ *
+ * The standard options defined by Spark:
+ *
+ *   - path: A path string of the data files/directories, like path1, /absolute/file2,
+ * path3/*. The path can either be relative or absolute, points to either file or
+ * directory, and can contain wildcards. This option is commonly used by file-based
+ * data sources.
+ *   - paths: A JSON array style paths string of the data files/directories, like
+ * ["path1", "/absolute/file2"]. The format of each path is the same as the path
+ * option, plus it should follow JSON string literal format, e.g. quotes should be
+ * escaped, pa\"th means pa"th.
+ *   - table: A table name string representing the table name directly without any
+ * interpretation. For example, db.tbl means a table called db.tbl, not a table
+ * called tbl inside database db. `t*b.l` means a table called `t*b.l`, not t*b.l.
+ *   - database: A database name string representing the database name directly
+ * without any interpretation, which is very similar to the table name option.
  */
 @InterfaceStability.Evolving
 public class DataSourceOptions {
@@ -97,4 +142,59 @@ public class DataSourceOptions {
 return keyLowerCasedMap.containsKey(lcaseKey) ?
   Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue;
   }
+
+  /**
+   * The option key for singular path.
+   */
+  public static final String PATH_KEY = "path";
+
+  /**
+   * The option key for multiple paths.
+   */
+  public static final String PATHS_KEY = "paths";
+
+  /**
+   * The option key for table name.

spark git commit: [SPARK-24002][SQL] Task not serializable caused by org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes

2018-04-17 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 310a8cd06 -> cce469435


[SPARK-24002][SQL] Task not serializable caused by 
org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes

## What changes were proposed in this pull request?
```
Py4JJavaError: An error occurred while calling o153.sql.
: org.apache.spark.SparkException: Job aborted.
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:223)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:189)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:3021)
at 
org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:89)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:127)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3020)
at org.apache.spark.sql.Dataset.(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:646)
at sun.reflect.GeneratedMethodAccessor153.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:293)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:226)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Exception thrown in Future.get:
at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:190)
at 
org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:267)
at 
org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doConsume(BroadcastNestedLoopJoinExec.scala:530)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
at 
org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:37)
at 
org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:69)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
at 
org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:144)
...
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
... 23 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.spark.SparkException: Task not serializable
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:206)
at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:179)
... 276 more
Caused by: org.apache.spark.SparkException: Task not serializable
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2380)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:417)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
at 
org.apache.spark.sql.execution.SparkPlan$$anonf