[spark] branch branch-3.1 updated: [SPARK-34275][CORE][SQL][MLLIB] Replaces filter and size with count
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new a9048fd [SPARK-34275][CORE][SQL][MLLIB] Replaces filter and size with count a9048fd is described below commit a9048fdeb4d266fe3f1c14a33d8bedba4b88e6d2 Author: yangjie01 AuthorDate: Thu Jan 28 15:27:07 2021 +0900 [SPARK-34275][CORE][SQL][MLLIB] Replaces filter and size with count ### What changes were proposed in this pull request? Use `count` to simplify `find + size(or length)` operation, it's semantically consistent, but looks simpler. **Before** ``` seq.filter(p).size ``` **After** ``` seq.count(p) ``` ### Why are the changes needed? Code Simpilefications. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass the Jenkins or GitHub Action Closes #31374 from LuciferYang/SPARK-34275. Authored-by: yangjie01 Signed-off-by: HyukjinKwon (cherry picked from commit 15445a8d9e8dd8660aa668a5b82ba2cbc6a5a233) Signed-off-by: HyukjinKwon --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +- .../org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala | 6 +++--- core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 4 ++-- .../spark/storage/BlockManagerDecommissionIntegrationSuite.scala | 8 .../org/apache/spark/ml/classification/NaiveBayesSuite.scala | 4 ++-- .../org/apache/spark/sql/TypedImperativeAggregateSuite.scala | 4 ++-- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index a83762f..bdb768e 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -908,7 +908,7 @@ private[spark] class ExecutorAllocationManager( */ def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = { val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq - attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size + attempts.count(attempt => unschedulableTaskSets.contains(attempt)) } def hasPendingTasks: Boolean = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 8dbdc84..b244475 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -183,7 +183,7 @@ private[spark] class ExecutorMonitor( def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => exec.pendingRemoval } def pendingRemovalCountPerResourceProfileId(id: Int): Int = { -executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size +executors.asScala.count { case (k, v) => v.resourceProfileId == id && v.pendingRemoval } } def decommissioningCount: Int = executors.asScala.count { case (_, exec) => @@ -191,9 +191,9 @@ private[spark] class ExecutorMonitor( } def decommissioningPerResourceProfileId(id: Int): Int = { -executors.asScala.filter { case (k, v) => +executors.asScala.count { case (k, v) => v.resourceProfileId == id && v.decommissioning -}.size +} } override def onJobStart(event: SparkListenerJobStart): Unit = { diff --git 
a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 8c9c217..a728108 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -154,7 +154,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } x }).count() -assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1) +assert(sc.listFiles().count(_.contains("somesuffix1")) == 1) } finally { sc.stop() } @@ -245,7 +245,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu try { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) sc.addJar(jarPath.toString) - assert(sc.listJars().filter(_.contains("TestUDTF.jar")).size == 1) + assert(sc.listJars().count(_.contains("TestUDTF.jar")) == 1) } finally { sc.stop() } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scal
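The Before/After forms are behaviorally equivalent: `count(p)` folds the predicate check and the size computation into a single pass and skips the intermediate collection that `filter(p).size` builds. A minimal, Spark-free sketch with plain Scala collections (the jar names are illustrative, borrowed from the test assertions in the diff):

```scala
object CountVsFilterSize {
  def main(args: Array[String]): Unit = {
    val jars = Seq("TestUDTF.jar", "other.jar", "somesuffix1.jar")

    // Before: materializes an intermediate filtered Seq, then takes its size.
    val before = jars.filter(_.contains("TestUDTF.jar")).size

    // After: counts matching elements in one pass, no intermediate Seq.
    val after = jars.count(_.contains("TestUDTF.jar"))

    assert(before == after && after == 1)
  }
}
```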
[spark] branch master updated (d242166 -> 15445a8)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from d242166 [SPARK-34262][SQL] Refresh cached data of v1 table in `ALTER TABLE .. SET LOCATION` add 15445a8 [SPARK-34275][CORE][SQL][MLLIB] Replaces filter and size with count No new revisions were added by this update. Summary of changes: .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +- .../org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala | 6 +++--- core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 4 ++-- .../spark/storage/BlockManagerDecommissionIntegrationSuite.scala | 8 .../org/apache/spark/ml/classification/NaiveBayesSuite.scala | 4 ++-- .../org/apache/spark/sql/TypedImperativeAggregateSuite.scala | 4 ++-- 6 files changed, 14 insertions(+), 14 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-34262][SQL] Refresh cached data of v1 table in `ALTER TABLE .. SET LOCATION`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d242166 [SPARK-34262][SQL] Refresh cached data of v1 table in `ALTER TABLE .. SET LOCATION` d242166 is described below commit d242166b8fd741fdd46d9048f847b2fd6e1d07b1 Author: Max Gekk AuthorDate: Thu Jan 28 15:05:22 2021 +0900 [SPARK-34262][SQL] Refresh cached data of v1 table in `ALTER TABLE .. SET LOCATION` ### What changes were proposed in this pull request? Invoke `CatalogImpl.refreshTable()` in v1 implementation of the `ALTER TABLE .. SET LOCATION` command to refresh cached table data. ### Why are the changes needed? The example below portraits the issue: - Create a source table: ```sql spark-sql> CREATE TABLE src_tbl (c0 int, part int) USING hive PARTITIONED BY (part); spark-sql> INSERT INTO src_tbl PARTITION (part=0) SELECT 0; spark-sql> SHOW TABLE EXTENDED LIKE 'src_tbl' PARTITION (part=0); default src_tbl false Partition Values: [part=0] Location: file:/Users/maximgekk/proj/refresh-cache-set-location/spark-warehouse/src_tbl/part=0 ... ``` - Set new location for the empty partition (part=0): ```sql spark-sql> CREATE TABLE dst_tbl (c0 int, part int) USING hive PARTITIONED BY (part); spark-sql> ALTER TABLE dst_tbl ADD PARTITION (part=0); spark-sql> INSERT INTO dst_tbl PARTITION (part=1) SELECT 1; spark-sql> CACHE TABLE dst_tbl; spark-sql> SELECT * FROM dst_tbl; 1 1 spark-sql> ALTER TABLE dst_tbl PARTITION (part=0) SET LOCATION '/Users/maximgekk/proj/refresh-cache-set-location/spark-warehouse/src_tbl/part=0'; spark-sql> SELECT * FROM dst_tbl; 1 1 ``` The last query does not return new loaded data. ### Does this PR introduce _any_ user-facing change? Yes. After the changes, the example above works correctly: ```sql spark-sql> ALTER TABLE dst_tbl PARTITION (part=0) SET LOCATION '/Users/maximgekk/proj/refresh-cache-set-location/spark-warehouse/src_tbl/part=0'; spark-sql> SELECT * FROM dst_tbl; 0 0 1 1 ``` ### How was this patch tested? Added new test to `org.apache.spark.sql.hive.CachedTableSuite`: ``` $ build/sbt -Phive -Phive-thriftserver "test:testOnly *CachedTableSuite" ``` Closes #31361 from MaxGekk/refresh-cache-set-location. 
Authored-by: Max Gekk Signed-off-by: HyukjinKwon --- .../apache/spark/sql/execution/command/ddl.scala | 2 +- .../apache/spark/sql/hive/CachedTableSuite.scala | 39 +- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 886bc49..9803c4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -835,7 +835,7 @@ case class AlterTableSetLocationCommand( // No partition spec is specified, so we set the location for the table itself catalog.alterTable(table.withNewStorage(locationUri = Some(locUri))) } - +sparkSession.catalog.refreshTable(table.identifier.quotedString) CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 765cc18..e43dfab 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -477,24 +477,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } } + private def getPartitionLocation(t: String, partition: String): String = { +val information = sql(s"SHOW TABLE EXTENDED LIKE '$t' PARTITION ($partition)") + .select("information") + .first().getString(0) +information + .split("\\r?\\n") + .filter(_.startsWith("Location:")) + .head + .replace("Location: file:", "") + } + test("SPARK-34213: LOAD DATA refreshes cached table") { withTable("src_tbl") { withTable("dst_tbl") { sql("CREATE TABLE src_tbl (c0 int, part int) USING hive PARTITIONED BY (part)") sql("INSERT INTO src_tbl PARTITION (part=0) SELECT 0") -val information = sql("SHOW TABLE EXTENDED LIKE 'src_tbl' PARTITION (part=0)") - .select("information") - .first().getString(0) -val location = information - .split("\\r?\\n") - .filter(_.startsWith("Location:")) - .head - .replace("Location: file:", "") sql("CREATE TABL
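The essence of the fix is the single added line `sparkSession.catalog.refreshTable(table.identifier.quotedString)`, which invalidates the cached relation so the next scan re-reads the partition from its new location. A sketch of the same effect from user code, assuming a spark-shell session with Hive support and the `src_tbl`/`dst_tbl` tables created as in the commit message (the path is illustrative):

```scala
// Assumes spark-shell with Hive support and the tables from the example above.
spark.sql("CACHE TABLE dst_tbl")
spark.sql("SELECT * FROM dst_tbl").show()   // serves the cached rows
spark.sql("ALTER TABLE dst_tbl PARTITION (part=0) SET LOCATION '/tmp/warehouse/src_tbl/part=0'")

// Before the fix, users had to refresh manually to see the relocated data;
// the patched command now performs the equivalent of this call itself.
spark.catalog.refreshTable("dst_tbl")
spark.sql("SELECT * FROM dst_tbl").show()   // re-reads from the new location
```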
[spark] branch branch-3.0 updated (8b3739e0 -> 19540b2)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git. from 8b3739e0 [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function add 19540b2 [SPARK-34260][SQL][3.0] Fix UnresolvedException when creating temp view twice No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/execution/command/views.scala | 8 .../org/apache/spark/sql/execution/SQLViewSuite.scala | 17 + 2 files changed, 21 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (850990f -> b12e9a4)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 850990f [SPARK-34238][SQL] Unify output of SHOW PARTITIONS and pass output attributes properly add b12e9a4 [SPARK-33542][SQL][FOLLOWUP] Group exception messages in catalyst/catalog No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 3 +-- .../scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala | 6 ++ 2 files changed, 7 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-34238][SQL] Unify output of SHOW PARTITIONS and pass output attributes properly
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 850990f [SPARK-34238][SQL] Unify output of SHOW PARTITIONS and pass output attributes properly 850990f is described below commit 850990f40e5cd71e4c455320965b26df9f3be202 Author: Angerszh AuthorDate: Thu Jan 28 05:13:19 2021 + [SPARK-34238][SQL] Unify output of SHOW PARTITIONS and pass output attributes properly ### What changes were proposed in this pull request? Passing around the output attributes should have more benefits like keeping the expr ID unchanged to avoid bugs when we apply more operators above the command output dataframe. This PR keep SHOW PARTITIONS command's output attribute exprId unchanged. And benefit for https://issues.apache.org/jira/browse/SPARK-34238 ### Why are the changes needed? Keep SHOW PARTITIONS command's output attribute exprid unchanged. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT Closes #31341 from AngersZh/SPARK-34238. Authored-by: Angerszh Signed-off-by: Wenchen Fan --- .../apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala| 3 ++- .../main/scala/org/apache/spark/sql/execution/command/tables.scala| 4 +--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 16cd206..ae74a7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -414,11 +414,12 @@ class ResolveSessionCatalog( ident.asTableIdentifier, partitionSpec) -case ShowPartitions( +case s @ ShowPartitions( ResolvedV1TableOrViewIdentifier(ident), pattern @ (None | Some(UnresolvedPartitionSpec(_, _ => ShowPartitionsCommand( ident.asTableIdentifier, +s.output, pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec)) case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 63efbb6..cd89872 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -968,10 +968,8 @@ case class ShowColumnsCommand( */ case class ShowPartitionsCommand( tableName: TableIdentifier, +override val output: Seq[Attribute], spec: Option[TablePartitionSpec]) extends RunnableCommand { - override val output: Seq[Attribute] = { -AttributeReference("partition", StringType, nullable = false)() :: Nil - } override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
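With the change, the command no longer rebuilds its single output attribute (`partition`, a non-nullable string) and instead reuses the attribute carried by the resolved plan, so the exprId stays stable for operators applied on top of the command's DataFrame. A small sketch of that output shape, assuming a spark-shell session (table name and data are illustrative):

```scala
// Assumes spark-shell; the table is illustrative.
spark.sql("CREATE TABLE pt (id INT, part INT) USING parquet PARTITIONED BY (part)")
spark.sql("INSERT INTO pt VALUES (0, 0), (1, 1)")

val partitions = spark.sql("SHOW PARTITIONS pt")
partitions.printSchema()   // one string column named "partition"
partitions.show()          // part=0 and part=1
```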
[spark] branch branch-2.4 updated: [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 86eb199 [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function 86eb199 is described below commit 86eb199a12447e67969e136a7f61a45544c22e5a Author: Yuming Wang AuthorDate: Thu Jan 28 14:06:36 2021 +0900 [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function ### What changes were proposed in this pull request? This pr correct the documentation of the `concat_ws` function. ### Why are the changes needed? `concat_ws` doesn't need any str or array(str) arguments: ``` scala> sql("""select concat_ws("s")""").show ++ |concat_ws(s)| ++ || ++ ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? ``` build/sbt "sql/testOnly *.ExpressionInfoSuite" ``` Closes #31370 from wangyum/SPARK-34268. Authored-by: Yuming Wang Signed-off-by: HyukjinKwon (cherry picked from commit 01d11da84ef7c3abbfd1072c421505589ac1e9b2) Signed-off-by: HyukjinKwon --- .../org/apache/spark/sql/catalyst/expressions/stringExpressions.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index f0c3208..17e368f 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -45,11 +45,13 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(sep, [str | array(str)]+) - Returns the concatenation of the strings separated by `sep`.", + usage = "_FUNC_(sep[, str | array(str)]+) - Returns the concatenation of the strings separated by `sep`.", examples = """ Examples: > SELECT _FUNC_(' ', 'Spark', 'SQL'); Spark SQL + > SELECT _FUNC_('s'); + """, since = "1.5.0") // scalastyle:on line.size.limit - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8b3739e0 [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function 8b3739e0 is described below commit 8b3739e0fbe274f72ad225c0e9c0ba636179348b Author: Yuming Wang AuthorDate: Thu Jan 28 14:06:36 2021 +0900 [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function ### What changes were proposed in this pull request? This pr correct the documentation of the `concat_ws` function. ### Why are the changes needed? `concat_ws` doesn't need any str or array(str) arguments: ``` scala> sql("""select concat_ws("s")""").show ++ |concat_ws(s)| ++ || ++ ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? ``` build/sbt "sql/testOnly *.ExpressionInfoSuite" ``` Closes #31370 from wangyum/SPARK-34268. Authored-by: Yuming Wang Signed-off-by: HyukjinKwon (cherry picked from commit 01d11da84ef7c3abbfd1072c421505589ac1e9b2) Signed-off-by: HyukjinKwon --- .../org/apache/spark/sql/catalyst/expressions/stringExpressions.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 2cd92d7..e121b36 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -48,11 +48,13 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(sep, [str | array(str)]+) - Returns the concatenation of the strings separated by `sep`.", + usage = "_FUNC_(sep[, str | array(str)]+) - Returns the concatenation of the strings separated by `sep`.", examples = """ Examples: > SELECT _FUNC_(' ', 'Spark', 'SQL'); Spark SQL + > SELECT _FUNC_('s'); + """, since = "1.5.0") // scalastyle:on line.size.limit - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.1 updated: [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new d147799 [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function d147799 is described below commit d1477990064b3b08f4b240fe373c59f57a39259f Author: Yuming Wang AuthorDate: Thu Jan 28 14:06:36 2021 +0900 [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function ### What changes were proposed in this pull request? This pr correct the documentation of the `concat_ws` function. ### Why are the changes needed? `concat_ws` doesn't need any str or array(str) arguments: ``` scala> sql("""select concat_ws("s")""").show ++ |concat_ws(s)| ++ || ++ ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? ``` build/sbt "sql/testOnly *.ExpressionInfoSuite" ``` Closes #31370 from wangyum/SPARK-34268. Authored-by: Yuming Wang Signed-off-by: HyukjinKwon (cherry picked from commit 01d11da84ef7c3abbfd1072c421505589ac1e9b2) Signed-off-by: HyukjinKwon --- .../org/apache/spark/sql/catalyst/expressions/stringExpressions.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 9f92181..37ca8ee 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -49,11 +49,13 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(sep, [str | array(str)]+) - Returns the concatenation of the strings separated by `sep`.", + usage = "_FUNC_(sep[, str | array(str)]+) - Returns the concatenation of the strings separated by `sep`.", examples = """ Examples: > SELECT _FUNC_(' ', 'Spark', 'SQL'); Spark SQL + > SELECT _FUNC_('s'); + """, since = "1.5.0") // scalastyle:on line.size.limit - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (6ec3cf6 -> 01d11da)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 6ec3cf6 [SPARK-34271][SQL] Use majorMinorPatchVersion for Hive version parsing add 01d11da [SPARK-34268][SQL][DOCS] Correct the documentation of the concat_ws function No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/catalyst/expressions/stringExpressions.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
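The corrected usage string, `_FUNC_(sep[, str | array(str)]+)`, marks everything after the separator as optional, matching the behavior shown in the commit message. The same two cases through the DataFrame API, assuming a spark-shell session:

```scala
// Assumes spark-shell.
import org.apache.spark.sql.functions.{concat_ws, lit}

// Separator plus values, as in the existing doc example.
spark.range(1).select(concat_ws(" ", lit("Spark"), lit("SQL"))).show()   // Spark SQL

// Separator alone is also valid and yields an empty string.
spark.range(1).select(concat_ws("s")).show()
```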
[spark] branch branch-3.1 updated: [SPARK-34260][SQL] Fix UnresolvedException when creating temp view twice
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 0c8f111 [SPARK-34260][SQL] Fix UnresolvedException when creating temp view twice 0c8f111 is described below commit 0c8f111af4303abf7bfcecba4ec2a3c1f39fb7e8 Author: Linhong Liu AuthorDate: Wed Jan 27 20:59:23 2021 -0800 [SPARK-34260][SQL] Fix UnresolvedException when creating temp view twice ### What changes were proposed in this pull request? In PR #30140, it will compare new and old plans when replacing view and uncache data if the view has changed. But the compared new plan is not analyzed which will cause `UnresolvedException` when calling `sameResult`. So in this PR, we use the analyzed plan to compare to fix this problem. ### Why are the changes needed? bug fix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? newly added tests Closes #31360 from linhongliu-db/SPARK-34260. Authored-by: Linhong Liu Signed-off-by: Dongjoon Hyun (cherry picked from commit cf1400c8ddc3bd534455227c40e5fb53ecf9cdee) Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/sql/execution/command/views.scala | 8 .../scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala | 9 + 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 7b8c44e..81f2c0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -114,13 +114,13 @@ case class CreateViewCommand( verifyTemporaryObjectsNotExists(catalog, isTemporary, name, child) if (viewType == LocalTempView) { + val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) if (replace && catalog.getRawTempView(name.table).isDefined && - !catalog.getRawTempView(name.table).get.sameResult(child)) { + !catalog.getRawTempView(name.table).get.sameResult(aliasedPlan)) { logInfo(s"Try to uncache ${name.quotedString} before replacing.") checkCyclicViewReference(analyzedPlan, Seq(name), name) CommandUtils.uncacheTableOrView(sparkSession, name.quotedString) } - val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) // If there is no sql text (e.g. 
from Dataset API), we will always store the analyzed plan val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) { TemporaryViewRelation( @@ -138,13 +138,13 @@ case class CreateViewCommand( } else if (viewType == GlobalTempView) { val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) val viewIdent = TableIdentifier(name.table, Option(db)) + val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) if (replace && catalog.getRawGlobalTempView(name.table).isDefined && - !catalog.getRawGlobalTempView(name.table).get.sameResult(child)) { + !catalog.getRawGlobalTempView(name.table).get.sameResult(aliasedPlan)) { logInfo(s"Try to uncache ${viewIdent.quotedString} before replacing.") checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent) CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString) } - val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) { TemporaryViewRelation( prepareTemporaryView( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala index 8c3d923..68e1a68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala @@ -249,6 +249,15 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils { } } } + + test("SPARK-34260: replace existing view using CREATE OR REPLACE") { +val viewName = createView("testView", "SELECT * FROM (SELECT 1)") +withView(viewName) { + checkViewOutput(viewName, Seq(Row(1))) + createView("testView", "SELECT * FROM (SELECT 2)", replace = true) + checkViewOutput(viewName, Seq(Row(2))) +} + } } class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@sp
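The fix compares the aliased, analyzed plan instead of the unanalyzed `child` when deciding whether the replaced view needs uncaching, so `sameResult` never sees unresolved expressions. The scenario covered by the new test, assuming a spark-shell session:

```scala
// Assumes spark-shell; mirrors the added SQLViewTestSuite case.
spark.sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT * FROM (SELECT 1)")
spark.table("testView").show()   // 1

// Replacing the same temp view used to throw UnresolvedException from sameResult();
// with the fix it uncaches if needed and swaps in the new definition.
spark.sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT * FROM (SELECT 2)")
spark.table("testView").show()   // 2
```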
[spark] branch master updated (cf1400c -> 6ec3cf6)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from cf1400c [SPARK-34260][SQL] Fix UnresolvedException when creating temp view twice add 6ec3cf6 [SPARK-34271][SQL] Use majorMinorPatchVersion for Hive version parsing No new revisions were added by this update. Summary of changes: .../sql/hive/client/IsolatedClientLoader.scala | 32 -- 1 file changed, 17 insertions(+), 15 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (829f118 -> cf1400c)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 829f118 [SPARK-33867][SQL] Instant and LocalDate values aren't handled when generating SQL queries add cf1400c [SPARK-34260][SQL] Fix UnresolvedException when creating temp view twice No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/sql/execution/command/views.scala | 8 .../scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala | 9 + 2 files changed, 13 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.1 updated: Revert "[SPARK-34233][SQL] FIX NPE for char padding in binary comparison"
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new ed3c479 Revert "[SPARK-34233][SQL] FIX NPE for char padding in binary comparison" ed3c479 is described below commit ed3c479a3664919515bc74a3212fc96f8d2feff7 Author: Dongjoon Hyun AuthorDate: Wed Jan 27 20:07:08 2021 -0800 Revert "[SPARK-34233][SQL] FIX NPE for char padding in binary comparison" This reverts commit cf21e8898ab484a833b6696d0cf4bb0c871e7ff6. --- .../spark/sql/catalyst/analysis/Analyzer.scala | 22 --- .../apache/spark/sql/CharVarcharTestSuite.scala| 43 +- 2 files changed, 10 insertions(+), 55 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6fd6901..fb95323a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3888,15 +3888,13 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] { if attr.dataType == StringType && list.forall(_.foldable) => CharVarcharUtils.getRawType(attr.metadata).flatMap { case CharType(length) => - val (nulls, literalChars) = -list.map(_.eval().asInstanceOf[UTF8String]).partition(_ == null) - val literalCharLengths = literalChars.map(_.numChars()) + val literalCharLengths = list.map(_.eval().asInstanceOf[UTF8String].numChars()) val targetLen = (length +: literalCharLengths).max Some(i.copy( value = addPadding(attr, length, targetLen), list = list.zip(literalCharLengths).map { case (lit, charLength) => addPadding(lit, charLength, targetLen) -} ++ nulls.map(Literal.create(_, StringType +})) case _ => None }.getOrElse(i) @@ -3917,17 +3915,13 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] { CharVarcharUtils.getRawType(attr.metadata).flatMap { case CharType(length) => val str = lit.eval().asInstanceOf[UTF8String] - if (str == null) { -None + val stringLitLen = str.numChars() + if (length < stringLitLen) { +Some(Seq(StringRPad(attr, Literal(stringLitLen)), lit)) + } else if (length > stringLitLen) { +Some(Seq(attr, StringRPad(lit, Literal(length } else { -val stringLitLen = str.numChars() -if (length < stringLitLen) { - Some(Seq(StringRPad(attr, Literal(stringLitLen)), lit)) -} else if (length > stringLitLen) { - Some(Seq(attr, StringRPad(lit, Literal(length -} else { - None -} +None } case _ => None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala index 744757b..ff8820a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala @@ -152,22 +152,6 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { } } - test("SPARK-34233: char/varchar with null value for partitioned columns") { -Seq("CHAR(5)", "VARCHAR(5)").foreach { typ => - withTable("t") { -sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)") -sql("INSERT INTO t VALUES ('1', null)") -checkPlainResult(spark.table("t"), typ, null) -sql("INSERT OVERWRITE t VALUES ('1', null)") -checkPlainResult(spark.table("t"), typ, null) -sql("INSERT OVERWRITE t PARTITION (c=null) VALUES ('1')") -checkPlainResult(spark.table("t"), typ, 
null) -sql("ALTER TABLE t DROP PARTITION(c=null)") -checkAnswer(spark.table("t"), Nil) - } -} - } - test("char/varchar type values length check: partitioned columns of other types") { // DSV2 doesn't support DROP PARTITION yet. assume(format != "foo") @@ -451,8 +435,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { ("c1 IN ('a', 'b')", true), ("c1 = c2", true), ("c1 < c2", false), -("c1 IN (c2)", true), -("c1 <=> null", false))) +("c1 IN (c2)", true))) } } @@ -468,29 +451,7 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { ("c1 IN ('a', 'b')", true), ("c1 = c2", true), ("c1 < c2", false), -("c1 IN (c2)", true), -
[spark] branch branch-3.1 updated: [SPARK-33867][SQL] Instant and LocalDate values aren't handled when generating SQL queries
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 4ca628eb [SPARK-33867][SQL] Instant and LocalDate values aren't handled when generating SQL queries 4ca628eb is described below commit 4ca628eb2f54c3e039867c5ccbb0cde7413c18e4 Author: Chircu AuthorDate: Thu Jan 28 11:58:20 2021 +0900 [SPARK-33867][SQL] Instant and LocalDate values aren't handled when generating SQL queries ### What changes were proposed in this pull request? When generating SQL queries only the old date time API types are handled for values in org.apache.spark.sql.jdbc.JdbcDialect#compileValue. If the new API is used (spark.sql.datetime.java8API.enabled=true) Instant and LocalDate values are not quoted and errors are thrown. The change proposed is to handle Instant and LocalDate values the same way that Timestamp and Date are. ### Why are the changes needed? In the current state if an Instant is used in a filter, an exception will be thrown. Ex (dataset was read from PostgreSQL): dataset.filter(current_timestamp().gt(col(VALID_FROM))) Stacktrace (the T11 is from an instant formatted like -MM-dd'T'HH:mm:ss.SS'Z'): Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or near "T11"Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or near "T11" Position: 285 at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2103) at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1836) at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:257) at org.postgresql.jdbc2.AbstractJdbc2Statement. [...] ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test added Closes #31148 from cristichircu/SPARK-33867. 
Lead-authored-by: Chircu Co-authored-by: Cristi Chircu Signed-off-by: Takeshi Yamamuro (cherry picked from commit 829f118f98ef0732c8dd784f06298465e47ee3a0) Signed-off-by: Takeshi Yamamuro --- .../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 10 ++ .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 14 ++ 2 files changed, 24 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index ead0a1a..6c72172 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.jdbc import java.sql.{Connection, Date, Timestamp} +import java.time.{Instant, LocalDate} import scala.collection.mutable.ArrayBuilder @@ -26,9 +27,11 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** @@ -175,7 +178,14 @@ abstract class JdbcDialect extends Serializable with Logging{ def compileValue(value: Any): Any = value match { case stringValue: String => s"'${escapeSql(stringValue)}'" case timestampValue: Timestamp => "'" + timestampValue + "'" +case timestampValue: Instant => + val timestampFormatter = TimestampFormatter.getFractionFormatter( +DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)) + s"'${timestampFormatter.format(timestampValue)}'" case dateValue: Date => "'" + dateValue + "'" +case dateValue: LocalDate => + val dateFormatter = DateFormatter(DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)) + s"'${dateFormatter.format(dateValue)}'" case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") case _ => value } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index b81824d..70f5508 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc import java.math.BigDecimal import java.sql.{Date, DriverManager, SQLException, Timestamp} +import java.time.{Instant, LocalDate} import java.util.{Calendar, GregorianCalendar, Properties}
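The new cases format `java.time.Instant` and `java.time.LocalDate` into quoted literals, just as the existing `Timestamp` and `Date` branches do, so filters written against the java8 time API can still be compiled into the pushed-down SQL. A simplified, Spark-independent sketch of the idea; the real patch uses Spark's internal `TimestampFormatter`/`DateFormatter` with the session time zone, whereas this stand-in uses plain `java.time` formatting:

```scala
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate, ZoneId}
import java.time.format.DateTimeFormatter

object CompileValueSketch {
  // Simplified stand-in for JdbcDialect#compileValue: every temporal value,
  // old java.sql API or new java.time API, becomes a quoted SQL literal.
  def compileValue(value: Any): Any = value match {
    case s: String    => s"'${s.replace("'", "''")}'"
    case t: Timestamp => "'" + t + "'"
    case t: Instant   =>
      val fmt = DateTimeFormatter
        .ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
        .withZone(ZoneId.systemDefault())
      s"'${fmt.format(t)}'"
    case d: Date      => "'" + d + "'"
    case d: LocalDate => s"'${DateTimeFormatter.ISO_LOCAL_DATE.format(d)}'"
    case other        => other
  }

  def main(args: Array[String]): Unit = {
    println(compileValue(Instant.parse("2021-01-28T02:58:20Z")))  // quoted timestamp literal
    println(compileValue(LocalDate.of(2021, 1, 28)))              // '2021-01-28'
  }
}
```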
[spark] branch master updated (0dedf24 -> 829f118)
This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 0dedf24 [SPARK-34154][YARN] Extend LocalityPlacementStrategySuite's test with a timeout add 829f118 [SPARK-33867][SQL] Instant and LocalDate values aren't handled when generating SQL queries No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 10 ++ .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 14 ++ 2 files changed, 24 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (9d83d62 -> 0dedf24)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 9d83d62 [SPARK-34193][CORE] TorrentBroadcast block manager decommissioning race fix add 0dedf24 [SPARK-34154][YARN] Extend LocalityPlacementStrategySuite's test with a timeout No new revisions were added by this update. Summary of changes: .../spark/deploy/yarn/LocalityPlacementStrategySuite.scala | 14 +++--- 1 file changed, 11 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.1 updated: [SPARK-34154][YARN] Extend LocalityPlacementStrategySuite's test with a timeout
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 6f1bd9b [SPARK-34154][YARN] Extend LocalityPlacementStrategySuite's test with a timeout 6f1bd9b is described below commit 6f1bd9b4b92ee6ff083ef9fe1b4b02f7c9f7cf0e Author: “attilapiros” AuthorDate: Thu Jan 28 08:04:25 2021 +0900 [SPARK-34154][YARN] Extend LocalityPlacementStrategySuite's test with a timeout ### What changes were proposed in this pull request? This PR extends the `handle large number of containers and tasks (SPARK-18750)` test with a time limit and in case of timeout it saves the stack trace of the running thread to provide extra information about the reason why it got stuck. ### Why are the changes needed? This is a flaky test which sometime runs for hours without stopping. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I checked it with a temporary code change: by adding a `Thread.sleep` to `LocalityPreferredContainerPlacementStrategy#expectedHostToContainerCount`. The stack trace showed the correct method: ``` [info] LocalityPlacementStrategySuite: [info] - handle large number of containers and tasks (SPARK-18750) *** FAILED *** (30 seconds, 26 milliseconds) [info] Failed with an exception or a timeout at thread join: [info] [info] java.lang.RuntimeException: Timeout at waiting for thread to stop (its stack trace is added to the exception) [info] at java.lang.Thread.sleep(Native Method) [info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.$anonfun$expectedHostToContainerCount$1(LocalityPreferredContainerPlacementStrategy.scala:198) [info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy$$Lambda$281/381161906.apply(Unknown Source) [info] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) [info] at scala.collection.TraversableLike$$Lambda$16/322836221.apply(Unknown Source) [info] at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234) [info] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468) [info] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468) [info] at scala.collection.TraversableLike.map(TraversableLike.scala:238) [info] at scala.collection.TraversableLike.map$(TraversableLike.scala:231) [info] at scala.collection.AbstractTraversable.map(Traversable.scala:108) [info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.expectedHostToContainerCount(LocalityPreferredContainerPlacementStrategy.scala:188) [info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.localityOfRequestedContainers(LocalityPreferredContainerPlacementStrategy.scala:112) [info] at org.apache.spark.deploy.yarn.LocalityPlacementStrategySuite.org$apache$spark$deploy$yarn$LocalityPlacementStrategySuite$$runTest(LocalityPlacementStrategySuite.scala:94) [info] at org.apache.spark.deploy.yarn.LocalityPlacementStrategySuite$$anon$1.run(LocalityPlacementStrategySuite.scala:40) [info] at java.lang.Thread.run(Thread.java:748) (LocalityPlacementStrategySuite.scala:61) ... ``` Closes #31363 from attilapiros/SPARK-34154. 
Authored-by: “attilapiros” Signed-off-by: HyukjinKwon (cherry picked from commit 0dedf24cd0359b36f655adbf22bd5048b7288ba5) Signed-off-by: HyukjinKwon --- .../spark/deploy/yarn/LocalityPlacementStrategySuite.scala | 14 +++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala index d239750..465de48 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala @@ -33,7 +33,7 @@ class LocalityPlacementStrategySuite extends SparkFunSuite { test("handle large number of containers and tasks (SPARK-18750)") { // Run the test in a thread with a small stack size, since the original issue // surfaced as a StackOverflowError. -var error: Throwable = null +@volatile var error: Throwable = null val runnable = new Runnable() { override def run(): Unit = try { @@ -44,13 +44,21 @@ class LocalityPlacementStrategySuite extends SparkFunSuite { } val thread = new Thread
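The key change is joining the worker thread with a timeout and, on timeout, raising an error that carries the worker's current stack trace, which is how the method names above ended up in the failure report. A minimal standalone sketch of that pattern (class name and timeout values are illustrative, not the suite's):

```scala
object JoinWithTimeoutSketch {
  def main(args: Array[String]): Unit = {
    @volatile var error: Throwable = null

    val worker = new Thread(() => {
      try Thread.sleep(60000)                 // stand-in for a test body that hangs
      catch { case e: Throwable => error = e }
    })
    worker.setDaemon(true)
    worker.start()

    worker.join(2000)                         // bounded wait instead of a bare join()
    if (worker.isAlive) {
      val timeout = new RuntimeException(
        "Timeout at waiting for thread to stop (its stack trace is added to the exception)")
      timeout.setStackTrace(worker.getStackTrace)   // show where the thread is stuck
      error = timeout
    }
    if (error != null) throw error
  }
}
```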
[spark] branch branch-3.0 updated: [SPARK-34154][YARN] Extend LocalityPlacementStrategySuite's test with a timeout
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 58dab6e [SPARK-34154][YARN] Extend LocalityPlacementStrategySuite's test with a timeout 58dab6e is described below commit 58dab6e7d20221248d2c6db199b70a7713d6323e Author: “attilapiros” AuthorDate: Thu Jan 28 08:04:25 2021 +0900 [SPARK-34154][YARN] Extend LocalityPlacementStrategySuite's test with a timeout ### What changes were proposed in this pull request? This PR extends the `handle large number of containers and tasks (SPARK-18750)` test with a time limit and in case of timeout it saves the stack trace of the running thread to provide extra information about the reason why it got stuck. ### Why are the changes needed? This is a flaky test which sometime runs for hours without stopping. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I checked it with a temporary code change: by adding a `Thread.sleep` to `LocalityPreferredContainerPlacementStrategy#expectedHostToContainerCount`. The stack trace showed the correct method: ``` [info] LocalityPlacementStrategySuite: [info] - handle large number of containers and tasks (SPARK-18750) *** FAILED *** (30 seconds, 26 milliseconds) [info] Failed with an exception or a timeout at thread join: [info] [info] java.lang.RuntimeException: Timeout at waiting for thread to stop (its stack trace is added to the exception) [info] at java.lang.Thread.sleep(Native Method) [info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.$anonfun$expectedHostToContainerCount$1(LocalityPreferredContainerPlacementStrategy.scala:198) [info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy$$Lambda$281/381161906.apply(Unknown Source) [info] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) [info] at scala.collection.TraversableLike$$Lambda$16/322836221.apply(Unknown Source) [info] at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234) [info] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468) [info] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468) [info] at scala.collection.TraversableLike.map(TraversableLike.scala:238) [info] at scala.collection.TraversableLike.map$(TraversableLike.scala:231) [info] at scala.collection.AbstractTraversable.map(Traversable.scala:108) [info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.expectedHostToContainerCount(LocalityPreferredContainerPlacementStrategy.scala:188) [info] at org.apache.spark.deploy.yarn.LocalityPreferredContainerPlacementStrategy.localityOfRequestedContainers(LocalityPreferredContainerPlacementStrategy.scala:112) [info] at org.apache.spark.deploy.yarn.LocalityPlacementStrategySuite.org$apache$spark$deploy$yarn$LocalityPlacementStrategySuite$$runTest(LocalityPlacementStrategySuite.scala:94) [info] at org.apache.spark.deploy.yarn.LocalityPlacementStrategySuite$$anon$1.run(LocalityPlacementStrategySuite.scala:40) [info] at java.lang.Thread.run(Thread.java:748) (LocalityPlacementStrategySuite.scala:61) ... ``` Closes #31363 from attilapiros/SPARK-34154. 
Authored-by: “attilapiros” Signed-off-by: HyukjinKwon (cherry picked from commit 0dedf24cd0359b36f655adbf22bd5048b7288ba5) Signed-off-by: HyukjinKwon --- .../spark/deploy/yarn/LocalityPlacementStrategySuite.scala | 14 +++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala index cf2c384..14f1ec2 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala @@ -32,7 +32,7 @@ class LocalityPlacementStrategySuite extends SparkFunSuite { test("handle large number of containers and tasks (SPARK-18750)") { // Run the test in a thread with a small stack size, since the original issue // surfaced as a StackOverflowError. -var error: Throwable = null +@volatile var error: Throwable = null val runnable = new Runnable() { override def run(): Unit = try { @@ -43,13 +43,21 @@ class LocalityPlacementStrategySuite extends SparkFunSuite { } val thread = new Thread
[spark] branch branch-3.1 updated: [SPARK-34193][CORE] TorrentBroadcast block manager decommissioning race fix
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 46dce63 [SPARK-34193][CORE] TorrentBroadcast block manager decommissioning race fix 46dce63 is described below commit 46dce636e9a03ee15a9f622a6136e832174ac90b Author: Holden Karau AuthorDate: Thu Jan 28 06:15:35 2021 +0900 [SPARK-34193][CORE] TorrentBroadcast block manager decommissioning race fix ### What changes were proposed in this pull request? Allow broadcast blocks to be put during decommissioning since migrations don't apply to them and they may be stored as part of job exec. ### Why are the changes needed? Potential race condition. ### Does this PR introduce _any_ user-facing change? Removal of race condition. ### How was this patch tested? New unit test. Closes #31298 from holdenk/SPARK-34193-torrentbroadcast-blockmanager-decommissioning-potential-race-condition. Authored-by: Holden Karau Signed-off-by: HyukjinKwon (cherry picked from commit 9d83d62f142ba89518194f176bb81adadc28951b) Signed-off-by: HyukjinKwon --- .../scala/org/apache/spark/storage/BlockManager.scala | 17 +++-- .../org/apache/spark/storage/BlockManagerSuite.scala| 12 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a5b8d5d..4c09e16 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -258,6 +258,15 @@ private[spark] class BlockManager( @inline final private def isDecommissioning() = { decommissioner.isDefined } + + @inline final private def checkShouldStore(blockId: BlockId) = { +// Don't reject broadcast blocks since they may be stored during task exec and +// don't need to be migrated. +if (isDecommissioning() && !blockId.isBroadcast) { +throw new BlockSavedOnDecommissionedBlockManagerException(blockId) +} + } + // This is a lazy val so someone can migrating RDDs even if they don't have a MigratableResolver // for shuffles. Used in BlockManagerDecommissioner & block puts. 
private[storage] lazy val migratableResolver: MigratableResolver = { @@ -670,9 +679,7 @@ private[spark] class BlockManager( level: StorageLevel, classTag: ClassTag[_]): StreamCallbackWithID = { -if (isDecommissioning()) { - throw new BlockSavedOnDecommissionedBlockManagerException(blockId) -} +checkShouldStore(blockId) if (blockId.isShuffle) { logDebug(s"Putting shuffle block ${blockId}") @@ -1321,9 +1328,7 @@ private[spark] class BlockManager( require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") -if (isDecommissioning()) { - throw new BlockSavedOnDecommissionedBlockManagerException(blockId) -} +checkShouldStore(blockId) val putBlockInfo = { val newInfo = new BlockInfo(level, classTag, tellMaster) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 44b6f1b..09678c7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -2038,6 +2038,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(sortedBlocks.sameElements(decomManager.shufflesToMigrate.asScala.map(_._1))) } + test("SPARK-34193: Potential race condition during decommissioning with TorrentBroadcast") { +// Validate that we allow putting of broadcast blocks during decommissioning +val exec1 = "exec1" + +val store = makeBlockManager(1000, exec1) +master.decommissionBlockManagers(Seq(exec1)) +val a = new Array[Byte](1) +// Put a broadcast block, no exception +val broadcast0BlockId = BroadcastBlockId(0) +store.putSingle(broadcast0BlockId, a, StorageLevel.DISK_ONLY) + } + class MockBlockTransferService( val maxFailures: Int, override val hostName: String = "MockBlockTransferServiceHost") extends BlockTransferService { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
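The new `checkShouldStore` helper rejects puts on a decommissioning block manager unless the block is a broadcast block, since broadcast blocks can still be written while running tasks finish and are never migrated anyway. A reduced sketch of just that decision, with the Spark types replaced by plain stand-ins:

```scala
object CheckShouldStoreSketch {
  // Plain stand-ins for Spark's BlockId hierarchy and exception type.
  final case class BlockId(name: String) {
    def isBroadcast: Boolean = name.startsWith("broadcast_")
  }
  class BlockSavedOnDecommissionedBlockManagerException(id: BlockId)
    extends Exception(s"Block $id cannot be saved on a decommissioned block manager")

  // Mirrors the guard added in BlockManager: reject non-broadcast puts
  // while decommissioning, let broadcast blocks through.
  def checkShouldStore(blockId: BlockId, decommissioning: Boolean): Unit = {
    if (decommissioning && !blockId.isBroadcast) {
      throw new BlockSavedOnDecommissionedBlockManagerException(blockId)
    }
  }

  def main(args: Array[String]): Unit = {
    checkShouldStore(BlockId("broadcast_0"), decommissioning = true)  // allowed
    checkShouldStore(BlockId("rdd_1_0"), decommissioning = true)      // throws
  }
}
```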
[spark] branch master updated (f1bc37e -> 9d83d62)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from f1bc37e [SPARK-34221][WEBUI] Ensure if a stage fails in the UI page, the corresponding error message can be displayed correctly add 9d83d62 [SPARK-34193][CORE] TorrentBroadcast block manager decommissioning race fix No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/storage/BlockManager.scala | 17 +++-- .../org/apache/spark/storage/BlockManagerSuite.scala| 12 2 files changed, 23 insertions(+), 6 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-34221][WEBUI] Ensure if a stage fails in the UI page, the corresponding error message can be displayed correctly
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new e85c881 [SPARK-34221][WEBUI] Ensure if a stage fails in the UI page, the corresponding error message can be displayed correctly e85c881 is described below commit e85c881c21eafa26fa7421f7abb8421082b472bb Author: neko AuthorDate: Wed Jan 27 10:01:57 2021 -0800 [SPARK-34221][WEBUI] Ensure if a stage fails in the UI page, the corresponding error message can be displayed correctly ### What changes were proposed in this pull request? Ensure that if a stage fails in the UI page, the corresponding error message can be displayed correctly. ### Why are the changes needed? errormessage is not handled properly in JavaScript. If the 'at' is not exist, the error message on the page will be blank. I made wochanges, 1. `msg.indexOf("at")` => `msg.indexOf("\n")` ![image](https://user-images.githubusercontent.com/52202080/105663531-7362cb00-5f0d-11eb-87fd-008ed65c33ca.png) As shows ablove, truncated at the 'at' position will result in a strange abstract of the error message. If there is a `\n` worit is more reasonable to truncate at the '\n' position. 2. If the `\n` does not exist check whether the msg is more than 100. If true, then truncate the display to avoid too long error message ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual test shows as belows, just a js change: before modified: ![problem](https://user-images.githubusercontent.com/52202080/105712153-661cff00-5f54-11eb-80bf-e33c323c4e55.png) after modified ![after mdified](https://user-images.githubusercontent.com/52202080/105712180-6c12e000-5f54-11eb-8998-ff8bc8a0a503.png) Closes #31314 from akiyamaneko/error_message_display_empty. Authored-by: neko Signed-off-by: Dongjoon Hyun (cherry picked from commit f1bc37e6244e959f1d950c450010dd6024b6ba5f) Signed-off-by: Dongjoon Hyun --- core/src/main/resources/org/apache/spark/ui/static/stagepage.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index 67d6d74..400b70f 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -861,7 +861,8 @@ $(document).ready(function () { if (typeof msg === 'undefined') { return ""; } else { -var formHead = msg.substring(0, msg.indexOf("at")); +var indexOfLineSeparator = msg.indexOf("\n"); +var formHead = indexOfLineSeparator > 0 ? msg.substring(0, indexOfLineSeparator) : (msg.length > 100 ? msg.substring(0, 100) : msg); var form = "+details"; var formMsg = "" + row.errorMessage + ""; return formHead + form + formMsg; - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.1 updated: [SPARK-34221][WEBUI] Ensure if a stage fails in the UI page, the corresponding error message can be displayed correctly
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 609af85 [SPARK-34221][WEBUI] Ensure if a stage fails in the UI page, the corresponding error message can be displayed correctly 609af85 is described below commit 609af857ccde65fd2afa06d061e2caa7a9b850e3 Author: neko AuthorDate: Wed Jan 27 10:01:57 2021 -0800 [SPARK-34221][WEBUI] Ensure if a stage fails in the UI page, the corresponding error message can be displayed correctly ### What changes were proposed in this pull request? Ensure that if a stage fails in the UI page, the corresponding error message can be displayed correctly. ### Why are the changes needed? The error message is not handled properly in JavaScript: if 'at' does not occur in the message, the error message shown on the page is blank. I made two changes: 1. `msg.indexOf("at")` => `msg.indexOf("\n")` ![image](https://user-images.githubusercontent.com/52202080/105663531-7362cb00-5f0d-11eb-87fd-008ed65c33ca.png) As shown above, truncating at the 'at' position produces a strange abstract of the error message. If there is a `\n`, it is more reasonable to truncate at the '\n' position. 2. If there is no `\n`, check whether the msg is longer than 100 characters; if so, truncate the display to avoid an overly long error message. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual test, shown below; this is just a JS change. Before the change: ![problem](https://user-images.githubusercontent.com/52202080/105712153-661cff00-5f54-11eb-80bf-e33c323c4e55.png) After the change: ![after modified](https://user-images.githubusercontent.com/52202080/105712180-6c12e000-5f54-11eb-8998-ff8bc8a0a503.png) Closes #31314 from akiyamaneko/error_message_display_empty. Authored-by: neko Signed-off-by: Dongjoon Hyun (cherry picked from commit f1bc37e6244e959f1d950c450010dd6024b6ba5f) Signed-off-by: Dongjoon Hyun --- core/src/main/resources/org/apache/spark/ui/static/stagepage.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js index ebb79f5..91bf274 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js @@ -986,7 +986,8 @@ $(document).ready(function () { if (typeof msg === 'undefined') { return ""; } else { -var formHead = msg.substring(0, msg.indexOf("at")); +var indexOfLineSeparator = msg.indexOf("\n"); +var formHead = indexOfLineSeparator > 0 ? msg.substring(0, indexOfLineSeparator) : (msg.length > 100 ? msg.substring(0, 100) : msg); var form = "+details"; var formMsg = "" + row.errorMessage + ""; return formHead + form + formMsg; - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (1318be7 -> f1bc37e)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 1318be7 [SPARK-34267][SQL] Remove `refreshTable()` from `SessionState` add f1bc37e [SPARK-34221][WEBUI] Ensure if a stage fails in the UI page, the corresponding error message can be displayed correctly No new revisions were added by this update. Summary of changes: core/src/main/resources/org/apache/spark/ui/static/stagepage.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-34267][SQL] Remove `refreshTable()` from `SessionState`
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1318be7 [SPARK-34267][SQL] Remove `refreshTable()` from `SessionState` 1318be7 is described below commit 1318be7ee94a40289b3d584261c9d38d66398fec Author: Max Gekk AuthorDate: Wed Jan 27 09:43:59 2021 -0800 [SPARK-34267][SQL] Remove `refreshTable()` from `SessionState` ### What changes were proposed in this pull request? Remove `SessionState.refreshTable()` and modify the tests where the method is used. ### Why are the changes needed? There are already 2 methods with the same name in: - `SessionCatalog` - `CatalogImpl` One more method in `SessionState` does not give any benefits. By removing it, we can improve code maintenance. ### Does this PR introduce _any_ user-facing change? Should not because `SessionState` is an internal class. ### How was this patch tested? By running the modified test suites: ``` $ build/sbt -Phive -Phive-thriftserver "test:testOnly *MetastoreDataSourcesSuite" $ build/sbt -Phive -Phive-thriftserver "test:testOnly *HiveOrcQuerySuite" $ build/sbt -Phive -Phive-thriftserver "test:testOnly *HiveParquetMetastoreSuite" ``` Closes #31366 from MaxGekk/remove-refreshTable-from-SessionState. Authored-by: Max Gekk Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/sql/internal/SessionState.scala | 4 .../spark/sql/hive/HiveParquetMetastoreSuite.scala | 6 +++--- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 16 .../apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala| 1 - 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 60ca06d..258c9bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -119,10 +119,6 @@ private[sql] class SessionState( // -- def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan) - - def refreshTable(tableName: String): Unit = { -catalog.refreshTable(sqlParser.parseTableIdentifier(tableName)) - } } private[sql] object SessionState { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala index 0bdaa0c..0351754 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala @@ -473,7 +473,7 @@ class HiveParquetMetastoreSuite extends ParquetPartitioningTest { checkCached(tableIdentifier) // For insert into non-partitioned table, we will do the conversion, // so the converted test_insert_parquet should be cached. -sessionState.refreshTable("test_insert_parquet") +spark.catalog.refreshTable("test_insert_parquet") assert(getCachedDataSourceTable(tableIdentifier) === null) sql( """ @@ -486,7 +486,7 @@ class HiveParquetMetastoreSuite extends ParquetPartitioningTest { sql("select * from test_insert_parquet"), sql("select a, b from jt").collect()) // Invalidate the cache. -sessionState.refreshTable("test_insert_parquet") +spark.catalog.refreshTable("test_insert_parquet") assert(getCachedDataSourceTable(tableIdentifier) === null) // Create a partitioned table. 
@@ -536,7 +536,7 @@ class HiveParquetMetastoreSuite extends ParquetPartitioningTest { |select b, '2015-04-02', a FROM jt """.stripMargin).collect()) -sessionState.refreshTable("test_parquet_partitioned_cache_test") +spark.catalog.refreshTable("test_parquet_partitioned_cache_test") assert(getCachedDataSourceTable(tableIdentifier) === null) dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index ecbb104..ba44192 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -256,13 +256,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) // Discard the cached relation. -sessionState.refreshTable("jsonTable") +
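For code that still calls the removed helper, the migration is a one-line switch to the public Catalog API, as the test updates above show. A minimal sketch, assuming a local session; the temp view stands in for the Hive tables used by the modified suites:

```
import org.apache.spark.sql.SparkSession

// Minimal migration sketch; the local session and temp view are illustrative.
val spark = SparkSession.builder().appName("refreshTable-migration").master("local[*]").getOrCreate()
spark.range(10).createOrReplaceTempView("test_insert_parquet")

// Before this commit (internal helper, now removed):
//   sessionState.refreshTable("test_insert_parquet")
// After: go through the public Catalog API instead.
spark.catalog.refreshTable("test_insert_parquet")
```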
[spark] branch branch-2.4 updated: [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-2.4 by this push: new 6bc088f [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal 6bc088f is described below commit 6bc088fd0499a28201dc6c2a25836d02d769e14d Author: Wenchen Fan AuthorDate: Wed Jan 27 09:34:31 2021 -0800 [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal This is a followup of https://github.com/apache/spark/pull/31319 . When reading parquet int/long as decimal, the behavior should be the same as reading int/long and then casting to the decimal type. This PR changes it to the expected behavior. When reading parquet binary as decimal, we don't really know how to interpret the binary (it may come from a string), and should fail. This PR changes it to the expected behavior. To make the behavior more sane. Yes, but it's a followup. Updated test. Closes #31357 from cloud-fan/bug. Authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun (cherry picked from commit 2dbb7d5af8f498e49488cd8876bd3d0b083723b7) Signed-off-by: Dongjoon Hyun --- .../datasources/parquet/ParquetRowConverter.scala | 48 +++--- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 58 -- 2 files changed, 60 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 0d22fe5..5878bb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -210,19 +210,6 @@ private[parquet] class ParquetRowConverter( } /** - * Get a precision and a scale to interpret parquet decimal values. - * 1. If there is a decimal metadata, we read decimal values with the given precision and scale. - * 2. If there is no metadata, we read decimal values with scale `0` because it's plain integers - *when it is written into INT32/INT64/BINARY/FIXED_LEN_BYTE_ARRAY types. - */ - private def getPrecisionAndScale(parquetType: Type, t: DecimalType): (Int, Int) = { -val metadata = parquetType.asPrimitiveType().getDecimalMetadata -val precision = if (metadata == null) t.precision else metadata.getPrecision() -val scale = if (metadata == null) 0 else metadata.getScale() -(precision, scale) - } - - /** * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type * `catalystType`. Converted values are handled by `updater`. */ @@ -249,20 +236,43 @@ private[parquet] class ParquetRowConverter( // For INT32 backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 => -val (precision, scale) = getPrecisionAndScale(parquetType, t) -new ParquetIntDictionaryAwareDecimalConverter(precision, scale, updater) +val metadata = parquetType.asPrimitiveType().getDecimalMetadata +if (metadata == null) { + // If the column is a plain INT32, we should pick the precision that can host the largest + // INT32 value. 
+ new ParquetIntDictionaryAwareDecimalConverter( +DecimalType.IntDecimal.precision, 0, updater) +} else { + new ParquetIntDictionaryAwareDecimalConverter( +metadata.getPrecision, metadata.getScale, updater) +} // For INT64 backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 => -val (precision, scale) = getPrecisionAndScale(parquetType, t) -new ParquetLongDictionaryAwareDecimalConverter(precision, scale, updater) +val metadata = parquetType.asPrimitiveType().getDecimalMetadata +if (metadata == null) { + // If the column is a plain INT64, we should pick the precision that can host the largest + // INT64 value. + new ParquetLongDictionaryAwareDecimalConverter( +DecimalType.LongDecimal.precision, 0, updater) +} else { + new ParquetLongDictionaryAwareDecimalConverter( +metadata.getPrecision, metadata.getScale, updater) +} // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY || parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY => -val (precision, scale) = getPrecisionAndScale(parquetType, t) -new ParquetBina
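The stated semantics for plain (metadata-less) INT32 columns can be illustrated end to end. A minimal sketch, assuming a local session, an illustrative file path, and a caller-supplied DECIMAL(10,0) schema; the vectorized reader is disabled so the read goes through ParquetRowConverter, the class changed above:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Illustrative only: the path, session, and requested schema are assumptions.
val spark = SparkSession.builder().appName("parquet-int-as-decimal").master("local[*]").getOrCreate()
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false") // route through ParquetRowConverter
val path = "/tmp/plain_int32_parquet"
spark.range(5).select(col("id").cast("int").as("v")).write.mode("overwrite").parquet(path)

// After this change, reading the plain INT32 column as decimal is expected to behave
// like reading it as int and then casting (precision 10, scale 0 for INT32).
val asDecimal   = spark.read.schema("v DECIMAL(10,0)").parquet(path)
val intThenCast = spark.read.parquet(path).select(col("v").cast("decimal(10,0)").as("v"))
asDecimal.show()   // expected to match intThenCast
intThenCast.show()
```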
[spark] branch branch-3.0 updated: [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 323679f [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal 323679f is described below commit 323679f75250d279110f9586bc7758a12b0b68bd Author: Wenchen Fan AuthorDate: Wed Jan 27 09:34:31 2021 -0800 [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/31319 . When reading parquet int/long as decimal, the behavior should be the same as reading int/long and then casting to the decimal type. This PR changes it to the expected behavior. When reading parquet binary as decimal, we don't really know how to interpret the binary (it may come from a string), and should fail. This PR changes it to the expected behavior. ### Why are the changes needed? To make the behavior more sane. ### Does this PR introduce _any_ user-facing change? Yes, but it's a followup. ### How was this patch tested? Updated test. Closes #31357 from cloud-fan/bug. Authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun (cherry picked from commit 2dbb7d5af8f498e49488cd8876bd3d0b083723b7) Signed-off-by: Dongjoon Hyun --- .../datasources/parquet/ParquetRowConverter.scala | 48 +++--- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 58 -- 2 files changed, 60 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 5005d41..151bb13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -234,19 +234,6 @@ private[parquet] class ParquetRowConverter( } /** - * Get a precision and a scale to interpret parquet decimal values. - * 1. If there is a decimal metadata, we read decimal values with the given precision and scale. - * 2. If there is no metadata, we read decimal values with scale `0` because it's plain integers - *when it is written into INT32/INT64/BINARY/FIXED_LEN_BYTE_ARRAY types. - */ - private def getPrecisionAndScale(parquetType: Type, t: DecimalType): (Int, Int) = { -val metadata = parquetType.asPrimitiveType().getDecimalMetadata -val precision = if (metadata == null) t.precision else metadata.getPrecision() -val scale = if (metadata == null) 0 else metadata.getScale() -(precision, scale) - } - - /** * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type * `catalystType`. Converted values are handled by `updater`. */ @@ -273,20 +260,43 @@ private[parquet] class ParquetRowConverter( // For INT32 backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 => -val (precision, scale) = getPrecisionAndScale(parquetType, t) -new ParquetIntDictionaryAwareDecimalConverter(precision, scale, updater) +val metadata = parquetType.asPrimitiveType().getDecimalMetadata +if (metadata == null) { + // If the column is a plain INT32, we should pick the precision that can host the largest + // INT32 value. 
+ new ParquetIntDictionaryAwareDecimalConverter( +DecimalType.IntDecimal.precision, 0, updater) +} else { + new ParquetIntDictionaryAwareDecimalConverter( +metadata.getPrecision, metadata.getScale, updater) +} // For INT64 backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 => -val (precision, scale) = getPrecisionAndScale(parquetType, t) -new ParquetLongDictionaryAwareDecimalConverter(precision, scale, updater) +val metadata = parquetType.asPrimitiveType().getDecimalMetadata +if (metadata == null) { + // If the column is a plain INT64, we should pick the precision that can host the largest + // INT64 value. + new ParquetLongDictionaryAwareDecimalConverter( +DecimalType.LongDecimal.precision, 0, updater) +} else { + new ParquetLongDictionaryAwareDecimalConverter( +metadata.getPrecision, metadata.getScale, updater) +} // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTyp
[spark] branch branch-3.1 updated: [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 5a2eb64 [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal 5a2eb64 is described below commit 5a2eb64a0d30051e6ce19ba62a89192099ca9b67 Author: Wenchen Fan AuthorDate: Wed Jan 27 09:34:31 2021 -0800 [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/31319 . When reading parquet int/long as decimal, the behavior should be the same as reading int/long and then casting to the decimal type. This PR changes it to the expected behavior. When reading parquet binary as decimal, we don't really know how to interpret the binary (it may come from a string), and should fail. This PR changes it to the expected behavior. ### Why are the changes needed? To make the behavior more sane. ### Does this PR introduce _any_ user-facing change? Yes, but it's a followup. ### How was this patch tested? Updated test. Closes #31357 from cloud-fan/bug. Authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun (cherry picked from commit 2dbb7d5af8f498e49488cd8876bd3d0b083723b7) Signed-off-by: Dongjoon Hyun --- .../datasources/parquet/ParquetRowConverter.scala | 48 +++--- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 58 -- 2 files changed, 60 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 32db964..dca12ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -243,19 +243,6 @@ private[parquet] class ParquetRowConverter( } /** - * Get a precision and a scale to interpret parquet decimal values. - * 1. If there is a decimal metadata, we read decimal values with the given precision and scale. - * 2. If there is no metadata, we read decimal values with scale `0` because it's plain integers - *when it is written into INT32/INT64/BINARY/FIXED_LEN_BYTE_ARRAY types. - */ - private def getPrecisionAndScale(parquetType: Type, t: DecimalType): (Int, Int) = { -val metadata = parquetType.asPrimitiveType().getDecimalMetadata -val precision = if (metadata == null) t.precision else metadata.getPrecision() -val scale = if (metadata == null) 0 else metadata.getScale() -(precision, scale) - } - - /** * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type * `catalystType`. Converted values are handled by `updater`. */ @@ -282,20 +269,43 @@ private[parquet] class ParquetRowConverter( // For INT32 backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 => -val (precision, scale) = getPrecisionAndScale(parquetType, t) -new ParquetIntDictionaryAwareDecimalConverter(precision, scale, updater) +val metadata = parquetType.asPrimitiveType().getDecimalMetadata +if (metadata == null) { + // If the column is a plain INT32, we should pick the precision that can host the largest + // INT32 value. 
+ new ParquetIntDictionaryAwareDecimalConverter( +DecimalType.IntDecimal.precision, 0, updater) +} else { + new ParquetIntDictionaryAwareDecimalConverter( +metadata.getPrecision, metadata.getScale, updater) +} // For INT64 backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 => -val (precision, scale) = getPrecisionAndScale(parquetType, t) -new ParquetLongDictionaryAwareDecimalConverter(precision, scale, updater) +val metadata = parquetType.asPrimitiveType().getDecimalMetadata +if (metadata == null) { + // If the column is a plain INT64, we should pick the precision that can host the largest + // INT64 value. + new ParquetLongDictionaryAwareDecimalConverter( +DecimalType.LongDecimal.precision, 0, updater) +} else { + new ParquetLongDictionaryAwareDecimalConverter( +metadata.getPrecision, metadata.getScale, updater) +} // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTyp
[spark] branch master updated (5718d64 -> 2dbb7d5)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 5718d64 [SPARK-34083][SQL] Using TPCDS original definitions for char/varchar columns add 2dbb7d5 [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet non-decimal fields as decimal No new revisions were added by this update. Summary of changes: .../datasources/parquet/ParquetRowConverter.scala | 48 +++--- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 58 -- 2 files changed, 60 insertions(+), 46 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (1217c8b -> 5718d64)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 1217c8b Revert "[SPARK-31168][SPARK-33913][BUILD] Upgrade Scala to 2.12.13 and Kafka to 2.7.0" add 5718d64 [SPARK-34083][SQL] Using TPCDS original definitions for char/varchar columns No new revisions were added by this update. Summary of changes: .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../approved-plans-modified/q27.sf100/explain.txt | 4 +- .../approved-plans-modified/q27/explain.txt| 4 +- .../approved-plans-modified/q43.sf100/explain.txt | 8 +- .../q43.sf100/simplified.txt | 2 +- .../approved-plans-modified/q43/explain.txt| 8 +- .../approved-plans-modified/q43/simplified.txt | 2 +- .../approved-plans-modified/q59.sf100/explain.txt | 16 +- .../q59.sf100/simplified.txt | 4 +- .../approved-plans-modified/q59/explain.txt| 16 +- .../approved-plans-modified/q59/simplified.txt | 4 +- .../approved-plans-v1_4/q2.sf100/explain.txt | 14 +- .../approved-plans-v1_4/q2.sf100/simplified.txt| 4 +- .../approved-plans-v1_4/q2/explain.txt | 14 +- .../approved-plans-v1_4/q2/simplified.txt | 4 +- .../approved-plans-v1_4/q43.sf100/explain.txt | 8 +- .../approved-plans-v1_4/q43.sf100/simplified.txt | 2 +- .../approved-plans-v1_4/q43/explain.txt| 8 +- .../approved-plans-v1_4/q43/simplified.txt | 2 +- .../approved-plans-v1_4/q56.sf100/explain.txt | 4 +- .../approved-plans-v1_4/q56/explain.txt| 4 +- .../approved-plans-v1_4/q59.sf100/explain.txt | 14 +- .../approved-plans-v1_4/q59.sf100/simplified.txt | 4 +- .../approved-plans-v1_4/q59/explain.txt| 14 +- .../approved-plans-v1_4/q59/simplified.txt | 4 +- .../approved-plans-v1_4/q60.sf100/explain.txt | 4 +- .../approved-plans-v1_4/q60/explain.txt| 4 +- .../approved-plans-v1_4/q66.sf100/explain.txt | 4 +- .../approved-plans-v1_4/q66/explain.txt| 4 +- .../approved-plans-v2_7/q18a.sf100/explain.txt | 4 +- .../approved-plans-v2_7/q18a/explain.txt | 4 +- .../approved-plans-v2_7/q27a.sf100/explain.txt | 4 +- .../approved-plans-v2_7/q27a/explain.txt | 4 +- .../scala/org/apache/spark/sql/TPCDSBase.scala | 558 +++-- 34 files changed, 518 insertions(+), 242 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (b2c104b -> 1217c8b)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from b2c104b [SPARK-34231][AVRO][TEST] Make proper use of resource file within AvroSuite test case add 1217c8b Revert "[SPARK-31168][SPARK-33913][BUILD] Upgrade Scala to 2.12.13 and Kafka to 2.7.0" No new revisions were added by this update. Summary of changes: dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 6 +++--- dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 6 +++--- docs/_config.yml| 2 +- .../test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 4 ++-- pom.xml | 6 +++--- project/SparkBuild.scala| 2 +- 6 files changed, 13 insertions(+), 13 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org