This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e5a5921968c [SPARK-44220][SQL] Move StringConcat to sql/api e5a5921968c is described below commit e5a5921968c84601ce005a7785bdd08c41a2d862 Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Thu Jun 29 11:52:06 2023 +0800 [SPARK-44220][SQL] Move StringConcat to sql/api ### What changes were proposed in this pull request? Move StringConcat to the `sql/api` module. ### Why are the changes needed? StringConcat is widely used in data types. As we plan to move the entire data type family to sql/api, we should move StringConcat. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests Closes #41764 from amaliujia/move_out_string_concat. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Kent Yao <y...@apache.org> --- common/unsafe/pom.xml | 7 +++ .../spark/unsafe/array/ByteArrayMethods.java | 7 +-- .../apache/spark/unsafe/array/ByteArrayUtils.java | 27 +++++++++ sql/api/pom.xml | 5 ++ .../spark/sql/catalyst/util/StringUtils.scala | 65 ++++++++++++++++++++++ sql/catalyst/pom.xml | 5 ++ .../spark/sql/catalyst/util/StringUtils.scala | 47 ---------------- .../org/apache/spark/sql/types/ArrayType.scala | 2 +- .../org/apache/spark/sql/types/DataType.scala | 2 +- .../scala/org/apache/spark/sql/types/MapType.scala | 2 +- .../org/apache/spark/sql/types/StructField.scala | 2 +- .../org/apache/spark/sql/types/StructType.scala | 8 +-- .../org/apache/spark/sql/types/DataTypeSuite.scala | 2 +- .../apache/spark/sql/execution/debug/package.scala | 2 +- 14 files changed, 120 insertions(+), 63 deletions(-) diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml index a61f00084eb..bdf82d9285e 100644 --- a/common/unsafe/pom.xml +++ b/common/unsafe/pom.xml @@ -38,6 +38,13 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> 
<groupId>org.apache.spark</groupId> + <artifactId>spark-common-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> </dependency> <!-- diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java index 500bc9de325..f81c0609276 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java @@ -42,12 +42,7 @@ public class ByteArrayMethods { return numBytes + ((8 - remainder) & 0x7); } - // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat smaller. - // Be conservative and lower the cap a little. - // Refer to "http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l229" - // This value is word rounded. Use this value if the allocated byte arrays are used to store other - // types rather than bytes. - public static final int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15; + public static final int MAX_ROUNDED_ARRAY_LENGTH = ByteArrayUtils.MAX_ROUNDED_ARRAY_LENGTH; private static final boolean unaligned = Platform.unaligned(); /** diff --git a/common/utils/src/main/scala/org/apache/spark/unsafe/array/ByteArrayUtils.java b/common/utils/src/main/scala/org/apache/spark/unsafe/array/ByteArrayUtils.java new file mode 100644 index 00000000000..f7b9a664c80 --- /dev/null +++ b/common/utils/src/main/scala/org/apache/spark/unsafe/array/ByteArrayUtils.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.array; + +public class ByteArrayUtils { + // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat smaller. + // Be conservative and lower the cap a little. + // Refer to "http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l229" + // This value is word rounded. Use this value if the allocated byte arrays are used to store other + // types rather than bytes. + public static final int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15; +} diff --git a/sql/api/pom.xml b/sql/api/pom.xml index 9d100b1130e..9b7917e0343 100644 --- a/sql/api/pom.xml +++ b/sql/api/pom.xml @@ -35,6 +35,11 @@ </properties> <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-common-utils_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala new file mode 100644 index 00000000000..10ac988da2e --- /dev/null +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.unsafe.array.ByteArrayUtils + +/** + * Concatenation of sequence of strings to final string with cheap append method + * and one memory allocation for the final string. Can also bound the final size of + * the string. + */ +class StringConcat(val maxLength: Int = ByteArrayUtils.MAX_ROUNDED_ARRAY_LENGTH) { + protected val strings = new ArrayBuffer[String] + protected var length: Int = 0 + + def atLimit: Boolean = length >= maxLength + + /** + * Appends a string and accumulates its length to allocate a string buffer for all + * appended strings once in the toString method. Returns true if the string still + * has room for further appends before it hits its max limit. + */ + def append(s: String): Unit = { + if (s != null) { + val sLen = s.length + if (!atLimit) { + val available = maxLength - length + val stringToAppend = if (available >= sLen) s else s.substring(0, available) + strings.append(stringToAppend) + } + + // Keeps the total length of appended strings. Note that we need to cap the length at + // `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH`; otherwise, we will overflow + // length causing StringIndexOutOfBoundsException in the substring call above. 
+ length = Math.min(length.toLong + sLen, ByteArrayUtils.MAX_ROUNDED_ARRAY_LENGTH).toInt + } + } + + /** + * The method allocates memory for all appended strings, writes them to the memory and + * returns concatenated string. + */ + override def toString: String = { + val finalLength = if (atLimit) maxLength else length + val result = new java.lang.StringBuilder(finalLength) + strings.foreach(result.append) + result.toString + } +} diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index 9057cc8a20e..9dbc8d625d0 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -49,6 +49,11 @@ <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql-api_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala index da912c19393..ccf6e5b57ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala @@ -19,15 +19,12 @@ package org.apache.spark.sql.catalyst.util import java.util.regex.{Pattern, PatternSyntaxException} -import scala.collection.mutable.ArrayBuffer - import org.apache.commons.text.similarity.LevenshteinDistance import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.types.UTF8String object StringUtils extends Logging { @@ -131,50 +128,6 @@ object 
StringUtils extends Logging { funcNames.toSeq } - /** - * Concatenation of sequence of strings to final string with cheap append method - * and one memory allocation for the final string. Can also bound the final size of - * the string. - */ - class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { - protected val strings = new ArrayBuffer[String] - protected var length: Int = 0 - - def atLimit: Boolean = length >= maxLength - - /** - * Appends a string and accumulates its length to allocate a string buffer for all - * appended strings once in the toString method. Returns true if the string still - * has room for further appends before it hits its max limit. - */ - def append(s: String): Unit = { - if (s != null) { - val sLen = s.length - if (!atLimit) { - val available = maxLength - length - val stringToAppend = if (available >= sLen) s else s.substring(0, available) - strings.append(stringToAppend) - } - - // Keeps the total length of appended strings. Note that we need to cap the length at - // `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH`; otherwise, we will overflow - // length causing StringIndexOutOfBoundsException in the substring call above. - length = Math.min(length.toLong + sLen, ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH).toInt - } - } - - /** - * The method allocates memory for all appended strings, writes them to the memory and - * returns concatenated string. - */ - override def toString: String = { - val finalLength = if (atLimit) maxLength else length - val result = new java.lang.StringBuilder(finalLength) - strings.foreach(result.append) - result.toString - } - } - /** * A string concatenator for plan strings. Uses length from a configured value, and * prints a warning the first time a plan is truncated. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala index d6f5c3bdf43..a5226870097 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.types import org.json4s.JsonDSL._ import org.apache.spark.annotation.Stable -import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat +import org.apache.spark.sql.catalyst.util.StringConcat /** * Companion object for ArrayType. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 893a41f3e39..4b701dc2438 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.DataTypeJsonUtils.{DataTypeJsonDeserializer, DataTypeJsonSerializer} -import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat +import org.apache.spark.sql.catalyst.util.StringConcat import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.DayTimeIntervalType._ import org.apache.spark.sql.types.YearMonthIntervalType._ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala index 2e5c7f731dc..ce0c76dbe4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala @@ -21,7 +21,7 @@ import org.json4s.JsonAST.JValue import org.json4s.JsonDSL._ 
import org.apache.spark.annotation.Stable -import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat +import org.apache.spark.sql.catalyst.util.StringConcat /** * The data type for Maps. Keys in a map are not allowed to have `null` values. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala index df480bebcb0..e03b2e8ab3c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala @@ -23,7 +23,7 @@ import org.json4s.JsonDSL._ import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ -import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat +import org.apache.spark.sql.catalyst.util.StringConcat import org.apache.spark.sql.util.SchemaUtils /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index a7234e4173b..dad8252e5ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser} import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.catalyst.types.DataTypeUtils -import org.apache.spark.sql.catalyst.util.{truncatedString, StringUtils} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ -import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat +import org.apache.spark.sql.catalyst.util.StringConcat +import org.apache.spark.sql.catalyst.util.truncatedString import 
org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.collection.Utils @@ -387,7 +387,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru def treeString: String = treeString(Int.MaxValue) def treeString(maxDepth: Int): String = { - val stringConcat = new StringUtils.StringConcat() + val stringConcat = new StringConcat() stringConcat.append("root\n") val prefix = " |" val depth = if (maxDepth > 0) maxDepth else Int.MaxValue @@ -431,7 +431,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru override def catalogString: String = { // in catalogString, we should not truncate - val stringConcat = new StringUtils.StringConcat() + val stringConcat = new StringConcat() val len = fields.length stringConcat.append("struct<") var i = 0 diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala index 4001b546566..0e78f875ad7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.types.DataTypeUtils -import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat +import org.apache.spark.sql.catalyst.util.StringConcat import org.apache.spark.sql.types.DataTypeTestUtils.{dayTimeIntervalTypes, yearMonthIntervalTypes} class DataTypeSuite extends SparkFunSuite { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 
7d3aa7440c4..6f796e6ca94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeFormatter, CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNodeRef -import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat +import org.apache.spark.sql.catalyst.util.StringConcat import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper} import org.apache.spark.sql.internal.SQLConf --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org