Repository: spark Updated Branches: refs/heads/branch-2.0 62e5158f1 -> d1b5df83d
[SPARK-15392][SQL] fix default value of size estimation of logical plan ## What changes were proposed in this pull request? We use autoBroadcastJoinThreshold + 1L as the default value of the size estimation, which is not good in 2.0, because we will calculate the size based on the size of the schema, so the estimation could be less than autoBroadcastJoinThreshold if you have a SELECT on top of a DataFrame created from an RDD. This PR changes the default value to Long.MaxValue. ## How was this patch tested? Added regression tests. Author: Davies Liu <dav...@databricks.com> Closes #13183 from davies/fix_default_size. (cherry picked from commit 5ccecc078aa757d3f1f6632aa6df5659490f602f) Signed-off-by: Andrew Or <and...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1b5df83 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1b5df83 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1b5df83 Branch: refs/heads/branch-2.0 Commit: d1b5df83d789510340e20a98bee8fd3e0e55b8f8 Parents: 62e5158 Author: Davies Liu <dav...@databricks.com> Authored: Thu May 19 12:12:42 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Thu May 19 12:12:51 2016 -0700 ---------------------------------------------------------------------- python/pyspark/sql/dataframe.py | 2 +- .../org/apache/spark/sql/internal/SQLConf.scala | 6 ++-- .../scala/org/apache/spark/sql/JoinSuite.scala | 2 +- .../org/apache/spark/sql/StatisticsSuite.scala | 34 ++++++++++++++++++++ 4 files changed, 39 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d1b5df83/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a68ef33..4fa799a 100644 --- a/python/pyspark/sql/dataframe.py +++ 
b/python/pyspark/sql/dataframe.py @@ -576,7 +576,7 @@ class DataFrame(object): >>> df_as2 = df.alias("df_as2") >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner') >>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect() - [Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)] + [Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', name=u'Alice', age=2)] """ assert isinstance(alias, basestring), "alias should be a string" return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx) http://git-wip-us.apache.org/repos/asf/spark/blob/d1b5df83/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 248c6e3..5d18689 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -120,8 +120,8 @@ object SQLConf { "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. 
" + "Note that currently statistics are only supported for Hive Metastore tables where the " + "command<code>ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan</code> has been run.") - .intConf - .createWithDefault(10 * 1024 * 1024) + .longConf + .createWithDefault(10L * 1024 * 1024) val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes") .internal() @@ -599,7 +599,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def subexpressionEliminationEnabled: Boolean = getConf(SUBEXPRESSION_ELIMINATION_ENABLED) - def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD) + def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD) def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN) http://git-wip-us.apache.org/repos/asf/spark/blob/d1b5df83/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index a6b83b3..a5d8cb1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -438,7 +438,7 @@ class JoinSuite extends QueryTest with SharedSQLContext { spark.cacheManager.clearCache() sql("CACHE TABLE testData") - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1000000000") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) { Seq( ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[BroadcastHashJoinExec]), http://git-wip-us.apache.org/repos/asf/spark/blob/d1b5df83/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala new file mode 100644 index 0000000..9523f6f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +class StatisticsSuite extends QueryTest with SharedSQLContext { + + test("SPARK-15392: DataFrame created from RDD should not be broadcasted") { + val rdd = sparkContext.range(1, 100).map(i => Row(i, i)) + val df = spark.createDataFrame(rdd, new StructType().add("a", LongType).add("b", LongType)) + assert(df.queryExecution.analyzed.statistics.sizeInBytes > + spark.wrapped.conf.autoBroadcastJoinThreshold) + assert(df.selectExpr("a").queryExecution.analyzed.statistics.sizeInBytes > + spark.wrapped.conf.autoBroadcastJoinThreshold) + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org