spark git commit: [SPARK-22290][CORE] Avoid creating Hive delegation tokens when not necessary.

2017-10-18 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master 6f1d0dea1 -> dc2714da5


[SPARK-22290][CORE] Avoid creating Hive delegation tokens when not necessary.

Hive delegation tokens are only needed when the Spark driver has no access
to the Kerberos TGT. That happens only in two situations:

- when using a proxy user
- when using cluster mode without a keytab

This change modifies the Hive provider so that it only generates delegation
tokens in those situations, and tweaks the YARN AM so that it makes the proper
user visible to the Hive code when running with keytabs, so that the TGT
can be used instead of a delegation token.
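
A minimal sketch of that rule, with hypothetical flag names standing in for the
actual provider logic (illustrative only, not the Spark code):

```scala
object HiveTokenRuleSketch {
  // Hypothetical flags describing the submission state discussed above.
  def delegationTokenRequired(isProxyUser: Boolean, isClusterMode: Boolean, hasKeytab: Boolean): Boolean =
    // A Hive delegation token is needed only when the driver cannot use the Kerberos TGT directly.
    isProxyUser || (isClusterMode && !hasKeytab)
}
```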

The effect of this change is that now it's possible to initialize multiple,
non-concurrent SparkContext instances in the same JVM. Before, the second
invocation would fail to fetch a new Hive delegation token, which then could
make the second (or third or...) application fail once the token expired.
With this change, the TGT will be used to authenticate to the HMS instead.

This change also avoids polluting the current logged in user's credentials
when launching applications. The credentials are copied only when running
applications as a proxy user. This makes it possible to implement SPARK-11035
later, where multiple threads might be launching applications, and each app
should have its own set of credentials.
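
A minimal sketch of that idea, assuming the standard Hadoop UserGroupInformation
API (this is not the exact Spark code):

```scala
import org.apache.hadoop.security.UserGroupInformation

object SubmissionUgiSketch {
  // Copy the launcher's Hadoop credentials only when submitting as a proxy user;
  // otherwise the application authenticates with the TGT or keytab and the
  // currently logged-in user's credentials are left untouched.
  def submissionUgi(proxyUser: Option[String]): UserGroupInformation = {
    val realUser = UserGroupInformation.getCurrentUser
    proxyUser match {
      case Some(name) =>
        val proxy = UserGroupInformation.createProxyUser(name, realUser)
        proxy.addCredentials(realUser.getCredentials)
        proxy
      case None =>
        realUser
    }
  }
}
```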

Tested by verifying HDFS and Hive access in following scenarios:
- client and cluster mode
- client and cluster mode with proxy user
- client and cluster mode with principal / keytab
- long-running cluster app with principal / keytab
- pyspark app that creates (and stops) multiple SparkContext instances
  through its lifetime

Author: Marcelo Vanzin 

Closes #19509 from vanzin/SPARK-22290.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc2714da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc2714da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc2714da

Branch: refs/heads/master
Commit: dc2714da50ecba1bf1fdf555a82a4314f763a76e
Parents: 6f1d0de
Author: Marcelo Vanzin 
Authored: Thu Oct 19 14:56:48 2017 +0800
Committer: jerryshao 
Committed: Thu Oct 19 14:56:48 2017 +0800

--
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 17 +++--
 .../security/HBaseDelegationTokenProvider.scala |  4 +-
 .../security/HadoopDelegationTokenManager.scala |  2 +-
 .../HadoopDelegationTokenProvider.scala |  2 +-
 .../HadoopFSDelegationTokenProvider.scala   |  4 +-
 .../security/HiveDelegationTokenProvider.scala  | 20 +-
 docs/running-on-yarn.md |  9 +++
 .../spark/deploy/yarn/ApplicationMaster.scala   | 69 
 .../org/apache/spark/deploy/yarn/Client.scala   |  5 +-
 .../org/apache/spark/deploy/yarn/config.scala   |  4 ++
 .../spark/sql/hive/client/HiveClientImpl.scala  |  6 --
 11 files changed, 110 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 53775db..1fa10ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -61,13 +61,17 @@ class SparkHadoopUtil extends Logging {
    * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
    */
   def runAsSparkUser(func: () => Unit) {
+    createSparkUser().doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = func()
+    })
+  }
+
+  def createSparkUser(): UserGroupInformation = {
     val user = Utils.getCurrentUserName()
-    logDebug("running as user: " + user)
+    logDebug("creating UGI for user: " + user)
     val ugi = UserGroupInformation.createRemoteUser(user)
     transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    ugi
   }

   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
@@ -417,6 +421,11 @@ class SparkHadoopUtil extends Logging {
     creds.readTokenStorageStream(new DataInputStream(tokensBuf))
     creds
   }
+
+  def isProxyUser(ugi: UserGroupInformation): Boolean = {
+    ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
+  }
+
 }
 
 object SparkHadoopUtil {

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
--
diff --git 
a/core/src/main/scala/org/apache/spa

spark git commit: [SPARK-22300][BUILD] Update ORC to 1.4.1

2017-10-18 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 52facb006 -> 6f1d0dea1


[SPARK-22300][BUILD] Update ORC to 1.4.1

## What changes were proposed in this pull request?

Apache ORC 1.4.1 was released yesterday.
- https://orc.apache.org/news/2017/10/16/ORC-1.4.1/

It contains several important fixes, such as ORC-233 (allow `orc.include.columns`
to be empty).
This PR updates the Apache ORC dependency to the latest release, 1.4.1.

## How was this patch tested?

Passes the Jenkins tests.

Author: Dongjoon Hyun 

Closes #19521 from dongjoon-hyun/SPARK-22300.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f1d0dea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f1d0dea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f1d0dea

Branch: refs/heads/master
Commit: 6f1d0dea1cdda558c998179789b386f6e52b9e36
Parents: 52facb0
Author: Dongjoon Hyun 
Authored: Thu Oct 19 13:30:55 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Oct 19 13:30:55 2017 +0800

--
 dev/deps/spark-deps-hadoop-2.6 | 6 +++---
 dev/deps/spark-deps-hadoop-2.7 | 6 +++---
 pom.xml| 6 +-
 3 files changed, 11 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6f1d0dea/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 76fcbd1..6e2fc63 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -2,7 +2,7 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-aircompressor-0.3.jar
+aircompressor-0.8.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
 antlr4-runtime-4.7.jar
@@ -149,8 +149,8 @@ netty-3.9.9.Final.jar
 netty-all-4.0.47.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
-orc-core-1.4.0-nohive.jar
-orc-mapreduce-1.4.0-nohive.jar
+orc-core-1.4.1-nohive.jar
+orc-mapreduce-1.4.1-nohive.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/6f1d0dea/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index cb20072..c2bbc25 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -2,7 +2,7 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-aircompressor-0.3.jar
+aircompressor-0.8.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
 antlr4-runtime-4.7.jar
@@ -150,8 +150,8 @@ netty-3.9.9.Final.jar
 netty-all-4.0.47.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
-orc-core-1.4.0-nohive.jar
-orc-mapreduce-1.4.0-nohive.jar
+orc-core-1.4.1-nohive.jar
+orc-mapreduce-1.4.1-nohive.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/6f1d0dea/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 9fac8b1..b9c9728 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,7 +128,7 @@
 1.2.1
 10.12.1.1
 1.8.2
-1.4.0
+1.4.1
 nohive
 1.6.0
 9.3.20.v20170531
@@ -1712,6 +1712,10 @@
 org.apache.hive
 hive-storage-api
   
+  
+io.airlift
+slice
+  
 
   
   





spark git commit: [SPARK-21551][PYTHON] Increase timeout for PythonRDD.serveIterator

2017-10-18 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 010b50cea -> f8c83fdc5


[SPARK-21551][PYTHON] Increase timeout for PythonRDD.serveIterator

Backport of https://github.com/apache/spark/pull/18752 
(https://issues.apache.org/jira/browse/SPARK-21551)

(cherry picked from commit 9d3c6640f56e3e4fd195d3ad8cead09df67a72c7)

Author: peay 

Closes #19512 from FRosner/branch-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8c83fdc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8c83fdc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8c83fdc

Branch: refs/heads/branch-2.2
Commit: f8c83fdc52ba9120098e52a35085448150af6b50
Parents: 010b50c
Author: peay 
Authored: Thu Oct 19 13:07:04 2017 +0900
Committer: hyukjinkwon 
Committed: Thu Oct 19 13:07:04 2017 +0900

--
 .../src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 6 +++---
 python/pyspark/rdd.py  | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f8c83fdc/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b0dd2fc..807b51f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -683,7 +683,7 @@ private[spark] object PythonRDD extends Logging {
    * Create a socket server and a background thread to serve the data in `items`,
    *
    * The socket server can only accept one connection, or close if no connection
-   * in 3 seconds.
+   * in 15 seconds.
    *
    * Once a connection comes in, it tries to serialize all the data in `items`
    * and send them into this connection.
@@ -692,8 +692,8 @@ private[spark] object PythonRDD extends Logging {
    */
   def serveIterator[T](items: Iterator[T], threadName: String): Int = {
     val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
-    // Close the socket if no connection in 3 seconds
-    serverSocket.setSoTimeout(3000)
+    // Close the socket if no connection in 15 seconds
+    serverSocket.setSoTimeout(15000)

     new Thread(threadName) {
       setDaemon(true)

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c83fdc/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6014179..aca00bc 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -127,7 +127,7 @@ def _load_from_socket(port, serializer):
         af, socktype, proto, canonname, sa = res
         sock = socket.socket(af, socktype, proto)
         try:
-            sock.settimeout(3)
+            sock.settimeout(15)
             sock.connect(sa)
         except socket.error:
             sock.close()
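
For context, the pattern being tuned here is a loopback ServerSocket that accepts
a single connection and gives up if no client connects within the timeout. A
standalone sketch (not the Spark code):

```scala
import java.net.{InetAddress, ServerSocket, SocketTimeoutException}

object ServeOnceSketch {
  def main(args: Array[String]): Unit = {
    // Backlog of 1 on the loopback address: at most one client is served.
    val server = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
    server.setSoTimeout(15000) // give the client 15 seconds to connect
    println(s"listening on port ${server.getLocalPort}")
    try {
      val sock = server.accept() // blocks until a client connects or the timeout fires
      sock.getOutputStream.write("hello".getBytes("UTF-8"))
      sock.close()
    } catch {
      case _: SocketTimeoutException => println("no connection within 15 seconds, giving up")
    } finally {
      server.close()
    }
  }
}
```

With this change a slow client has 15 seconds instead of 3 to connect before the
server side gives up.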





spark git commit: [SPARK-14371][MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver

2017-10-18 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 1f25d8683 -> 52facb006


[SPARK-14371][MLLIB] OnlineLDAOptimizer should not collect stats for each doc 
in mini-batch to driver

Hi,

# What changes were proposed in this pull request?

As proposed by jkbradley, ```gammat``` is no longer collected to the driver.
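
A simplified sketch of the new aggregation pattern, with assumed names and
simplified types (the real change aggregates Breeze matrices and an optional
logphat vector):

```scala
import org.apache.spark.rdd.RDD

object TreeAggregateSketch {
  // Each partition emits (sum of per-document statistics, count of non-empty docs);
  // treeAggregate merges these partial sums on the executors, so nothing
  // per-document is ever collected to the driver.
  def aggregate(stats: RDD[(Array[Double], Long)], dim: Int): (Array[Double], Long) =
    stats.treeAggregate((new Array[Double](dim), 0L))(
      seqOp = { case ((acc, n), (part, m)) =>
        var i = 0
        while (i < dim) { acc(i) += part(i); i += 1 }
        (acc, n + m)
      },
      combOp = { case ((acc, n), (other, m)) =>
        var i = 0
        while (i < dim) { acc(i) += other(i); i += 1 }
        (acc, n + m)
      })
}
```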

# How was this patch tested?
Existing test suite.

Author: Valeriy Avanesov 
Author: Valeriy Avanesov 

Closes #18924 from akopich/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52facb00
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52facb00
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52facb00

Branch: refs/heads/master
Commit: 52facb0062a4253fa45ac0c633d0510a9b684a62
Parents: 1f25d86
Author: Valeriy Avanesov 
Authored: Wed Oct 18 10:46:46 2017 -0700
Committer: Joseph K. Bradley 
Committed: Wed Oct 18 10:46:46 2017 -0700

--
 .../spark/mllib/clustering/LDAOptimizer.scala   | 82 ++--
 1 file changed, 57 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/52facb00/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index d633893..693a2a3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -26,6 +26,7 @@ import breeze.stats.distributions.{Gamma, RandBasis}
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.PeriodicGraphCheckpointer
+import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -259,7 +260,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
  */
 @Since("1.4.0")
 @DeveloperApi
-final class OnlineLDAOptimizer extends LDAOptimizer {
+final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
 
   // LDA common parameters
   private var k: Int = 0
@@ -462,31 +463,61 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
-
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // If and only if optimizeDocConcentration is set true,
+    // we calculate logphat in the same pass as other statistics.
+    // No calculation of loghat happens otherwise.
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+        Some(BDV.zeros[Double](k))
+      } else {
+        None
+      }
+
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount: Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
         val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (
+        u: (BDM[Double], Option[BDV[Double]], Long),
+        v: (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+      .treeAggregate((BDM.zeros[Double](k,

spark git commit: [SPARK-22249][FOLLOWUP][SQL] Check if list of value for IN is empty in the optimizer

2017-10-18 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c42309143 -> 010b50cea


[SPARK-22249][FOLLOWUP][SQL] Check if list of value for IN is empty in the 
optimizer

## What changes were proposed in this pull request?

This PR addresses the comments by gatorsmile on [the previous 
PR](https://github.com/apache/spark/pull/19494).
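
For readers skimming the diff, the underlying reason an empty IN list needs a
guard is that folding zero per-value predicates with `reduce` throws. A tiny
illustration, not Spark code:

```scala
object EmptyReduceSketch {
  def main(args: Array[String]): Unit = {
    val perValueChecks: Seq[Boolean] = Nil
    // perValueChecks.reduce(_ || _)  // would throw UnsupportedOperationException on an empty list
    println(perValueChecks.reduceOption(_ || _)) // None: nothing to fold, so the filter is skipped
  }
}
```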

## How was this patch tested?

Covered by the existing unit tests plus a newly added unit test.

Author: Marco Gaido 

Closes #19522 from mgaido91/SPARK-22249_FOLLOWUP.

(cherry picked from commit 1f25d8683a84a479fd7fc77b5a1ea980289b681b)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/010b50ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/010b50ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/010b50ce

Branch: refs/heads/branch-2.2
Commit: 010b50cea6e1662ee79f9ac22ed75ec41ca0e483
Parents: c423091
Author: Marco Gaido 
Authored: Wed Oct 18 09:14:46 2017 -0700
Committer: gatorsmile 
Committed: Wed Oct 18 09:14:57 2017 -0700

--
 .../sql/execution/columnar/InMemoryTableScanExec.scala  |  4 ++--
 .../execution/columnar/InMemoryColumnarQuerySuite.scala | 12 +++-
 2 files changed, 13 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/010b50ce/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index e792a45..7c2c13e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -102,8 +102,8 @@ case class InMemoryTableScanExec(
     case IsNull(a: Attribute) => statsFor(a).nullCount > 0
     case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0

-    case In(_: AttributeReference, list: Seq[Expression]) if list.isEmpty => Literal.FalseLiteral
-    case In(a: AttributeReference, list: Seq[Expression]) if list.forall(_.isInstanceOf[Literal]) =>
+    case In(a: AttributeReference, list: Seq[Expression])
+      if list.forall(_.isInstanceOf[Literal]) && list.nonEmpty =>
       list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
         l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/010b50ce/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 67cff51..b049b60 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -21,8 +21,9 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}

 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
-import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -444,4 +445,13 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     assert(dfNulls.filter($"id".isin(2, 3)).count() == 0)
     dfNulls.unpersist()
   }
+
+  test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") {
+    val attribute = AttributeReference("a", IntegerType)()
+    val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY,
+      LocalTableScanExec(Seq(attribute), Nil), None)
+    val tableScanExec = InMemoryTableScanExec(Seq(attribute),
+      Seq(In(attribute, Nil)), testRelation)
+    assert(tableScanExec.partitionFilters.isEmpty)
+  }
 }





spark git commit: [SPARK-22249][FOLLOWUP][SQL] Check if list of value for IN is empty in the optimizer

2017-10-18 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 72561ecf4 -> 1f25d8683


[SPARK-22249][FOLLOWUP][SQL] Check if list of value for IN is empty in the 
optimizer

## What changes were proposed in this pull request?

This PR addresses the comments by gatorsmile on [the previous 
PR](https://github.com/apache/spark/pull/19494).

## How was this patch tested?

Covered by the existing unit tests plus a newly added unit test.

Author: Marco Gaido 

Closes #19522 from mgaido91/SPARK-22249_FOLLOWUP.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f25d868
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f25d868
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f25d868

Branch: refs/heads/master
Commit: 1f25d8683a84a479fd7fc77b5a1ea980289b681b
Parents: 72561ec
Author: Marco Gaido 
Authored: Wed Oct 18 09:14:46 2017 -0700
Committer: gatorsmile 
Committed: Wed Oct 18 09:14:46 2017 -0700

--
 .../sql/execution/columnar/InMemoryTableScanExec.scala  |  4 ++--
 .../execution/columnar/InMemoryColumnarQuerySuite.scala | 12 +++-
 2 files changed, 13 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1f25d868/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 846ec03..139da1c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -102,8 +102,8 @@ case class InMemoryTableScanExec(
     case IsNull(a: Attribute) => statsFor(a).nullCount > 0
     case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0

-    case In(_: AttributeReference, list: Seq[Expression]) if list.isEmpty => Literal.FalseLiteral
-    case In(a: AttributeReference, list: Seq[Expression]) if list.forall(_.isInstanceOf[Literal]) =>
+    case In(a: AttributeReference, list: Seq[Expression])
+      if list.forall(_.isInstanceOf[Literal]) && list.nonEmpty =>
       list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
         l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1f25d868/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 75d17bc..2f249c8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -21,8 +21,9 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}

 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
-import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -444,4 +445,13 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     assert(dfNulls.filter($"id".isin(2, 3)).count() == 0)
     dfNulls.unpersist()
   }
+
+  test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") {
+    val attribute = AttributeReference("a", IntegerType)()
+    val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY,
+      LocalTableScanExec(Seq(attribute), Nil), None)
+    val tableScanExec = InMemoryTableScanExec(Seq(attribute),
+      Seq(In(attribute, Nil)), testRelation)
+    assert(tableScanExec.partitionFilters.isEmpty)
+  }
 }





spark git commit: [SPARK-22266][SQL] The same aggregate function was evaluated multiple times

2017-10-18 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master f3137feec -> 72561ecf4


[SPARK-22266][SQL] The same aggregate function was evaluated multiple times

## What changes were proposed in this pull request?

To let an aggregate function that appears multiple times in an Aggregate be
evaluated only once, we need to deduplicate the aggregate expressions. The
original code tried to use a "distinct" call to get a set of aggregate
expressions, but this did not work, since "distinct" does not compare semantic
equality. And even if it did, further work would still be needed when rewriting
the result expressions.
In this PR, I changed the "set" to a map from the semantic identity of an
aggregate expression to the expression itself. Thus, later on, when rewriting the
result expressions (i.e., output expressions), each aggregate expression
reference can be fixed to point at the single copy that actually gets computed.
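
A minimal, hypothetical sketch of deduplicating by semantic identity rather than
object identity (illustrative stand-ins, not the Catalyst classes):

```scala
object SemanticDedupSketch {
  // Stand-in for an aggregate expression: `canonical` captures its semantic
  // identity, while `exprId` differs for every textual occurrence.
  case class Agg(canonical: String, exprId: Long)

  // Keep the first occurrence of each semantically equal expression and return a
  // lookup used later to rewrite references to the surviving copy.
  def dedup(aggs: Seq[Agg]): (Seq[Agg], Agg => Agg) = {
    val seen = scala.collection.mutable.LinkedHashMap.empty[String, Agg]
    aggs.foreach(a => seen.getOrElseUpdate(a.canonical, a))
    (seen.values.toSeq, a => seen.getOrElse(a.canonical, a))
  }

  def main(args: Array[String]): Unit = {
    // sum(x) appears twice with different ids; only one copy should be computed.
    val (distinct, rewrite) = dedup(Seq(Agg("sum(x)", 1), Agg("sum(x)", 2)))
    println(distinct)                  // one element: Agg(sum(x),1)
    println(rewrite(Agg("sum(x)", 2))) // Agg(sum(x),1)
  }
}
```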

## How was this patch tested?

Added a new test in SQLQuerySuite

Author: maryannxue 

Closes #19488 from maryannxue/spark-22266.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72561ecf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72561ecf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72561ecf

Branch: refs/heads/master
Commit: 72561ecf4b611d68f8bf695ddd0c4c2cce3a29d9
Parents: f3137fe
Author: maryannxue 
Authored: Wed Oct 18 20:59:40 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Oct 18 20:59:40 2017 +0800

--
 .../spark/sql/catalyst/planning/patterns.scala  | 16 +++-
 .../org/apache/spark/sql/SQLQuerySuite.scala| 26 
 2 files changed, 36 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/72561ecf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 8d034c2..cc391aa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -205,14 +205,17 @@ object PhysicalAggregation {
     case logical.Aggregate(groupingExpressions, resultExpressions, child) =>
       // A single aggregate expression might appear multiple times in resultExpressions.
       // In order to avoid evaluating an individual aggregate function multiple times, we'll
-      // build a set of the distinct aggregate expressions and build a function which can
-      // be used to re-write expressions so that they reference the single copy of the
-      // aggregate function which actually gets computed.
+      // build a set of semantically distinct aggregate expressions and re-write expressions so
+      // that they reference the single copy of the aggregate function which actually gets computed.
+      // Non-deterministic aggregate expressions are not deduplicated.
+      val equivalentAggregateExpressions = new EquivalentExpressions
       val aggregateExpressions = resultExpressions.flatMap { expr =>
         expr.collect {
-          case agg: AggregateExpression => agg
+          // addExpr() always returns false for non-deterministic expressions and do not add them.
+          case agg: AggregateExpression
+            if (!equivalentAggregateExpressions.addExpr(agg)) => agg
         }
-      }.distinct
+      }

       val namedGroupingExpressions = groupingExpressions.map {
         case ne: NamedExpression => ne -> ne
@@ -236,7 +239,8 @@ object PhysicalAggregation {
           case ae: AggregateExpression =>
             // The final aggregation buffer's attributes will be `finalAggregationAttributes`,
             // so replace each aggregate expression by its corresponding attribute in the set:
-            ae.resultAttribute
+            equivalentAggregateExpressions.getEquivalentExprs(ae).headOption
+              .getOrElse(ae).asInstanceOf[AggregateExpression].resultAttribute
           case expression =>
             // Since we're using `namedGroupingAttributes` to extract the grouping key
             // columns, we need to replace grouping key expressions with their corresponding

http://git-wip-us.apache.org/repos/asf/spark/blob/72561ecf/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f0c58e2..caf332d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryS