(spark) branch master updated: [SPARK-47324][SQL] Add missing timestamp conversion for JDBC nested types
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 72a95bcad7f1 [SPARK-47324][SQL] Add missing timestamp conversion for JDBC nested types 72a95bcad7f1 is described below commit 72a95bcad7f1906c97fb0971ed6338374ec3009d Author: Kent Yao AuthorDate: Mon Mar 11 09:34:12 2024 +0900 [SPARK-47324][SQL] Add missing timestamp conversion for JDBC nested types ### What changes were proposed in this pull request? [SPARK-44280](https://issues.apache.org/jira/browse/SPARK-44280) added a new API convertJavaTimestampToTimestamp which is called only for plain timestamps. This PR makes it work for timestamps in arrays ### Why are the changes needed? data consistency/correctness ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #45435 from yaooqinn/SPARK-47324. Authored-by: Kent Yao Signed-off-by: Hyukjin Kwon --- .../spark/sql/jdbc/PostgresIntegrationSuite.scala | 17 +--- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 46 +- 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 2d1c0314f27b..04e31679f386 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -23,8 +23,7 @@ import java.text.SimpleDateFormat import java.time.{LocalDateTime, ZoneOffset} import java.util.Properties -import org.apache.spark.sql.Column -import org.apache.spark.sql.Row +import org.apache.spark.sql.{Column, Row} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.types.{ArrayType, DecimalType, FloatType, ShortType} import org.apache.spark.tags.DockerTest @@ -149,9 +148,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { |('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate() conn.prepareStatement("CREATE TABLE infinity_timestamp" + - "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP);").executeUpdate() -conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column)" + - " VALUES ('infinity'), ('-infinity');").executeUpdate() + "(id SERIAL PRIMARY KEY, timestamp_column TIMESTAMP, timestamp_array TIMESTAMP[])") + .executeUpdate() +conn.prepareStatement("INSERT INTO infinity_timestamp (timestamp_column, timestamp_array)" + + " VALUES ('infinity', ARRAY[TIMESTAMP 'infinity']), " + +"('-infinity', ARRAY[TIMESTAMP '-infinity'])") + .executeUpdate() conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate() conn.prepareStatement("create table custom_type(type_array not_null_text[]," + @@ -447,10 +449,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { assert(row.length == 2) val infinity = row(0).getAs[Timestamp]("timestamp_column") val negativeInfinity = row(1).getAs[Timestamp]("timestamp_column") +val infinitySeq = row(0).getAs[scala.collection.Seq[Timestamp]]("timestamp_array") +val negativeInfinitySeq = row(1).getAs[scala.collection.Seq[Timestamp]]("timestamp_array") val minTimeStamp 
= LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC) val maxTimestamp = LocalDateTime.of(9999, 12, 31, 23, 59, 59).toEpochSecond(ZoneOffset.UTC) - assert(infinity.getTime == maxTimestamp) assert(negativeInfinity.getTime == minTimeStamp) +assert(infinitySeq.head.getTime == maxTimestamp) +assert(negativeInfinitySeq.head.getTime == minTimeStamp) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index b5e78ba32cd5..a7bbb832a839 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution.datasources.jdbc -import java.sql.{Connection, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException} +import java.math.{BigDecimal => JBigDecimal} +import java.sql.{Connection, Date, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException,
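For readers following along, here is a minimal sketch of the idea behind this fix, assuming only the public `JdbcDialect.convertJavaTimestampToTimestamp` API introduced in SPARK-44280; the helper name and shape below are illustrative and not the actual JdbcUtils code:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical helper: apply the dialect-specific timestamp fix-up (e.g. clamping
// PostgreSQL 'infinity'/'-infinity') to every element of a JDBC array column,
// not only to top-level TIMESTAMP columns as before this change.
def convertTimestampElements(dialect: JdbcDialect, elements: Array[Timestamp]): Array[Timestamp] =
  elements.map(ts => if (ts == null) null else dialect.convertJavaTimestampToTimestamp(ts))
```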
(spark) branch master updated: [SPARK-47331][SS] Serialization using case classes/primitives/POJO based on SQL encoder for Arbitrary State API v2
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new afbebfbadc4b [SPARK-47331][SS] Serialization using case classes/primitives/POJO based on SQL encoder for Arbitrary State API v2 afbebfbadc4b is described below commit afbebfbadc4b5e927df7c568a8afb08fc4407f58 Author: jingz-db AuthorDate: Mon Mar 11 09:20:44 2024 +0900 [SPARK-47331][SS] Serialization using case classes/primitives/POJO based on SQL encoder for Arbitrary State API v2 ### What changes were proposed in this pull request? In the new operator for arbitrary state-v2, we cannot rely on the session/encoder being available since the initialization for the various state instances happens on the executors. Hence, for the state serialization, we propose to let user explicitly pass in encoder for state variable and serialize primitives/case classes/POJO with SQL encoder. Leveraging SQL encoder can speed up the serialization. ### Why are the changes needed? These changes are needed for providing a dedicated serializer for state-v2. The changes are part of the work around adding new stateful streaming operator for arbitrary state mgmt that provides a bunch of new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939 ### Does this PR introduce _any_ user-facing change? Users will need to specify the SQL encoder for their state variable: `def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T]` `def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T]` For primitive type, Encoder is something as: `Encoders.scalaLong`; for case class, `Encoders.product[CaseClass]`; for POJO, `Encoders.bean(classOf[POJOClass])` ### How was this patch tested? Unit tests for primitives, case classes, POJO separately in `ValueStateSuite` ### Was this patch authored or co-authored using generative AI tooling? No Closes #45447 from jingz-db/sql-encoder-state-v2. Authored-by: jingz-db Signed-off-by: Jungtaek Lim --- .../sql/streaming/StatefulProcessorHandle.scala| 7 +- .../sql/execution/streaming/ListStateImpl.scala| 8 +- .../streaming/StateTypesEncoderUtils.scala | 41 +--- .../streaming/StatefulProcessorHandleImpl.scala| 9 +- .../sql/execution/streaming/ValueStateImpl.scala | 9 +- .../execution/streaming/state/POJOTestClass.java | 78 ++ .../streaming/state/ValueStateSuite.scala | 117 - .../streaming/TransformWithListStateSuite.scala| 7 +- .../sql/streaming/TransformWithStateSuite.scala| 11 +- 9 files changed, 250 insertions(+), 37 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala index 5d3390f80f6d..86bf1e85f90c 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming import java.io.Serializable import org.apache.spark.annotation.{Evolving, Experimental} +import org.apache.spark.sql.Encoder /** * Represents the operation handle provided to the stateful processor used in the @@ -33,20 +34,22 @@ private[sql] trait StatefulProcessorHandle extends Serializable { * The user must ensure to call this function only within the `init()` method of the * StatefulProcessor. 
* @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable * @tparam T - type of state variable * @return - instance of ValueState of type T that can be used to store state persistently */ - def getValueState[T](stateName: String): ValueState[T] + def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T] /** * Creates new or returns existing list state associated with stateName. * The ListState persists values of type T. * * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable * @tparam T - type of state variable * @return - instance of ListState of type T that can be used to store state persistently */ - def getListState[T](stateName: String): ListState[T] + def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T] /** Function to return queryInfo for currently running task */ def getQueryInfo(): QueryInfo diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala
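A minimal usage sketch of the new explicit-encoder API described above; `UserEvent` and `initState` are made-up names for illustration only:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming.{ListState, StatefulProcessorHandle, ValueState}

case class UserEvent(id: String, ts: Long)

def initState(handle: StatefulProcessorHandle): Unit = {
  // Primitive state variable: pass the matching SQL encoder explicitly.
  val count: ValueState[Long] = handle.getValueState[Long]("count", Encoders.scalaLong)
  // Case-class state variable: use Encoders.product (a POJO would use Encoders.bean).
  val events: ListState[UserEvent] =
    handle.getListState[UserEvent]("events", Encoders.product[UserEvent])
}
```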
(spark) branch master updated (f6df78154bac -> 5ac560c76e60)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from f6df78154bac [SPARK-47035][SS][CONNECT][FOLLOWUP] Additional protobuf change to client side listener add 5ac560c76e60 [MINOR][DOCS][SQL] Fix doc comment for coalescePartitions.parallelismFirst No new revisions were added by this update. Summary of changes: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47035][SS][CONNECT][FOLLOWUP] Additional protobuf change to client side listener
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f6df78154bac [SPARK-47035][SS][CONNECT][FOLLOWUP] Additional protobuf change to client side listener f6df78154bac is described below commit f6df78154bac826bd51d2aad185ce02a7efd36b6 Author: Wei Liu AuthorDate: Mon Mar 11 09:07:35 2024 +0900 [SPARK-47035][SS][CONNECT][FOLLOWUP] Additional protobuf change to client side listener ### What changes were proposed in this pull request? Followup of previous protocol change https://github.com/apache/spark/pull/45091. Add the request proto `Command` and response proto message to `ExecutePlanResponse` ### Why are the changes needed? Continuation of client side listener for spark connect ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Will be tested in subsequent PR, the proto change itself doesn't do any harm ### Was this patch authored or co-authored using generative AI tooling? No Closes #45444 from WweiL/SPARK-47035-protocol-followup. Authored-by: Wei Liu Signed-off-by: Hyukjin Kwon --- .../src/main/protobuf/spark/connect/base.proto | 3 + .../src/main/protobuf/spark/connect/commands.proto | 2 + python/pyspark/sql/connect/proto/base_pb2.py | 204 ++--- python/pyspark/sql/connect/proto/base_pb2.pyi | 13 ++ python/pyspark/sql/connect/proto/commands_pb2.py | 180 +- python/pyspark/sql/connect/proto/commands_pb2.pyi | 40 +++- 6 files changed, 249 insertions(+), 193 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index f24ca0a8fc3b..cb9dbe62c193 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -357,6 +357,9 @@ message ExecutePlanResponse { // Response for commands on the streaming query manager. StreamingQueryManagerCommandResult streaming_query_manager_command_result = 11; +// Response for commands on the client side streaming query listener. +StreamingQueryListenerEventsResult streaming_query_listener_events_result = 16; + // Response type informing if the stream is complete in reattachable execution. ResultComplete result_complete = 14; diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto index e845d5f29061..76ac106b1de8 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto @@ -42,6 +42,7 @@ message Command { GetResourcesCommand get_resources_command = 8; StreamingQueryManagerCommand streaming_query_manager_command = 9; CommonInlineUserDefinedTableFunction register_table_function = 10; +StreamingQueryListenerBusCommand streaming_query_listener_bus_command = 11; // This field is used to mark extensions to the protocol. When plugins generate arbitrary // Commands they can add them here. During the planning the correct resolution is done. 
@@ -456,6 +457,7 @@ message StreamingQueryListenerEvent { message StreamingQueryListenerEventsResult { repeated StreamingQueryListenerEvent events = 1; + optional bool listener_bus_listener_added = 2; } // Command to get the output of 'SparkContext.resources' diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py index 8326ce511d56..1941900ae69d 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.py +++ b/python/pyspark/sql/connect/proto/base_pb2.py @@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...] +
(spark) branch master updated: [SPARK-47334][SQL] Make `withColumnRenamed` reuse the implementation of `withColumnsRenamed`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a4603f134fb0 [SPARK-47334][SQL] Make `withColumnRenamed` reuse the implementation of `withColumnsRenamed` a4603f134fb0 is described below commit a4603f134fb0d496109d4c90889191c506e82691 Author: Ruifeng Zheng AuthorDate: Mon Mar 11 09:06:20 2024 +0900 [SPARK-47334][SQL] Make `withColumnRenamed` reuse the implementation of `withColumnsRenamed` ### What changes were proposed in this pull request? Make `withColumnRenamed` reuse the implementation of `withColumnsRenamed` ### Why are the changes needed? to avoid any divergence in the future ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #45450 from zhengruifeng/with_rename_consistent. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- .../main/scala/org/apache/spark/sql/Dataset.scala | 27 -- 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f3bf6119659d..f0c9f7ae53fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2876,23 +2876,8 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.0.0 */ - def withColumnRenamed(existingName: String, newName: String): DataFrame = { -val resolver = sparkSession.sessionState.analyzer.resolver -val output = queryExecution.analyzed.output -val shouldRename = output.exists(f => resolver(f.name, existingName)) -if (shouldRename) { - val columns = output.map { col => -if (resolver(col.name, existingName)) { - Column(col).as(newName) -} else { - Column(col) -} - } - select(columns : _*) -} else { - toDF() -} - } + def withColumnRenamed(existingName: String, newName: String): DataFrame = +withColumnsRenamed(Seq(existingName), Seq(newName)) /** * (Scala-specific) @@ -2921,18 +2906,24 @@ class Dataset[T] private[sql]( val resolver = sparkSession.sessionState.analyzer.resolver val output: Seq[NamedExpression] = queryExecution.analyzed.output +var shouldRename = false val projectList = colNames.zip(newColNames).foldLeft(output) { case (attrs, (existingName, newName)) => attrs.map(attr => if (resolver(attr.name, existingName)) { +shouldRename = true Alias(attr, newName)() } else { attr } ) } -withPlan(Project(projectList, logicalPlan)) +if (shouldRename) { + withPlan(Project(projectList, logicalPlan)) +} else { + toDF() +} } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
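As a quick illustration of the behaviour this refactor keeps intact (column names are arbitrary examples, and `spark` stands for an active SparkSession):

```scala
val df = spark.range(3).toDF("old_name")

// Both calls now go through the same withColumnsRenamed code path.
val renamedOne  = df.withColumnRenamed("old_name", "new_name")
val renamedMany = df.withColumnsRenamed(Map("old_name" -> "new_name"))

// Renaming a column that does not exist remains a no-op rather than an error.
val unchanged = df.withColumnRenamed("does_not_exist", "x")
```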
(spark) branch master updated: [SPARK-47325][INFRA] Use the latest `buf-setup-action` in github workflow
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f6a00f2dda93 [SPARK-47325][INFRA] Use the latest `buf-setup-action` in github workflow f6a00f2dda93 is described below commit f6a00f2dda9379a3b91297a556953d6f4c0f84cd Author: panbingkun AuthorDate: Mon Mar 11 08:53:36 2024 +0900 [SPARK-47325][INFRA] Use the latest `buf-setup-action` in github workflow ### What changes were proposed in this pull request? The pr aims to `unpin` specific version `buf-setup-action` in github workflow building. ### Why are the changes needed? - [The last](https://github.com/apache/spark/pull/45205) `pin` to a `specific version` was due to a bug in the version `v1.29.0-1`. The latest version has been upgraded to `v1.30.0`, and testing has found that this version is ok. - This latest version `v1.30.0` has a `change` regarding the upgrade from `node16` to `node20`. https://github.com/bufbuild/buf-setup-action/compare/v1.29.0...v1.30.0 https://github.com/apache/spark/assets/15246973/8277ac62-dc36-4237-8dc0-1522f5f248b8;> ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45433 from panbingkun/test_buf-setup-action_1_30_0. Authored-by: panbingkun Signed-off-by: Hyukjin Kwon --- .github/workflows/build_and_test.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index a24309e137eb..4f2be1c04f98 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -574,9 +574,8 @@ jobs: git -c user.name='Apache Spark Test Account' -c user.email='sparktest...@gmail.com' merge --no-commit --progress --squash FETCH_HEAD git -c user.name='Apache Spark Test Account' -c user.email='sparktest...@gmail.com' commit -m "Merged commit" --allow-empty - name: Install Buf - uses: bufbuild/buf-setup-action@v1.29.0 + uses: bufbuild/buf-setup-action@v1 with: -version: 1.29.0 github_token: ${{ secrets.GITHUB_TOKEN }} - name: Protocol Buffers Linter uses: bufbuild/buf-lint-action@v1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47200][SS] Make Foreach batch sink user function error handling backward compatible
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new edb970b8a73e [SPARK-47200][SS] Make Foreach batch sink user function error handling backward compatible edb970b8a73e is described below commit edb970b8a73e5b1e08b01f9370dadb05a3e231e3 Author: micheal-o AuthorDate: Mon Mar 11 08:44:30 2024 +0900 [SPARK-47200][SS] Make Foreach batch sink user function error handling backward compatible ### What changes were proposed in this pull request? I checked in a previous PR (https://github.com/apache/spark/pull/45299), that handles and classifies exceptions thrown in user provided functions for foreach batch sink. This change is to make it backward compatible in order not to break current users, since users may be depending on getting the user code error from the `StreamingQueryException.cause` instead of `StreamingQueryException.cause.cause` ### Why are the changes needed? To prevent breaking existing usage pattern. ### Does this PR introduce _any_ user-facing change? Yes, better error message with error class for ForeachBatchSink user function failures. ### How was this patch tested? updated existing tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #45449 from micheal-o/ForeachBatchExBackwardCompat. Authored-by: micheal-o Signed-off-by: Jungtaek Lim --- .../src/main/resources/error/error-classes.json| 2 +- docs/sql-error-conditions.md | 2 +- .../sql/execution/streaming/StreamExecution.scala | 29 +++--- .../streaming/sources/ForeachBatchSink.scala | 14 --- .../sql/errors/QueryExecutionErrorsSuite.scala | 2 +- .../streaming/sources/ForeachBatchSinkSuite.scala | 17 +++-- 6 files changed, 43 insertions(+), 23 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 57746d6dbf1e..9717ff2ed49c 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1297,7 +1297,7 @@ }, "FOREACH_BATCH_USER_FUNCTION_ERROR" : { "message" : [ - "An error occurred in the user provided function in foreach batch sink." + "An error occurred in the user provided function in foreach batch sink. Reason: " ], "sqlState" : "39000" }, diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 7be01f8cb513..0be75cde968f 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -778,7 +778,7 @@ The operation `` is not allowed on the ``: `` [SQLSTATE: 39000](sql-error-conditions-sqlstates.html#class-39-external-routine-invocation-exception) -An error occurred in the user provided function in foreach batch sink. +An error occurred in the user provided function in foreach batch sink. 
Reason: `` ### FOUND_MULTIPLE_DATA_SOURCES diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 859fce8b1154..50a73082a8c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table} import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream} import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write} import org.apache.spark.sql.execution.command.StreamingExplainCommand +import org.apache.spark.sql.execution.streaming.sources.ForeachBatchUserFuncException import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend import org.apache.spark.sql.streaming._ @@ -279,6 +280,7 @@ abstract class StreamExecution( * `start()` method returns. */ private def runStream(): Unit = { +var errorClassOpt: Option[String] = None try { sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString, interruptOnCancel = true) @@ -330,9 +332,17 @@ abstract class StreamExecution( getLatestExecutionContext().updateStatusMessage("Stopped") case e: Throwable => val message = if (e.getMessage == null) "" else e.getMessage +val cause = if (e.isInstanceOf[ForeachBatchUserFuncException]) { + // We want to maintain the current way users get the causing exception + // from the StreamingQueryException. Hence the ForeachBatch
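A hedged sketch of the user-side pattern this change preserves; query construction is omitted and `query` stands for any running streaming query that uses foreachBatch:

```scala
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

def awaitAndReport(query: StreamingQuery): Unit = {
  try {
    query.awaitTermination()
  } catch {
    case e: StreamingQueryException =>
      // Existing code that reads the user function's error from
      // StreamingQueryException.getCause keeps working after this change,
      // instead of having to look one level deeper at getCause.getCause.
      val userError = e.getCause
      println(s"foreachBatch user function failed: ${userError.getMessage}")
  }
}
```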
(spark) branch master updated: [SPARK-47218][SQL] XML: Changed SchemaOfXml to fail on DROPMALFORMED mode
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ae518ecb7068 [SPARK-47218][SQL] XML: Changed SchemaOfXml to fail on DROPMALFORMED mode ae518ecb7068 is described below commit ae518ecb7068347f70d947255eb54fdfd5ec8d48 Author: Yousof Hosny AuthorDate: Mon Mar 11 08:40:19 2024 +0900 [SPARK-47218][SQL] XML: Changed SchemaOfXml to fail on DROPMALFORMED mode ### What changes were proposed in this pull request? Changed schema_of_xml should fail with an error on DROPMALFORMED mode to avoid creating schemas out of invalid XML. ### Why are the changes needed? DROPMALFORMED parse mode imply silently dropping the malformed record. But SchemaOfXml is expected to return a schema and may not have a valid schema to return for a malformed record. So DROPMALFORMED cannot be supported.. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45379 from yhosny/xml-parsemode-error. Authored-by: Yousof Hosny Signed-off-by: Hyukjin Kwon --- .../sql/catalyst/expressions/xmlExpressions.scala | 8 +++-- .../sql/execution/datasources/xml/XmlSuite.scala | 36 ++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala index 800515ca84b5..8cc1c3a89745 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode} +import org.apache.spark.sql.catalyst.util.{ArrayData, DropMalformedMode, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode} import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} import org.apache.spark.sql.internal.SQLConf @@ -189,8 +189,12 @@ case class SchemaOfXml( private lazy val xmlFactory = xmlOptions.buildXmlFactory() @transient - private lazy val xmlInferSchema = + private lazy val xmlInferSchema = { +if (xmlOptions.parseMode == DropMalformedMode) { + throw QueryCompilationErrors.parseModeUnsupportedError("schema_of_xml", xmlOptions.parseMode) +} new XmlInferSchema(xmlOptions, caseSensitive = SQLConf.get.caseSensitiveAnalysis) + } @transient private lazy val xml = child.eval().asInstanceOf[UTF8String] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala index 2194f76e7da6..d7dc96184dab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala @@ -1302,6 
+1302,42 @@ class XmlSuite assert(result.select("decoded._corrupt_record").head().getString(0).nonEmpty) } + test("schema_of_xml with DROPMALFORMED parse error test") { +val e = intercept[AnalysisException] { + spark.sql(s"""SELECT schema_of_xml('1', map('mode', 'DROPMALFORMED'))""") + .collect() +} +checkError( + exception = e, + errorClass = "_LEGACY_ERROR_TEMP_1099", + parameters = Map( +"funcName" -> "schema_of_xml", +"mode" -> "DROPMALFORMED", +"permissiveMode" -> "PERMISSIVE", +"failFastMode" -> FailFastMode.name) +) + } + + test("schema_of_xml with FAILFAST parse error test") { +val e = intercept[SparkException] { + spark.sql(s"""SELECT schema_of_xml('1', map('mode', 'FAILFAST'))""") + .collect() +} +checkError( + exception = e, + errorClass = "_LEGACY_ERROR_TEMP_2165", + parameters = Map( +"failFastMode" -> FailFastMode.name) +) + } + + test("schema_of_xml with PERMISSIVE check no error test") { + val s = spark.sql(s"""SELECT schema_of_xml('1', map('mode', 'PERMISSIVE'))""") +.collect() + assert(s.head.get(0) ==
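Usage sketch of the new behaviour; the XML literal is an arbitrary example and `spark` is an active SparkSession:

```scala
// PERMISSIVE (the default) and FAILFAST still infer a schema or fail as before.
spark.sql("SELECT schema_of_xml('<ROW><a>1</a></ROW>')").show()

// DROPMALFORMED is now rejected with an AnalysisException instead of
// silently producing a schema for potentially malformed input.
spark.sql("SELECT schema_of_xml('<ROW><a>1</a></ROW>', map('mode', 'DROPMALFORMED'))").collect()
```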
(spark) branch master updated: [MINOR][SQL][TEST] Moving tests to related suites
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 264e00ee12bb [MINOR][SQL][TEST] Moving tests to related suites 264e00ee12bb is described below commit 264e00ee12bbbd822e52fa8ce79692c60f531495 Author: Mihailo Milosevic AuthorDate: Mon Mar 11 00:25:55 2024 +0500 [MINOR][SQL][TEST] Moving tests to related suites ### What changes were proposed in this pull request? Tests from `QueryCompilationErrorsSuite` were moved to `DDLSuite` and `JDBCTableCatalogSuite`. ### Why are the changes needed? We should move tests to related test suites in order to improve testing. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Corresponding Suites succeed. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45439 from mihailom-db/SPARK-47326. Authored-by: Mihailo Milosevic Signed-off-by: Max Gekk --- .../sql/errors/QueryCompilationErrorsSuite.scala | 74 -- .../spark/sql/execution/command/DDLSuite.scala | 17 + .../v2/jdbc/JDBCTableCatalogSuite.scala| 56 3 files changed, 73 insertions(+), 74 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala index c9198c86c720..4574d3328d48 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala @@ -25,13 +25,11 @@ import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test} import org.apache.spark.sql.catalyst.expressions.{Coalesce, Literal, UnsafeRow} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter -import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.expressions.SparkUserDefinedFunction import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils case class StringLongClass(a: String, b: Long) @@ -817,78 +815,6 @@ class QueryCompilationErrorsSuite parameters = Map("extraction" -> "\"array(test)\"")) } - test("CREATE NAMESPACE with LOCATION for JDBC catalog should throw an error") { -withTempDir { tempDir => - val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass" - Utils.classForName("org.h2.Driver") - withSQLConf( -"spark.sql.catalog.h2" -> classOf[JDBCTableCatalog].getName, -"spark.sql.catalog.h2.url" -> url, -"spark.sql.catalog.h2.driver" -> "org.h2.Driver") { -checkError( - exception = intercept[AnalysisException] { -sql("CREATE NAMESPACE h2.test_namespace LOCATION './samplepath'") - }, - errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND", - sqlState = "0A000", - parameters = Map("cmd" -> toSQLStmt("CREATE NAMESPACE ... 
LOCATION ..."))) - } -} - } - - test("ALTER NAMESPACE with property other than COMMENT " + -"for JDBC catalog should throw an exception") { -withTempDir { tempDir => - val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass" - Utils.classForName("org.h2.Driver") - withSQLConf( -"spark.sql.catalog.h2" -> classOf[JDBCTableCatalog].getName, -"spark.sql.catalog.h2.url" -> url, -"spark.sql.catalog.h2.driver" -> "org.h2.Driver") { -val namespace = "h2.test_namespace" -withNamespace(namespace) { - sql(s"CREATE NAMESPACE $namespace") - checkError( -exception = intercept[AnalysisException] { - sql(s"ALTER NAMESPACE h2.test_namespace SET LOCATION '/tmp/loc_test_2'") -}, -errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND_WITH_PROPERTY", -sqlState = "0A000", -parameters = Map( - "cmd" -> toSQLStmt("SET NAMESPACE"), - "property" -> toSQLConf("location"))) - - checkError( -exception = intercept[AnalysisException] { - sql(s"ALTER NAMESPACE h2.test_namespace SET PROPERTIES('a'='b')") -}, -errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND_WITH_PROPERTY", -sqlState = "0A000", -parameters = Map( - "cmd" -> toSQLStmt("SET NAMESPACE"), - "property" -> toSQLConf("a"))) -} - } -} - } - -