spark git commit: [SPARK-22290][CORE] Avoid creating Hive delegation tokens when not necessary.
Repository: spark
Updated Branches:
  refs/heads/master 6f1d0dea1 -> dc2714da5

[SPARK-22290][CORE] Avoid creating Hive delegation tokens when not necessary.

Hive delegation tokens are only needed when the Spark driver has no access to the kerberos TGT. That happens only in two situations:

- when using a proxy user
- when using cluster mode without a keytab

This change modifies the Hive provider so that it only generates delegation tokens in those situations, and tweaks the YARN AM so that it makes the proper user visible to the Hive code when running with keytabs, so that the TGT can be used instead of a delegation token.

The effect of this change is that it is now possible to initialize multiple, non-concurrent SparkContext instances in the same JVM. Before, the second invocation would fail to fetch a new Hive delegation token, which could then make the second (or third, or...) application fail once the token expired. With this change, the TGT will be used to authenticate to the HMS instead.

This change also avoids polluting the currently logged-in user's credentials when launching applications. The credentials are copied only when running applications as a proxy user. This makes it possible to implement SPARK-11035 later, where multiple threads might be launching applications, and each app should have its own set of credentials.

Tested by verifying HDFS and Hive access in the following scenarios:
- client and cluster mode
- client and cluster mode with proxy user
- client and cluster mode with principal / keytab
- long-running cluster app with principal / keytab
- pyspark app that creates (and stops) multiple SparkContext instances through its lifetime

Author: Marcelo Vanzin

Closes #19509 from vanzin/SPARK-22290.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc2714da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc2714da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc2714da

Branch: refs/heads/master
Commit: dc2714da50ecba1bf1fdf555a82a4314f763a76e
Parents: 6f1d0de
Author: Marcelo Vanzin
Authored: Thu Oct 19 14:56:48 2017 +0800
Committer: jerryshao
Committed: Thu Oct 19 14:56:48 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 17 +++--
 .../security/HBaseDelegationTokenProvider.scala |  4 +-
 .../security/HadoopDelegationTokenManager.scala |  2 +-
 .../HadoopDelegationTokenProvider.scala         |  2 +-
 .../HadoopFSDelegationTokenProvider.scala       |  4 +-
 .../security/HiveDelegationTokenProvider.scala  | 20 +-
 docs/running-on-yarn.md                         |  9 +++
 .../spark/deploy/yarn/ApplicationMaster.scala   | 69
 .../org/apache/spark/deploy/yarn/Client.scala   |  5 +-
 .../org/apache/spark/deploy/yarn/config.scala   |  4 ++
 .../spark/sql/hive/client/HiveClientImpl.scala  |  6 --
 11 files changed, 110 insertions(+), 32 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 53775db..1fa10ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -61,13 +61,17 @@ class SparkHadoopUtil extends Logging {
    * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
    */
   def runAsSparkUser(func: () => Unit) {
+    createSparkUser().doAs(new PrivilegedExceptionAction[Unit] {
+      def run: Unit = func()
+    })
+  }
+
+  def createSparkUser(): UserGroupInformation = {
     val user = Utils.getCurrentUserName()
-    logDebug("running as user: " + user)
+    logDebug("creating UGI for user: " + user)
     val ugi = UserGroupInformation.createRemoteUser(user)
     transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
-    ugi.doAs(new PrivilegedExceptionAction[Unit] {
-      def run: Unit = func()
-    })
+    ugi
   }

   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
@@ -417,6 +421,11 @@ class SparkHadoopUtil extends Logging {
     creds.readTokenStorageStream(new DataInputStream(tokensBuf))
     creds
   }
+
+  def isProxyUser(ugi: UserGroupInformation): Boolean = {
+    ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
+  }
+
 }

 object SparkHadoopUtil {

http://git-wip-us.apache.org/repos/asf/spark/blob/dc2714da/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spa
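For readers unfamiliar with the UGI pattern this commit splits apart, here is a minimal, self-contained sketch (not Spark code) of creating a fresh `UserGroupInformation`, copying credentials into it, running work under it via `doAs`, and applying the new proxy-user check. The object and method names are illustrative; only the Hadoop `UserGroupInformation` calls are the real API.

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object UgiSketch {
  // Mirrors the createSparkUser/transferCredentials split above: build a
  // fresh UGI for the current user and copy the existing credentials into it.
  def createUser(name: String): UserGroupInformation = {
    val ugi = UserGroupInformation.createRemoteUser(name)
    ugi.addCredentials(UserGroupInformation.getCurrentUser().getCredentials())
    ugi
  }

  // Run a block of code as the given user, as runAsSparkUser does.
  def runAs[T](ugi: UserGroupInformation)(body: => T): T =
    ugi.doAs(new PrivilegedExceptionAction[T] {
      override def run(): T = body
    })

  def main(args: Array[String]): Unit = {
    val ugi = createUser(UserGroupInformation.getCurrentUser().getShortUserName())
    // The new isProxyUser check: only proxy users (and keytab-less cluster
    // mode) need Hive delegation tokens, since they cannot use the TGT.
    val proxy =
      ugi.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY
    runAs(ugi) {
      println(s"running as ${UserGroupInformation.getCurrentUser()}, proxy user: $proxy")
    }
  }
}
```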
spark git commit: [SPARK-22300][BUILD] Update ORC to 1.4.1
Repository: spark
Updated Branches:
  refs/heads/master 52facb006 -> 6f1d0dea1

[SPARK-22300][BUILD] Update ORC to 1.4.1

## What changes were proposed in this pull request?

Apache ORC 1.4.1 was released yesterday.
- https://orc.apache.org/news/2017/10/16/ORC-1.4.1/

It contains several important fixes, such as ORC-233 (Allow `orc.include.columns` to be empty). This PR updates the Apache ORC dependency to the latest release, 1.4.1.

## How was this patch tested?

Pass the Jenkins.

Author: Dongjoon Hyun

Closes #19521 from dongjoon-hyun/SPARK-22300.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f1d0dea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f1d0dea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f1d0dea

Branch: refs/heads/master
Commit: 6f1d0dea1cdda558c998179789b386f6e52b9e36
Parents: 52facb0
Author: Dongjoon Hyun
Authored: Thu Oct 19 13:30:55 2017 +0800
Committer: Wenchen Fan
Committed: Thu Oct 19 13:30:55 2017 +0800

----------------------------------------------------------------------
 dev/deps/spark-deps-hadoop-2.6 | 6 +++---
 dev/deps/spark-deps-hadoop-2.7 | 6 +++---
 pom.xml                        | 6 +-
 3 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/6f1d0dea/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 76fcbd1..6e2fc63 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -2,7 +2,7 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-aircompressor-0.3.jar
+aircompressor-0.8.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
 antlr4-runtime-4.7.jar
@@ -149,8 +149,8 @@ netty-3.9.9.Final.jar
 netty-all-4.0.47.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
-orc-core-1.4.0-nohive.jar
-orc-mapreduce-1.4.0-nohive.jar
+orc-core-1.4.1-nohive.jar
+orc-mapreduce-1.4.1-nohive.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/6f1d0dea/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index cb20072..c2bbc25 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -2,7 +2,7 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-aircompressor-0.3.jar
+aircompressor-0.8.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
 antlr4-runtime-4.7.jar
@@ -150,8 +150,8 @@ netty-3.9.9.Final.jar
 netty-all-4.0.47.Final.jar
 objenesis-2.1.jar
 opencsv-2.3.jar
-orc-core-1.4.0-nohive.jar
-orc-mapreduce-1.4.0-nohive.jar
+orc-core-1.4.1-nohive.jar
+orc-mapreduce-1.4.1-nohive.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/6f1d0dea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9fac8b1..b9c9728 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,7 +128,7 @@
     1.2.1
     10.12.1.1
     1.8.2
-    1.4.0
+    1.4.1
     nohive
     1.6.0
     9.3.20.v20170531
@@ -1712,6 +1712,10 @@
         org.apache.hive
         hive-storage-api
       
+      
+        io.airlift
+        slice
+      
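For projects that consume ORC directly through sbt rather than Maven, the equivalent of this version bump would look roughly like the sketch below. This is an assumption based on the pom.xml diff above, not Spark's build (Spark manages these coordinates in pom.xml); the exclusions mirror the existing hive-storage-api one plus the io.airlift:slice exclusion this commit adds.

```scala
// build.sbt sketch: pin ORC 1.4.1 with the "nohive" classifier and mirror
// the pom.xml exclusions (hive-storage-api, and now io.airlift:slice too).
libraryDependencies ++= Seq(
  ("org.apache.orc" % "orc-core" % "1.4.1" classifier "nohive")
    .exclude("org.apache.hive", "hive-storage-api")
    .exclude("io.airlift", "slice"),
  ("org.apache.orc" % "orc-mapreduce" % "1.4.1" classifier "nohive")
    .exclude("org.apache.hive", "hive-storage-api")
    .exclude("io.airlift", "slice")
)
```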
spark git commit: [SPARK-21551][PYTHON] Increase timeout for PythonRDD.serveIterator
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 010b50cea -> f8c83fdc5

[SPARK-21551][PYTHON] Increase timeout for PythonRDD.serveIterator

Backport of https://github.com/apache/spark/pull/18752
(https://issues.apache.org/jira/browse/SPARK-21551)

(cherry picked from commit 9d3c6640f56e3e4fd195d3ad8cead09df67a72c7)

Author: peay

Closes #19512 from FRosner/branch-2.2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8c83fdc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8c83fdc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8c83fdc

Branch: refs/heads/branch-2.2
Commit: f8c83fdc52ba9120098e52a35085448150af6b50
Parents: 010b50c
Author: peay
Authored: Thu Oct 19 13:07:04 2017 +0900
Committer: hyukjinkwon
Committed: Thu Oct 19 13:07:04 2017 +0900

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 6 +++---
 python/pyspark/rdd.py                                          | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c83fdc/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b0dd2fc..807b51f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -683,7 +683,7 @@ private[spark] object PythonRDD extends Logging {
    * Create a socket server and a background thread to serve the data in `items`,
    *
    * The socket server can only accept one connection, or close if no connection
-   * in 3 seconds.
+   * in 15 seconds.
    *
    * Once a connection comes in, it tries to serialize all the data in `items`
    * and send them into this connection.
@@ -692,8 +692,8 @@ private[spark] object PythonRDD extends Logging {
    */
   def serveIterator[T](items: Iterator[T], threadName: String): Int = {
     val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
-    // Close the socket if no connection in 3 seconds
-    serverSocket.setSoTimeout(3000)
+    // Close the socket if no connection in 15 seconds
+    serverSocket.setSoTimeout(15000)

     new Thread(threadName) {
       setDaemon(true)

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c83fdc/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6014179..aca00bc 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -127,7 +127,7 @@ def _load_from_socket(port, serializer):
         af, socktype, proto, canonname, sa = res
         sock = socket.socket(af, socktype, proto)
         try:
-            sock.settimeout(3)
+            sock.settimeout(15)
             sock.connect(sa)
         except socket.error:
             sock.close()
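The serve-once-with-timeout pattern being tuned here is easy to reproduce outside Spark. A minimal sketch follows; the object name and the line-based wire format are illustrative, not Spark's actual serializer path, but the `ServerSocket` timeout behavior is the real `java.net` API.

```scala
import java.net.{InetAddress, ServerSocket}

object ServeOnceSketch {
  // Bind an ephemeral localhost port, accept exactly one connection, and
  // give up if no client connects within 15 seconds (this patch's new value).
  def serveOnce(items: Iterator[String], threadName: String): Int = {
    val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
    serverSocket.setSoTimeout(15000) // accept() throws SocketTimeoutException after 15 s
    new Thread(threadName) {
      setDaemon(true)
      override def run(): Unit = {
        val sock = serverSocket.accept()
        try {
          val out = sock.getOutputStream
          items.foreach(item => out.write((item + "\n").getBytes("UTF-8")))
          out.flush()
        } finally {
          sock.close()
          serverSocket.close()
        }
      }
    }.start()
    serverSocket.getLocalPort // the client connects here, as _load_from_socket does
  }
}
```

The Python side must use a matching connect timeout, which is why rdd.py's `sock.settimeout(3)` is bumped to 15 in the same commit.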
spark git commit: [SPARK-14371][MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver
Repository: spark
Updated Branches:
  refs/heads/master 1f25d8683 -> 52facb006

[SPARK-14371][MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver

## What changes were proposed in this pull request?

As proposed by jkbradley, `gammat` is no longer collected to the driver.

## How was this patch tested?

Existing test suite.

Author: Valeriy Avanesov

Closes #18924 from akopich/master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52facb00
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52facb00
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52facb00

Branch: refs/heads/master
Commit: 52facb0062a4253fa45ac0c633d0510a9b684a62
Parents: 1f25d86
Author: Valeriy Avanesov
Authored: Wed Oct 18 10:46:46 2017 -0700
Committer: Joseph K. Bradley
Committed: Wed Oct 18 10:46:46 2017 -0700

----------------------------------------------------------------------
 .../spark/mllib/clustering/LDAOptimizer.scala | 82 ++--
 1 file changed, 57 insertions(+), 25 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/52facb00/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index d633893..693a2a3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -26,6 +26,7 @@ import breeze.stats.distributions.{Gamma, RandBasis}
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.PeriodicGraphCheckpointer
+import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -259,7 +260,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
  */
 @Since("1.4.0")
 @DeveloperApi
-final class OnlineLDAOptimizer extends LDAOptimizer {
+final class OnlineLDAOptimizer extends LDAOptimizer with Logging {

   // LDA common parameters
   private var k: Int = 0
@@ -462,31 +463,61 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
-
-    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+    val optimizeDocConcentration = this.optimizeDocConcentration
+    // If and only if optimizeDocConcentration is set true,
+    // we calculate logphat in the same pass as other statistics.
+    // No calculation of logphat happens otherwise.
+    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+        Some(BDV.zeros[Double](k))
+      } else {
+        None
+      }
+
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
       val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

       val stat = BDM.zeros[Double](k, vocabSize)
-      var gammaPart = List[BDV[Double]]()
+      val logphatPartOption = logphatPartOptionBase()
+      var nonEmptyDocCount: Long = 0L
       nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+        nonEmptyDocCount += 1
         val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
           termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
-        gammaPart = gammad :: gammaPart
+        stat(::, ids) := stat(::, ids) + sstats
+        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
       }
-      Iterator((stat, gammaPart))
-    }.persist(StorageLevel.MEMORY_AND_DISK)
-    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
-      _ += _, _ += _)
-    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
-    stats.unpersist()
+      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    }
+
+    val elementWiseSum = (
+        u: (BDM[Double], Option[BDV[Double]], Long),
+        v: (BDM[Double], Option[BDV[Double]], Long)) => {
+      u._1 += v._1
+      u._2.foreach(_ += v._2.get)
+      (u._1, u._2, u._3 + v._3)
+    }
+
+    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+      .treeAggregate((BDM.zeros[Double](k,
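The heart of the change is the combine function fed to treeAggregate: each partition emits a single (sufficient statistics, optional logphat part, document count) triple, and these are summed element-wise on the executors instead of collecting per-document gamma rows to the driver. A standalone Breeze sketch of that combine follows; the `Acc` alias and object name are illustrative, not Spark code.

```scala
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

object CombineSketch {
  // (sufficient statistics, optional logphat partial sum, non-empty doc count)
  type Acc = (BDM[Double], Option[BDV[Double]], Long)

  // Element-wise, in-place combine, as in the patch's elementWiseSum.
  def combine(u: Acc, v: Acc): Acc = {
    u._1 += v._1                // sum the k x vocabSize statistics in place
    u._2.foreach(_ += v._2.get) // both sides are Some iff optimizeDocConcentration
    (u._1, u._2, u._3 + v._3)
  }

  def main(args: Array[String]): Unit = {
    val k = 2; val vocab = 3
    val a: Acc = (BDM.ones[Double](k, vocab), Some(BDV.zeros[Double](k)), 4L)
    val b: Acc = (BDM.ones[Double](k, vocab), Some(BDV.ones[Double](k)), 6L)
    val (stats, logphat, n) = combine(a, b)
    println(s"docs=$n\nstats=\n$stats\nlogphat=${logphat.get}")
  }
}
```

Because the function is associative and commutative over these triples, treeAggregate can apply it on executors in a tree pattern, so only one triple per aggregation level ever reaches the driver.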
spark git commit: [SPARK-22249][FOLLOWUP][SQL] Check if list of value for IN is empty in the optimizer
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c42309143 -> 010b50cea

[SPARK-22249][FOLLOWUP][SQL] Check if list of value for IN is empty in the optimizer

## What changes were proposed in this pull request?

This PR addresses the comments by gatorsmile on [the previous PR](https://github.com/apache/spark/pull/19494).

## How was this patch tested?

Previous UT and added UT.

Author: Marco Gaido

Closes #19522 from mgaido91/SPARK-22249_FOLLOWUP.

(cherry picked from commit 1f25d8683a84a479fd7fc77b5a1ea980289b681b)
Signed-off-by: gatorsmile

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/010b50ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/010b50ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/010b50ce

Branch: refs/heads/branch-2.2
Commit: 010b50cea6e1662ee79f9ac22ed75ec41ca0e483
Parents: c423091
Author: Marco Gaido
Authored: Wed Oct 18 09:14:46 2017 -0700
Committer: gatorsmile
Committed: Wed Oct 18 09:14:57 2017 -0700

----------------------------------------------------------------------
 .../sql/execution/columnar/InMemoryTableScanExec.scala   |  4 ++--
 .../execution/columnar/InMemoryColumnarQuerySuite.scala  | 12 +++-
 2 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/010b50ce/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index e792a45..7c2c13e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -102,8 +102,8 @@ case class InMemoryTableScanExec(
     case IsNull(a: Attribute) => statsFor(a).nullCount > 0
     case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0

-    case In(_: AttributeReference, list: Seq[Expression]) if list.isEmpty => Literal.FalseLiteral
-    case In(a: AttributeReference, list: Seq[Expression]) if list.forall(_.isInstanceOf[Literal]) =>
+    case In(a: AttributeReference, list: Seq[Expression])
+      if list.forall(_.isInstanceOf[Literal]) && list.nonEmpty =>
       list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
         l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/010b50ce/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 67cff51..b049b60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -21,8 +21,9 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}

 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
-import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -444,4 +445,13 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     assert(dfNulls.filter($"id".isin(2, 3)).count() == 0)
     dfNulls.unpersist()
   }
+
+  test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") {
+    val attribute = AttributeReference("a", IntegerType)()
+    val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY,
+      LocalTableScanExec(Seq(attribute), Nil), None)
+    val tableScanExec = InMemoryTableScanExec(Seq(attribute),
+      Seq(In(attribute, Nil)), testRelation)
+    assert(tableScanExec.partitionFilters.isEmpty)
+  }
 }
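The crux of the new guard is that Scala's `reduce` throws on an empty sequence, so `In(attr, Nil)` must never reach the `reduce(_ || _)` over the per-literal bound checks; with the guard it simply falls through the partial function and no filter is built. A toy model of that behavior (the `Expr`/`Lit`/`In` case classes here are stand-ins, not Catalyst's):

```scala
object EmptyInSketch {
  sealed trait Expr
  case class Lit(v: Int) extends Expr
  case class In(attr: String, list: Seq[Expr]) extends Expr

  // Mimics buildFilter: only handle In when every element is a literal AND
  // the list is non-empty, so reduce(_ || _) never sees an empty Seq.
  def buildFilter(e: Expr): Option[Boolean] = e match {
    case In(_, list) if list.forall(_.isInstanceOf[Lit]) && list.nonEmpty =>
      Some(list.map { case Lit(v) => v >= 0 }.reduce(_ || _))
    case _ => None // In with an empty list falls through instead of throwing
  }

  def main(args: Array[String]): Unit = {
    println(buildFilter(In("a", Nil)))         // None: no partition filter built
    println(buildFilter(In("a", Seq(Lit(3))))) // Some(true)
  }
}
```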
spark git commit: [SPARK-22249][FOLLOWUP][SQL] Check if list of value for IN is empty in the optimizer
Repository: spark
Updated Branches:
  refs/heads/master 72561ecf4 -> 1f25d8683

[SPARK-22249][FOLLOWUP][SQL] Check if list of value for IN is empty in the optimizer

## What changes were proposed in this pull request?

This PR addresses the comments by gatorsmile on [the previous PR](https://github.com/apache/spark/pull/19494).

## How was this patch tested?

Previous UT and added UT.

Author: Marco Gaido

Closes #19522 from mgaido91/SPARK-22249_FOLLOWUP.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f25d868
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f25d868
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f25d868

Branch: refs/heads/master
Commit: 1f25d8683a84a479fd7fc77b5a1ea980289b681b
Parents: 72561ec
Author: Marco Gaido
Authored: Wed Oct 18 09:14:46 2017 -0700
Committer: gatorsmile
Committed: Wed Oct 18 09:14:46 2017 -0700

----------------------------------------------------------------------
 .../sql/execution/columnar/InMemoryTableScanExec.scala   |  4 ++--
 .../execution/columnar/InMemoryColumnarQuerySuite.scala  | 12 +++-
 2 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/1f25d868/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 846ec03..139da1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -102,8 +102,8 @@ case class InMemoryTableScanExec(
     case IsNull(a: Attribute) => statsFor(a).nullCount > 0
     case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0

-    case In(_: AttributeReference, list: Seq[Expression]) if list.isEmpty => Literal.FalseLiteral
-    case In(a: AttributeReference, list: Seq[Expression]) if list.forall(_.isInstanceOf[Literal]) =>
+    case In(a: AttributeReference, list: Seq[Expression])
+      if list.forall(_.isInstanceOf[Literal]) && list.nonEmpty =>
       list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] &&
         l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1f25d868/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 75d17bc..2f249c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -21,8 +21,9 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}

 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
-import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -444,4 +445,13 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     assert(dfNulls.filter($"id".isin(2, 3)).count() == 0)
     dfNulls.unpersist()
   }
+
+  test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") {
+    val attribute = AttributeReference("a", IntegerType)()
+    val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY,
+      LocalTableScanExec(Seq(attribute), Nil), None)
+    val tableScanExec = InMemoryTableScanExec(Seq(attribute),
+      Seq(In(attribute, Nil)), testRelation)
+    assert(tableScanExec.partitionFilters.isEmpty)
+  }
 }
spark git commit: [SPARK-22266][SQL] The same aggregate function was evaluated multiple times
Repository: spark
Updated Branches:
  refs/heads/master f3137feec -> 72561ecf4

[SPARK-22266][SQL] The same aggregate function was evaluated multiple times

## What changes were proposed in this pull request?

To let an aggregate function that appears multiple times in an Aggregate be evaluated only once, we need to deduplicate the aggregate expressions. The original code tried to use a `distinct` call to get a set of aggregate expressions, but that did not work, since `distinct` does not compare semantic equality. And even if it did, further work would still be needed when rewriting the result expressions. In this PR, I changed the "set" to a map, mapping the semantic identity of an aggregate expression to itself. Thus, later on, when rewriting the result expressions (i.e., the output expressions), each aggregate expression reference can be fixed up.

## How was this patch tested?

Added a new test in SQLQuerySuite.

Author: maryannxue

Closes #19488 from maryannxue/spark-22266.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72561ecf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72561ecf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72561ecf

Branch: refs/heads/master
Commit: 72561ecf4b611d68f8bf695ddd0c4c2cce3a29d9
Parents: f3137fe
Author: maryannxue
Authored: Wed Oct 18 20:59:40 2017 +0800
Committer: Wenchen Fan
Committed: Wed Oct 18 20:59:40 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/planning/patterns.scala | 16 +++-
 .../org/apache/spark/sql/SQLQuerySuite.scala   | 26
 2 files changed, 36 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/72561ecf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 8d034c2..cc391aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -205,14 +205,17 @@ object PhysicalAggregation {
     case logical.Aggregate(groupingExpressions, resultExpressions, child) =>
       // A single aggregate expression might appear multiple times in resultExpressions.
       // In order to avoid evaluating an individual aggregate function multiple times, we'll
-      // build a set of the distinct aggregate expressions and build a function which can
-      // be used to re-write expressions so that they reference the single copy of the
-      // aggregate function which actually gets computed.
+      // build a set of semantically distinct aggregate expressions and re-write expressions so
+      // that they reference the single copy of the aggregate function which actually gets computed.
+      // Non-deterministic aggregate expressions are not deduplicated.
+      val equivalentAggregateExpressions = new EquivalentExpressions
       val aggregateExpressions = resultExpressions.flatMap { expr =>
         expr.collect {
-          case agg: AggregateExpression => agg
+          // addExpr() always returns false for non-deterministic expressions and do not add them.
+          case agg: AggregateExpression
+            if (!equivalentAggregateExpressions.addExpr(agg)) => agg
         }
-      }.distinct
+      }

       val namedGroupingExpressions = groupingExpressions.map {
         case ne: NamedExpression => ne -> ne
@@ -236,7 +239,8 @@ object PhysicalAggregation {
         case ae: AggregateExpression =>
           // The final aggregation buffer's attributes will be `finalAggregationAttributes`,
           // so replace each aggregate expression by its corresponding attribute in the set:
-          ae.resultAttribute
+          equivalentAggregateExpressions.getEquivalentExprs(ae).headOption
+            .getOrElse(ae).asInstanceOf[AggregateExpression].resultAttribute
         case expression =>
           // Since we're using `namedGroupingAttributes` to extract the grouping key
           // columns, we need to replace grouping key expressions with their corresponding

http://git-wip-us.apache.org/repos/asf/spark/blob/72561ecf/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryS
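To see why a reference-based `distinct` fails here while a semantic-identity map works, consider a toy model of the `EquivalentExpressions` pattern. The `Agg` case class and its `semanticKey` are illustrative stand-ins; Catalyst's real implementation uses `semanticEquals` over canonicalized expression trees.

```scala
import scala.collection.mutable

object SemanticDedupSketch {
  // A stand-in for an aggregate expression whose "semantic" identity ignores
  // cosmetic differences (here: name casing; Catalyst canonicalizes trees).
  case class Agg(fn: String, child: String) {
    def semanticKey: (String, String) = (fn.toLowerCase, child.toLowerCase)
  }

  final class Equivalents {
    private val seen = mutable.LinkedHashMap.empty[(String, String), Agg]
    // Returns true if a semantically equal expression was already recorded.
    def addExpr(a: Agg): Boolean = {
      val existed = seen.contains(a.semanticKey)
      if (!existed) seen(a.semanticKey) = a
      existed
    }
    // Maps any expression back to the single copy that actually gets computed.
    def canonical(a: Agg): Agg = seen.getOrElse(a.semanticKey, a)
  }

  def main(args: Array[String]): Unit = {
    val eq = new Equivalents
    val resultExprs = Seq(Agg("max", "b"), Agg("MAX", "b"), Agg("sum", "b"))
    // Keep only the first occurrence of each semantic class, as the patch does.
    val toCompute = resultExprs.filterNot(eq.addExpr)
    println(toCompute)                     // List(Agg(max,b), Agg(sum,b))
    println(eq.canonical(Agg("MAX", "b"))) // Agg(max,b): the rewrite target
  }
}
```

The second step mirrors the result-expression rewrite in the diff: every occurrence of an aggregate in the output expressions is redirected to the `resultAttribute` of the one equivalent copy that was kept for computation.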