[spark] branch master updated: [MINOR][SQL][TESTS] Use SystemUtils.isJavaVersionAtMost for java version check

2023-07-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 44ffe9089bc [MINOR][SQL][TESTS] Use SystemUtils.isJavaVersionAtMost 
for java version check
44ffe9089bc is described below

commit 44ffe9089bce724164fd05ebc26d9e9fdb7f0480
Author: Hyukjin Kwon 
AuthorDate: Mon Jul 3 13:01:17 2023 +0900

[MINOR][SQL][TESTS] Use SystemUtils.isJavaVersionAtMost for java version 
check

### What changes were proposed in this pull request?

This PR proposes to use the Apache Commons Lang API for proper Java version checking 
instead of a manual check via `System.getProperty("java.version")`.
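
For reference, a minimal Scala sketch of the two checks side by side, mirroring the 
one-line change in the diff below (the `SystemUtils`/`JavaVersion` names come from 
Commons Lang 3, already used elsewhere in the codebase):

```
import org.apache.commons.lang3.{JavaVersion, SystemUtils}

// Old approach: manually parse the java.version system property.
val onJava8Manual = System.getProperty("java.version").split("\\D+")(0).toInt < 9

// New approach: let Commons Lang interpret the running Java version.
val onJava8 = SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8)
```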

### Why are the changes needed?

To keep the codebase consistent. This is the only place left with the manual check.

### Does this PR introduce _any_ user-facing change?

No, behavior is virtually the same with or without this change.

### How was this patch tested?

Manually tested.

Closes #41822 from HyukjinKwon/minor-java-string.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 .../org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
index 8f6099e96ef..e050a6d1789 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
@@ -412,7 +412,7 @@ class TimestampFormatterSuite extends 
DatetimeFormatterSuite {
 assert(formatter.format(date(1970, 1, 3)) == "03")
 assert(formatter.format(date(1970, 4, 9)) == "99")
 
-if (System.getProperty("java.version").split("\\D+")(0).toInt < 9) {
+if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8)) {
   // https://bugs.openjdk.java.net/browse/JDK-8079628
   intercept[SparkUpgradeException] {
 formatter.format(date(1970, 4, 10))





[spark] branch master updated: [SPARK-44185][SQL] Fix inconsistent path qualifying between catalog and data operations

2023-07-02 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8483191dfbb [SPARK-44185][SQL] Fix inconsistent path qualifying 
between catalog and data operations
8483191dfbb is described below

commit 8483191dfbb2842c10d5e8d2409eafed6dfc4abd
Author: Kent Yao 
AuthorDate: Mon Jul 3 09:45:20 2023 +0800

[SPARK-44185][SQL] Fix inconsistent path qualifying between catalog and 
data operations

### What changes were proposed in this pull request?

This PR adds a new analysis rule to qualify the table path before execution, to fix 
inconsistent path qualification between catalog and data operations.

### Why are the changes needed?

To fix bugs like

- A CREATE TABLE statement with a relative LOCATION and no table schema infers the 
schema from files in the directory relative to the current working directory, but 
stores the directory relative to the warehouse path.
- A CTAS statement with a relative LOCATION cannot assert that the root path is 
empty, because it checks a different path from the one it finally uses.
- DataFrameWriter does not qualify the path before checking it.

### Does this PR introduce _any_ user-facing change?

Yes.
For table statements with a LOCATION clause or 'path' option, when you specify a 
relative path, Spark now uses the warehouse path to qualify it for both the catalog 
and the data operation. Before this patch, it mostly used the warehouse path for the 
catalog and the current working directory (CWD) for data.
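
As a rough illustration (a hypothetical helper, not the exact rule), the idea is to 
resolve a relative location against the warehouse path instead of the process CWD; 
the actual fix routes CreateTable plans through `SessionCatalog.makeQualifiedTablePath`, 
shown in the diff below:

```
import java.net.URI
import org.apache.hadoop.fs.Path

// Hypothetical helper: qualify a table location the way the catalog does,
// resolving relative paths against the warehouse path rather than the CWD.
def qualifyLocation(location: URI, warehousePath: Path): URI = {
  if (location.isAbsolute || new Path(location).isAbsolute) {
    location                                   // already fully qualified
  } else {
    new Path(warehousePath, new Path(location)).toUri
  }
}
```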

### How was this patch tested?

new unit tests.

Closes #41733 from yaooqinn/SPARK-44185.

Authored-by: Kent Yao 
Signed-off-by: Kent Yao 
---
 .../sql/catalyst/catalog/SessionCatalog.scala  |  2 +-
 .../spark/sql/execution/datasources/ddl.scala  |  9 +++
 .../spark/sql/execution/datasources/rules.scala| 22 ++
 .../sql/internal/BaseSessionStateBuilder.scala |  1 +
 .../execution/datasources/TableLocationSuite.scala | 79 ++
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |  1 +
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 17 -
 .../spark/sql/hive/execution/HiveSerDeSuite.scala  |  2 +-
 8 files changed, 128 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index cd4b4cfaf6b..d18736deda4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -416,7 +416,7 @@ class SessionCatalog(
 }
   }
 
-  private def makeQualifiedTablePath(locationUri: URI, database: String): URI 
= {
+  def makeQualifiedTablePath(locationUri: URI, database: String): URI = {
 if (locationUri.isAbsolute) {
   locationUri
 } else if (new Path(locationUri).isAbsolute) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index dc5894e42e7..42d6769525c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -59,6 +59,15 @@ case class CreateTable(
   override protected def withNewChildrenInternal(
   newChildren: IndexedSeq[LogicalPlan]): LogicalPlan =
 copy(query = if (query.isDefined) Some(newChildren.head) else None)
+
+  /**
+   * Identifies the underlying table's location is qualified or absent.
+   *
+   * @return true if the location is absolute or absent, false otherwise.
+   */
+  def locationQualifiedOrAbsent: Boolean = {
+tableDesc.storage.locationUri.map(_.isAbsolute).getOrElse(true)
+  }
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index af98bb13c73..0b4df18eb7c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -553,3 +553,25 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
 }
   }
 }
+
+/**
+ * A rule to qualify relative locations with warehouse path before it breaks 
in catalog
+ * operation and data reading and writing.
+ *
+ * @param catalog the session catalog
+ */
+case class QualifyLocationWithWarehouse(catalog: SessionCatalog) extends 
Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+case c @ CreateTableV1(tableDesc, _, _) if !c.locationQualifiedOrAbsent =>
+  val qualifiedTableIdent = 

[spark] branch master updated: [SPARK-40731][DSTREAM] Make `streaming` pass on Java 21

2023-07-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4569bf63659 [SPARK-40731][DSTREAM] Make `streaming` pass on Java 21
4569bf63659 is described below

commit 4569bf63659f98fa509ae0f7906b8a18bf32f0e9
Author: yangjie01 
AuthorDate: Mon Jul 3 10:08:34 2023 +0900

[SPARK-40731][DSTREAM] Make `streaming` pass on Java 21

### What changes were proposed in this pull request?
This PR uses a 0-size `ByteBuffer` instead of `mock[ByteBuffer]` for 
Java 17+ in `BatchedWriteAheadLogSuite`, because Mockito 4 can no longer 
mock/spy a `sealed class`.
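
A minimal Scala sketch of the workaround, assuming the Java-version guard uses a 
check like `isJavaVersionAtMost(JAVA_16)` (the exact condition is cut off in the 
diff excerpt below):

```
import java.nio.ByteBuffer
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.mockito.Mockito.mock

// Mockito 4 cannot mock/spy the sealed java.nio.ByteBuffer on Java 17+,
// so fall back to a real zero-length buffer there.
def dummyBuffer(): ByteBuffer =
  if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_16)) {
    mock(classOf[ByteBuffer])   // mocking still works on pre-17 JVMs
  } else {
    ByteBuffer.allocate(0)      // 0-size buffer stands in for the mock
  }
```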

### Why are the changes needed?
Make `streaming` pass on Java 21

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Actions
- manual test

```
java -version
openjdk version "21-ea" 2023-09-19
OpenJDK Runtime Environment Zulu21+65-CA (build 21-ea+26)
OpenJDK 64-Bit Server VM Zulu21+65-CA (build 21-ea+26, mixed mode, sharing)
```

```
build/sbt "streaming/test"
```

**Before**

```
[info] - BatchedWriteAheadLog - failures in wrappedLog get bubbled up *** 
FAILED *** (3 milliseconds)
[info]   Expected exception org.apache.spark.SparkException to be thrown, 
but org.mockito.exceptions.base.MockitoException was thrown 
(WriteAheadLogSuite.scala:478)
[info]   org.scalatest.exceptions.TestFailedException:
...
[info]   Cause: org.mockito.exceptions.base.MockitoException: Cannot 
mock/spy class java.nio.ByteBuffer
[info] Mockito cannot mock/spy because :
[info]  - sealed class
[info]   at 
org.scalatestplus.mockito.MockitoSugar.mock(MockitoSugar.scala:73)
[info]   at 
org.scalatestplus.mockito.MockitoSugar.mock$(MockitoSugar.scala:72)
[info]   at 
org.apache.spark.streaming.util.BatchedWriteAheadLogSuite.mock(WriteAheadLogSuite.scala:420)
[info]   at 
org.apache.spark.streaming.util.BatchedWriteAheadLogSuite.$anonfun$new$38(WriteAheadLogSuite.scala:479)
...
[info] - BatchedWriteAheadLog - name log with the highest timestamp of 
aggregated entries (6 milliseconds)
[info] - BatchedWriteAheadLog - shutdown properly *** FAILED *** (1 
millisecond)
[info]   Expected exception java.lang.IllegalStateException to be thrown, 
but org.mockito.exceptions.base.MockitoException was thrown 
(WriteAheadLogSuite.scala:549)
[info]   org.scalatest.exceptions.TestFailedException:
...
[info]   Cause: org.mockito.exceptions.base.MockitoException: Cannot 
mock/spy class java.nio.ByteBuffer
[info] Mockito cannot mock/spy because :
[info]  - sealed class
[info]   at 
org.scalatestplus.mockito.MockitoSugar.mock(MockitoSugar.scala:73)
[info]   at 
org.scalatestplus.mockito.MockitoSugar.mock$(MockitoSugar.scala:72)
[info]   at 
org.apache.spark.streaming.util.BatchedWriteAheadLogSuite.mock(WriteAheadLogSuite.scala:420)
[info]   at 
org.apache.spark.streaming.util.BatchedWriteAheadLogSuite.$anonfun$new$46(WriteAheadLogSuite.scala:549)
...
[info] Run completed in 4 minutes, 41 seconds.
[info] Total number of tests run: 340
[info] Suites: completed 40, aborted 0
[info] Tests: succeeded 338, failed 2, canceled 0, ignored 1, pending 0
[info] *** 2 TESTS FAILED ***
[error] Failed: Total 440, Failed 2, Errors 0, Passed 438, Ignored 1
[error] Failed tests:
[error] org.apache.spark.streaming.util.BatchedWriteAheadLogSuite
```

**After**

```
[info] Run completed in 4 minutes, 34 seconds.
[info] Total number of tests run: 340
[info] Suites: completed 40, aborted 0
[info] Tests: succeeded 340, failed 0, canceled 0, ignored 1, pending 0
[info] All tests passed.
[info] Passed: Total 440, Failed 0, Errors 0, Passed 440, Ignored 1
```

Closes #41815 from LuciferYang/SPARK-40731.

Authored-by: yangjie01 
Signed-off-by: Hyukjin Kwon 
---
 .../spark/streaming/util/WriteAheadLogSuite.scala  | 18 --
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 60e04403937..4d23230e2ea 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -28,6 +28,7 @@ import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.implicitConversions
 
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.mockito.ArgumentCaptor
@@ -476,7 

[spark] branch master updated: [SPARK-44195][R] Add JobTag APIs to SparkR SparkContext

2023-07-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8daa2553c92 [SPARK-44195][R] Add JobTag APIs to SparkR SparkContext
8daa2553c92 is described below

commit 8daa2553c92bf7de4a214c7bb4127141e06cc1d2
Author: Juliusz Sompolski 
AuthorDate: Mon Jul 3 09:53:26 2023 +0900

[SPARK-44195][R] Add JobTag APIs to SparkR SparkContext

### What changes were proposed in this pull request?

Add APIs from https://github.com/apache/spark/pull/41440 to SparkR:
* addJobTag(tag)
* removeJobTag(tag)
* getJobTags()
* clearJobTags()
* cancelJobsWithTag()
* setInterruptOnCancel(tag)

Additionally:
* fix a bug in removeJobTag when the last tag is removed (the thread should be 
left with an empty set of tags, not with an empty string tag)
* fix the comments for cancelJobsWithTag
* add a few defensive checks against an empty string tag resulting from a 
missing property or from removing the last tag.
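
For context, a hedged Scala sketch of the SparkContext job-tag lifecycle that these 
SparkR wrappers delegate to (API names as introduced in 
https://github.com/apache/spark/pull/41440; illustrative only, not part of this patch):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

sc.addJobTag("nightly-report")            // tag all jobs started by this thread
sc.setInterruptOnCancel(true)             // interrupt task threads on cancellation
assert(sc.getJobTags().contains("nightly-report"))

// ... trigger jobs ...
sc.cancelJobsWithTag("nightly-report")    // cancel every job carrying the tag
sc.removeJobTag("nightly-report")
sc.clearJobTags()
```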

### Why are the changes needed?

SparkR parity.

### Does this PR introduce _any_ user-facing change?

Yes, this introduces to SparkR the APIs added in Scala in 
https://github.com/apache/spark/pull/41440.

### How was this patch tested?

Added test.

Closes #41742 from juliuszsompolski/SPARK-44195.

Authored-by: Juliusz Sompolski 
Signed-off-by: Hyukjin Kwon 
---
 R/pkg/NAMESPACE| 10 ++-
 R/pkg/R/sparkR.R   | 98 ++
 R/pkg/pkgdown/_pkgdown_template.yml|  6 ++
 R/pkg/tests/fulltests/test_context.R   | 16 
 .../main/scala/org/apache/spark/SparkContext.scala |  9 +-
 .../apache/spark/api/java/JavaSparkContext.scala   |  2 +-
 .../main/scala/org/apache/spark/api/r/RUtils.scala |  4 +
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  6 +-
 .../apache/spark/status/AppStatusListener.scala|  1 +
 .../org/apache/spark/JobCancellationSuite.scala|  4 +-
 10 files changed, 147 insertions(+), 9 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index bb05e99a9d8..78068f20c57 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -77,13 +77,19 @@ exportMethods("glm",
   "spark.lm",
   "spark.fmRegressor")
 
-# Job group lifecycle management methods
+# Job group and job tag lifecycle management methods
 export("setJobGroup",
"clearJobGroup",
"cancelJobGroup",
"setJobDescription",
+   "setInterruptOnCancel",
"setLocalProperty",
-   "getLocalProperty")
+   "getLocalProperty",
+   "addJobTag",
+   "removeJobTag",
+   "getJobTags",
+   "clearJobTags",
+   "cancelJobsWithTag")
 
 # Export Utility methods
 export("setLogLevel")
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index e2ab5747177..1cbaad6d178 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -542,6 +542,104 @@ cancelJobGroup <- function(groupId) {
   invisible(callJMethod(sc, "cancelJobGroup", groupId))
 }
 
+#' Set the behavior of job cancellation from jobs started in this thread.
+#'
+#' @param interruptOnCancel If true, then job cancellation will result in 
`Thread.interrupt()`
+#' being called on the job's executor threads. This is useful to help ensure 
that the tasks
+#' are actually stopped in a timely manner, but is off by default due to 
HDFS-1208, where HDFS
+#' may respond to Thread.interrupt() by marking nodes as dead.
+#' @rdname setInterruptOnCancel
+#' @name setInterruptOnCancel
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' setInterruptOnCancel(true)
+#'}
+#' @note cancelJobGroup since 3.5.0
+setInterruptOnCancel <- function(interruptOnCancel) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "setInterruptOnCancel", interruptOnCancel))
+}
+
+#' Add a tag to be assigned to all the jobs started by this thread.
+#'
+#' @param tag The tag to be added. Cannot contain ',' (comma) character.
+#' @rdname addJobTAg
+#' @name addJobTag
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' addJobTag("myJobTag")
+#'}
+#' @note addJobTag since 3.5.0
+addJobTag <- function(tag) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "addJobTag", tag))
+}
+
+#' Remove a tag previously added to be assigned to all the jobs started by 
this thread.
+#' Noop if such a tag was not added earlier.
+#'
+#' @param tag The tag to be removed. Cannot contain ',' (comma) character.
+#' @rdname removeJobTAg
+#' @name removeJobTag
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' removeJobTag("myJobTag")
+#'}
+#' @note cancelJobGroup since 3.5.0
+removeJobTag <- function(tag) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "removeJobTag", tag))
+}
+
+#' Get the tags that are currently set to be assigned to all the jobs started 
by this thread.
+#'
+#' 

[spark] branch master updated: [SPARK-41822][CONNECT][TESTS][FOLLOW-UP] Remove the need of a fixed port to allow parallel running of tests

2023-07-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e3970cc31af [SPARK-41822][CONNECT][TESTS][FOLLOW-UP] Remove the need 
of a fixed port to allow parallel running of tests
e3970cc31af is described below

commit e3970cc31af347c47327748a0d65106622fc2fcd
Author: Zhen Li 
AuthorDate: Mon Jul 3 09:36:34 2023 +0900

[SPARK-41822][CONNECT][TESTS][FOLLOW-UP] Remove the need of a fixed port to 
allow parallel running of tests

### What changes were proposed in this pull request?
Reusing the same fixed server port in the tests causes some flakiness when 
running tests in parallel, especially on systems that are slow to 
recycle ports.

This PR makes the test not use a fixed port, and not start a 
connection unless it is needed.
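
The general pattern, sketched below, is to bind to port 0 so the OS assigns a free 
ephemeral port and then ask for the port actually bound; the test itself does this by 
passing 0 to `startDummyServer` and reading `server.getPort`, as in the diff below:

```
import java.net.ServerSocket

// Binding to port 0 lets the OS pick a free ephemeral port, so parallel
// test runs never race for the same hard-coded port.
val socket = new ServerSocket(0)
val assignedPort = socket.getLocalPort   // the port the OS actually bound
// ... point the client at s"sc://localhost:$assignedPort" ...
socket.close()
```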

### Why are the changes needed?
Small test reliability improvements.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Only test changes.

Closes #41776 from zhenlineo/remove-fixed-port-in-tests.

Authored-by: Zhen Li 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/connect/client/SparkConnectClientSuite.scala  | 19 ++-
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
old mode 100755
new mode 100644
index 7e0b687054d..e9a10e27273
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -110,9 +110,12 @@ class SparkConnectClientSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
   }
 
   test("SparkSession initialisation with connection string") {
-val testPort = 16002
-client = 
SparkConnectClient.builder().connectionString(s"sc://localhost:$testPort").build()
-startDummyServer(testPort)
+startDummyServer(0)
+client = SparkConnectClient
+  .builder()
+  .connectionString(s"sc://localhost:${server.getPort}")
+  .build()
+
 val session = SparkSession.builder().client(client).create()
 val df = session.range(10)
 df.analyze // Trigger RPC
@@ -133,11 +136,17 @@ class SparkConnectClientSuite extends ConnectFunSuite 
with BeforeAndAfterEach {
 TestPackURI(
   "sc://localhost:1234/",
   isCorrect = true,
-  client => testClientConnection(1234)(_ => client)),
+  client => {
+assert(client.configuration.host == "localhost")
+assert(client.configuration.port == 1234)
+  }),
 TestPackURI(
   "sc://localhost/;",
   isCorrect = true,
-  client => 
testClientConnection(ConnectCommon.CONNECT_GRPC_BINDING_PORT)(_ => client)),
+  client => {
+assert(client.configuration.host == "localhost")
+assert(client.configuration.port == 
ConnectCommon.CONNECT_GRPC_BINDING_PORT)
+  }),
 TestPackURI("sc://host:123", isCorrect = true),
 TestPackURI(
   "sc://host:123/;user_id=a94",





[spark] branch master updated: [SPARK-44246][CONNECT][FOLLOW-UP] Miscellaneous cleanups for Spark Connect Jar/Classfile Isolation

2023-07-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ad3059f23c4 [SPARK-44246][CONNECT][FOLLOW-UP] Miscellaneous cleanups 
for Spark Connect Jar/Classfile Isolation
ad3059f23c4 is described below

commit ad3059f23c407e464cf0b203bbafd7655c480866
Author: vicennial 
AuthorDate: Mon Jul 3 09:35:27 2023 +0900

[SPARK-44246][CONNECT][FOLLOW-UP] Miscellaneous cleanups for Spark Connect 
Jar/Classfile Isolation

### What changes were proposed in this pull request?

This PR is a follow-up of #41701 and addresses the comments mentioned 
[here](https://github.com/apache/spark/pull/41701#issuecomment-1608577372). The 
summary is:

- `pythonIncludes` are directly fetched from the `ArtifactManager` via 
`SessionHolder` instead of being propagated through the Spark conf
- `SessionHolder#withContext` is renamed to 
`SessionHolder#withContextClassLoader` to reduce ambiguity.
- Generally increased test coverage for isolated classloading (a new unit test 
in `ArtifactManagerSuite` and a new suite, `ClassLoaderIsolationSuite`).
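
A minimal, hypothetical Scala sketch of the general pattern the 
`withContextClassLoader` name refers to (illustrative only; the actual 
implementation lives in `SessionHolder`):

```
// Run a block of code with a session-specific classloader installed on the
// current thread, restoring the previous one afterwards.
def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
  val previous = Thread.currentThread().getContextClassLoader
  Thread.currentThread().setContextClassLoader(loader)
  try body
  finally Thread.currentThread().setContextClassLoader(previous)
}
```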

### Why are the changes needed?

General follow-ups from 
[here](https://github.com/apache/spark/pull/41701#issuecomment-1608577372).

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New test suite and unit tests.

Closes #41789 from vicennial/SPARK-44246.

Authored-by: vicennial 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  13 +--
 .../spark/sql/connect/service/SessionHolder.scala  |  45 +
 .../service/SparkConnectAnalyzeHandler.scala   |   2 +-
 .../connect/artifact/ArtifactManagerSuite.scala|  20 +++-
 core/src/test/resources/TestHelloV2.jar| Bin 0 -> 3784 bytes
 core/src/test/resources/TestHelloV3.jar| Bin 0 -> 3595 bytes
 .../spark/executor/ClassLoaderIsolationSuite.scala | 102 +
 7 files changed, 126 insertions(+), 56 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index cdad4fc6190..149d5512953 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -26,8 +26,6 @@ import com.google.protobuf.{Any => ProtoAny, ByteString}
 import io.grpc.{Context, Status, StatusRuntimeException}
 import io.grpc.stub.StreamObserver
 import org.apache.commons.lang3.exception.ExceptionUtils
-import org.json4s._
-import org.json4s.jackson.JsonMethods.parse
 
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
@@ -91,15 +89,6 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) 
extends Logging {
   private lazy val pythonExec =
 sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
 
-  // SparkConnectPlanner is used per request.
-  private lazy val pythonIncludes = {
-implicit val formats = DefaultFormats
-parse(session.conf.get("spark.connect.pythonUDF.includes", "[]"))
-  .extract[Array[String]]
-  .toList
-  .asJava
-  }
-
   // The root of the query plan is a relation and we apply the transformations 
to it.
   def transformRelation(rel: proto.Relation): LogicalPlan = {
 val plan = rel.getRelTypeCase match {
@@ -1527,7 +1516,7 @@ class SparkConnectPlanner(val sessionHolder: 
SessionHolder) extends Logging {
   command = fun.getCommand.toByteArray,
   // Empty environment variables
   envVars = Maps.newHashMap(),
-  pythonIncludes = pythonIncludes,
+  pythonIncludes = 
sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
   pythonExec = pythonExec,
   pythonVer = fun.getPythonVer,
   // Empty broadcast variables
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 24502fccd96..56ef68abbc2 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -24,9 +24,6 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods.{compact, render}
-
 import 

[spark] branch master updated: [SPARK-44249][SQL][PYTHON] Refactor PythonUDTFRunner to send its return type separately

2023-07-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1fbb38ec5bd [SPARK-44249][SQL][PYTHON] Refactor PythonUDTFRunner to 
send its return type separately
1fbb38ec5bd is described below

commit 1fbb38ec5bd4ae8777053b1333fbe62a96f1e0f5
Author: Takuya UESHIN 
AuthorDate: Mon Jul 3 09:33:45 2023 +0900

[SPARK-44249][SQL][PYTHON] Refactor PythonUDTFRunner to send its return 
type separately

### What changes were proposed in this pull request?

Refactors `PythonUDTFRunner` to send its return type separately.

### Why are the changes needed?

The return type of a Python UDTF doesn't need to be included in the Python 
"command" because `PythonUDTF` already knows the return type and can send it 
separately.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated the related tests and existing tests.

Closes #41792 from ueshin/issues/SPARK-44249/return_type.

Authored-by: Takuya UESHIN 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/udf.py  |  8 ++-
 python/pyspark/sql/udtf.py |  2 +-
 python/pyspark/worker.py   | 19 ++-
 .../spark/sql/catalyst/expressions/PythonUDF.scala |  2 +-
 .../execution/python/BatchEvalPythonUDTFExec.scala | 61 +++---
 .../sql/execution/python/PythonUDFRunner.scala | 48 +++--
 .../apache/spark/sql/IntegratedUDFTestUtils.scala  | 10 ++--
 .../sql/execution/python/PythonUDTFSuite.scala |  6 ---
 8 files changed, 94 insertions(+), 62 deletions(-)

diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index c6171ffece9..0d235660718 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -51,9 +51,13 @@ __all__ = ["UDFRegistration"]
 
 
 def _wrap_function(
-sc: SparkContext, func: Callable[..., Any], returnType: "DataTypeOrString"
+sc: SparkContext, func: Callable[..., Any], returnType: Optional[DataType] 
= None
 ) -> JavaObject:
-command = (func, returnType)
+command: Any
+if returnType is None:
+command = func
+else:
+command = (func, returnType)
 pickled_command, broadcast_vars, env, includes = 
_prepare_for_python_RDD(sc, command)
 assert sc._jvm is not None
 return sc._jvm.SimplePythonFunction(
diff --git a/python/pyspark/sql/udtf.py b/python/pyspark/sql/udtf.py
index 95093970596..3bf7bc977c3 100644
--- a/python/pyspark/sql/udtf.py
+++ b/python/pyspark/sql/udtf.py
@@ -118,7 +118,7 @@ class UserDefinedTableFunction:
 spark = SparkSession._getActiveSessionOrCreate()
 sc = spark.sparkContext
 
-wrapped_func = _wrap_function(sc, func, self.returnType)
+wrapped_func = _wrap_function(sc, func)
 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
 assert sc._jvm is not None
 judtf = 
sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonTableFunction(
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 71a7ccd15aa..b24600b0c1b 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -61,10 +61,10 @@ from pyspark.sql.pandas.serializers import (
 ApplyInPandasWithStateSerializer,
 )
 from pyspark.sql.pandas.types import to_arrow_type
-from pyspark.sql.types import StructType
+from pyspark.sql.types import StructType, _parse_datatype_json_string
 from pyspark.util import fail_on_stopiteration, try_simplify_traceback
 from pyspark import shuffle
-from pyspark.errors import PySparkRuntimeError, PySparkValueError
+from pyspark.errors import PySparkRuntimeError
 
 pickleSer = CPickleSerializer()
 utf8_deserializer = UTF8Deserializer()
@@ -461,20 +461,11 @@ def assign_cols_by_name(runner_conf):
 # ensure the UDTF is valid. This function also prepares a mapper function for 
applying
 # the UDTF logic to input rows.
 def read_udtf(pickleSer, infile, eval_type):
-num_udtfs = read_int(infile)
-if num_udtfs != 1:
-raise PySparkValueError(f"Unexpected number of UDTFs. Expected 1 but 
got {num_udtfs}.")
-
-# See `PythonUDFRunner.writeUDFs`.
+# See `PythonUDTFRunner.PythonUDFWriterThread.writeCommand'
 num_arg = read_int(infile)
 arg_offsets = [read_int(infile) for _ in range(num_arg)]
-num_chained_funcs = read_int(infile)
-if num_chained_funcs != 1:
-raise PySparkValueError(
-f"Unexpected number of chained UDTFs. Expected 1 but got 
{num_chained_funcs}."
-)
-
-handler, return_type = read_command(pickleSer, infile)
+handler = read_command(pickleSer, infile)
+return_type = _parse_datatype_json_string(utf8_deserializer.loads(infile))
 if not isinstance(handler, type):
 raise 

[spark] branch master updated: [SPARK-44211][PYTHON][CONNECT] Implement SparkSession.is_stopped

2023-07-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2cb1d3bf878 [SPARK-44211][PYTHON][CONNECT] Implement 
SparkSession.is_stopped
2cb1d3bf878 is described below

commit 2cb1d3bf87808896e7ef6467f7afb21d1e0a50fb
Author: Alice Sayutina 
AuthorDate: Mon Jul 3 09:18:53 2023 +0900

[SPARK-44211][PYTHON][CONNECT] Implement SparkSession.is_stopped

### What changes were proposed in this pull request?
Adds SparkSession.is_stopped, which returns whether this session was 
previously stopped.

### Why are the changes needed?
There is currently no way to determine whether the session has been closed.

### Does this PR introduce _any_ user-facing change?
Yes, it introduces the is_stopped property.

### How was this patch tested?
Unit Tests

Closes #41760 from cdkrot/master.

Authored-by: Alice Sayutina 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/client/core.py  | 9 +
 python/pyspark/sql/connect/session.py  | 7 +++
 python/pyspark/sql/tests/connect/client/test_client.py | 7 +++
 python/pyspark/sql/tests/connect/test_session.py   | 7 +++
 4 files changed, 30 insertions(+)

diff --git a/python/pyspark/sql/connect/client/core.py 
b/python/pyspark/sql/connect/client/core.py
index 7368521259a..f8d304e9ccc 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -594,6 +594,7 @@ class SparkConnectClient(object):
 self._user_id = os.getenv("USER", None)
 
 self._channel = self._builder.toChannel()
+self._closed = False
 self._stub = grpc_lib.SparkConnectServiceStub(self._channel)
 self._artifact_manager = ArtifactManager(self._user_id, 
self._session_id, self._channel)
 # Configure logging for the SparkConnect client.
@@ -835,6 +836,14 @@ class SparkConnectClient(object):
 Close the channel.
 """
 self._channel.close()
+self._closed = True
+
+@property
+def is_closed(self) -> bool:
+"""
+Returns if the channel was closed previously using close() method
+"""
+return self._closed
 
 @property
 def host(self) -> str:
diff --git a/python/pyspark/sql/connect/session.py 
b/python/pyspark/sql/connect/session.py
index 356dacd8e18..358674c9189 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -573,6 +573,13 @@ class SparkSession:
 
 stop.__doc__ = PySparkSession.stop.__doc__
 
+@property
+def is_stopped(self) -> bool:
+"""
+Returns if this session was stopped
+"""
+return self.client.is_closed
+
 @classmethod
 def getActiveSession(cls) -> Any:
 raise PySparkNotImplementedError(
diff --git a/python/pyspark/sql/tests/connect/client/test_client.py 
b/python/pyspark/sql/tests/connect/client/test_client.py
index 3e3ce6f40df..5c39d4502f5 100644
--- a/python/pyspark/sql/tests/connect/client/test_client.py
+++ b/python/pyspark/sql/tests/connect/client/test_client.py
@@ -79,6 +79,13 @@ class SparkConnectClientTestCase(unittest.TestCase):
 client.interrupt_all()
 self.assertIsNotNone(mock.req, "Interrupt API was not called when 
expected")
 
+def test_is_closed(self):
+client = SparkConnectClient("sc://foo/;token=bar")
+
+self.assertFalse(client.is_closed)
+client.close()
+self.assertTrue(client.is_closed)
+
 
 class MockService:
 # Simplest mock of the SparkConnectService.
diff --git a/python/pyspark/sql/tests/connect/test_session.py 
b/python/pyspark/sql/tests/connect/test_session.py
index 3cf6d91d404..bde22d80303 100644
--- a/python/pyspark/sql/tests/connect/test_session.py
+++ b/python/pyspark/sql/tests/connect/test_session.py
@@ -56,3 +56,10 @@ class SparkSessionTestCase(unittest.TestCase):
 test_session.stop()
 
 self.assertEqual("other", host)
+
+def test_session_stop(self):
+session = RemoteSparkSession.builder.remote("sc://other").getOrCreate()
+
+self.assertFalse(session.is_stopped)
+session.stop()
+self.assertTrue(session.is_stopped)





[spark] branch master updated: [SPARK-44268][CORE][TEST] Add tests to ensure error-classes.json and docs are in sync

2023-07-02 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b557f5752af [SPARK-44268][CORE][TEST] Add tests to ensure 
error-classes.json and docs are in sync
b557f5752af is described below

commit b557f5752afc32d614b37be610dbbca44519664b
Author: Jia Fan 
AuthorDate: Sun Jul 2 18:51:09 2023 +0300

[SPARK-44268][CORE][TEST] Add tests to ensure error-classes.json and docs 
are in sync

### What changes were proposed in this pull request?
Add a new test to make sure `error-classes.json` matches the series of 
`sql-error-conditions.md` documents.
After this PR, any difference between `error-classes.json` and the documents 
will report an error during the test.
Note: only error class names are compared for now.

Also fix all differences found by the new test case.

### Why are the changes needed?
Make sure our error-classes.json always stays in sync with the docs.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
new test.

Closes #41813 from Hisoka-X/SPARK-44268_sync_error_classes_to_doc.

Authored-by: Jia Fan 
Signed-off-by: Max Gekk 
---
 .../org/apache/spark/SparkThrowableSuite.scala |  51 ++
 ...ror-conditions-datatype-mismatch-error-class.md |   4 +
 ...tions-incompatible-data-to-table-error-class.md |  64 ---
 ...ror-conditions-insert-column-arity-mismatch.md} |  30 +-
 ...rror-conditions-insufficient-table-property.md} |  26 +-
 ... => sql-error-conditions-invalid-as-of-join.md} |  26 +-
 ...md => sql-error-conditions-invalid-boundary.md} |  26 +-
 ... sql-error-conditions-invalid-default-value.md} |  26 +-
 ...> sql-error-conditions-invalid-inline-table.md} |  26 +-
 ...ror-conditions-invalid-lamdba-function-call.md} |  26 +-
 ...or-conditions-invalid-limit-like-expression.md} |  26 +-
 ...nditions-invalid-parameter-value-error-class.md |  14 +-
 ...rror-conditions-invalid-partition-operation.md} |  26 +-
 docs/sql-error-conditions-invalid-sql-syntax.md|  92 
 ...nditions-invalid-time-travel-timestamp-expr.md} |  26 +-
 ...error-conditions-invalid-write-distribution.md} |  26 +-
 ...rror-conditions-malformed-record-in-parsing.md} |  24 +-
 ... => sql-error-conditions-missing-attributes.md} |  26 +-
 ...onditions-not-a-constant-string-error-class.md} |  26 +-
 ...=> sql-error-conditions-not-allowed-in-from.md} |  26 +-
 ...or-conditions-not-supported-in-jdbc-catalog.md} |  26 +-
 ...> sql-error-conditions-unsupported-add-file.md} |  26 +-
 ...-error-conditions-unsupported-default-value.md} |  26 +-
 ...r-conditions-unsupported-feature-error-class.md |  36 ++
 ... => sql-error-conditions-unsupported-insert.md} |  26 +-
 ...rror-conditions-unsupported-merge-condition.md} |  26 +-
 ... sql-error-conditions-unsupported-overwrite.md} |  26 +-
 docs/sql-error-conditions.md   | 609 -
 28 files changed, 984 insertions(+), 434 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index 96c4e3b8ab7..034a782e533 100644
--- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -141,6 +141,57 @@ class SparkThrowableSuite extends SparkFunSuite {
 checkIfUnique(messageFormats)
   }
 
+  test("SPARK-44268: Error classes match with document") {
+val sqlstateDoc = "sql-error-conditions-sqlstates.md"
+val errors = errorReader.errorInfoMap
+val errorDocPaths = getWorkspaceFilePath("docs").toFile
+  .listFiles(_.getName.startsWith("sql-error-conditions-"))
+  .filter(!_.getName.equals(sqlstateDoc))
+  .map(f => IOUtils.toString(f.toURI, 
StandardCharsets.UTF_8)).map(_.split("\n"))
+// check the error classes in document should be in error-classes.json
+val linkInDocRegex = "\\[(.*)\\]\\((.*)\\)".r
+val commonErrorsInDoc = IOUtils.toString(getWorkspaceFilePath("docs",
+  "sql-error-conditions.md").toUri, StandardCharsets.UTF_8).split("\n")
+  .filter(_.startsWith("###")).map(s => s.replace("###", "").trim)
+  .filter(linkInDocRegex.findFirstMatchIn(_).isEmpty)
+
+commonErrorsInDoc.foreach(s => assert(errors.contains(s),
+  s"Error class: $s is not in error-classes.json"))
+
+val titlePrefix = "title:"
+val errorsInDoc = errorDocPaths.map(lines => {
+  val errorClass = lines.filter(_.startsWith(titlePrefix))
+.map(s => s.replace("error class", "").replace(titlePrefix, 
"").trim).head
+  assert(errors.contains(errorClass), s"Error class: $errorClass is not in 
error-classes.json")
+  val subClasses = lines.filter(_.startsWith("##")).map(s => 
s.replace("##", "").trim)
+.map { s =>
+  

[spark] branch master updated: [SPARK-44254][SQL] Move QueryExecutionErrors that used by DataType to sql/api as DataTypeErrors

2023-07-02 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cf852b284d5 [SPARK-44254][SQL] Move QueryExecutionErrors that used by 
DataType to sql/api as DataTypeErrors
cf852b284d5 is described below

commit cf852b284d550f9425ae7893796ae0042be6010f
Author: Rui Wang 
AuthorDate: Sun Jul 2 10:18:43 2023 +0300

[SPARK-44254][SQL] Move QueryExecutionErrors that used by DataType to 
sql/api as DataTypeErrors

### What changes were proposed in this pull request?

Move some QueryExecutionErrors that are used by data types to `sql/api` 
and rename them as DataTypeErrors, so that DataType can use them if DataType 
stays only in the `sql/api` module.

### Why are the changes needed?

Towards a simpler DataType interface.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing test

Closes #41794 from amaliujia/datatype_more_refactors.

Authored-by: Rui Wang 
Signed-off-by: Max Gekk 
---
 sql/api/pom.xml|  5 ++
 .../apache/spark/sql/errors/DataTypeErrors.scala   | 95 ++
 .../spark/sql/errors/QueryExecutionErrors.scala| 42 ++
 .../apache/spark/sql/types/AbstractDataType.scala  |  4 +-
 .../scala/org/apache/spark/sql/types/Decimal.scala |  9 +-
 .../org/apache/spark/sql/types/DecimalType.scala   |  4 +-
 .../org/apache/spark/sql/types/Metadata.scala  | 10 +--
 .../org/apache/spark/sql/types/ObjectType.scala|  4 +-
 .../apache/spark/sql/types/UDTRegistration.scala   |  6 +-
 9 files changed, 127 insertions(+), 52 deletions(-)

diff --git a/sql/api/pom.xml b/sql/api/pom.xml
index 9b7917e0343..41a5b85d4c6 100644
--- a/sql/api/pom.xml
+++ b/sql/api/pom.xml
@@ -40,6 +40,11 @@
 spark-common-utils_${scala.binary.version}
 ${project.version}
 
+
+org.apache.spark
+spark-unsafe_${scala.binary.version}
+${project.version}
+
 
 
 
target/scala-${scala.binary.version}/classes
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
new file mode 100644
index 000..02e8b12c707
--- /dev/null
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/DataTypeErrors.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.errors
+
+import org.apache.spark.{SparkArithmeticException, SparkException, 
SparkRuntimeException, SparkUnsupportedOperationException}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Object for grouping error messages from (most) exceptions thrown during 
query execution.
+ * This does not include exceptions thrown during the eager execution of 
commands, which are
+ * grouped into [[QueryCompilationErrors]].
+ */
+private[sql] object DataTypeErrors {
+  def unsupportedOperationExceptionError(): SparkUnsupportedOperationException 
= {
+new SparkUnsupportedOperationException(
+  errorClass = "_LEGACY_ERROR_TEMP_2225",
+  messageParameters = Map.empty)
+  }
+
+  def decimalPrecisionExceedsMaxPrecisionError(
+  precision: Int, maxPrecision: Int): SparkArithmeticException = {
+new SparkArithmeticException(
+  errorClass = "DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION",
+  messageParameters = Map(
+"precision" -> precision.toString,
+"maxPrecision" -> maxPrecision.toString
+  ),
+  context = Array.empty,
+  summary = "")
+  }
+
+  def unsupportedRoundingMode(roundMode: BigDecimal.RoundingMode.Value): 
SparkException = {
+SparkException.internalError(s"Not supported rounding mode: 
${roundMode.toString}.")
+  }
+
+  def outOfDecimalTypeRangeError(str: UTF8String): SparkArithmeticException = {
+new SparkArithmeticException(
+  errorClass = "NUMERIC_OUT_OF_SUPPORTED_RANGE",
+  messageParameters = Map(
+"value" -> str.toString),
+  context =