[spark] branch branch-3.4 updated: [SPARK-42721][CONNECT][FOLLOWUP] Apply scalafmt to LoggingInterceptor

2023-03-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 67ccd8fff11 [SPARK-42721][CONNECT][FOLLOWUP] Apply scalafmt to 
LoggingInterceptor
67ccd8fff11 is described below

commit 67ccd8fff112a5233e8ac44be888fc3484b3d082
Author: Dongjoon Hyun 
AuthorDate: Fri Mar 10 19:42:55 2023 -0800

[SPARK-42721][CONNECT][FOLLOWUP] Apply scalafmt to LoggingInterceptor

### What changes were proposed in this pull request?

This is a follow-up to fix Scala linter failure at 
`LoggingInterceptor.scala`.

### Why are the changes needed?

To recover CI.

- **master**: 
https://github.com/apache/spark/actions/runs/4389407261/jobs/7686936044
- **branch-3.4**: 
https://github.com/apache/spark/actions/runs/4389407870/jobs/7686935027
```
The scalafmt check failed on connector/connect at following occurrences:

Requires formatting: LoggingInterceptor.scala

Before submitting your change, please make sure to format your code using 
the following command:
./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false 
-Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl 
connector/connect/common -pl connector/connect/server -pl 
connector/connect/client/jvm
Error: Process completed with exit code 1.
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass the CI.

Closes #40374 from dongjoon-hyun/SPARK-42721.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 967ce371ba1645d9a24dbf01a1b64faf569e8863)
Signed-off-by: Dongjoon Hyun 
---
 .../org/apache/spark/sql/connect/service/LoggingInterceptor.scala   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
index c91075fd127..2d848d3c840 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
@@ -31,9 +31,9 @@ import io.grpc.ServerInterceptor
 import org.apache.spark.internal.Logging
 
 /**
- * A gRPC interceptor to log RPC requests and responses. It logs the protobufs 
as JSON.
- * Useful for local development. An ID is logged for each RPC so that requests 
and corresponding
- * responses can be exactly matched.
+ * A gRPC interceptor to log RPC requests and responses. It logs the protobufs 
as JSON. Useful for
+ * local development. An ID is logged for each RPC so that requests and 
corresponding responses
+ * can be exactly matched.
  */
 class LoggingInterceptor extends ServerInterceptor with Logging {
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (19cb8d7014e -> 967ce371ba1)

2023-03-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 19cb8d7014e [SPARK-42721][CONNECT] RPC logging interceptor
 add 967ce371ba1 [SPARK-42721][CONNECT][FOLLOWUP] Apply scalafmt to 
LoggingInterceptor

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/connect/service/LoggingInterceptor.scala   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.4 updated: [SPARK-42721][CONNECT] RPC logging interceptor

2023-03-10 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 516a202e3ed [SPARK-42721][CONNECT] RPC logging interceptor
516a202e3ed is described below

commit 516a202e3ed778487f9d2eaaeb6603193eb0a7b6
Author: Raghu Angadi 
AuthorDate: Fri Mar 10 19:51:55 2023 -0400

[SPARK-42721][CONNECT] RPC logging interceptor

### What changes were proposed in this pull request?

This adds an gRPC interceptor in spark-connect server. It logs all the 
incoming RPC requests and responses.

 - How to enable: Set interceptor config. e.g.

   ./sbin/start-connect-server.sh --conf 
spark.connect.grpc.interceptor.classes=org.apache.spark.sql.connect.service.LoggingInterceptor
  --jars connector/connect/server/target/spark-connect_*-SNAPSHOT.jar

 - Sample output:

23/03/08 10:54:37 INFO LoggingInterceptor: Received RPC Request 
spark.connect.SparkConnectService/ExecutePlan (id 1868663481):
{
  "client_id": "6844bc44-4411-4481-8109-a10e3a836f97",
  "user_context": {
"user_id": "raghu"
  },
  "plan": {
"root": {
  "common": {
"plan_id": "37"
  },
  "show_string": {
"input": {
  "common": {
"plan_id": "36"
  },
  "read": {
"data_source": {
  "format": "csv",
  "schema": "",
  "paths": ["file:///tmp/x-in"]
}
  }
},
"num_rows": 20,
"truncate": 20
  }
}
  },
  "client_type": "_SPARK_CONNECT_PYTHON"
}

### Why are the changes needed?
This is useful in  development. It might be useful to debug some problems 
in production as well.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
 - Manually in development
 - Unit test

Closes #40342 from rangadi/logging-interceptor.

Authored-by: Raghu Angadi 
Signed-off-by: Herman van Hovell 
(cherry picked from commit 19cb8d7014e03d828794a637bc67d09fc84650ad)
Signed-off-by: Herman van Hovell 
---
 connector/connect/server/pom.xml   |  6 ++
 .../sql/connect/service/LoggingInterceptor.scala   | 75 ++
 .../connect/service/InterceptorRegistrySuite.scala |  9 +++
 3 files changed, 90 insertions(+)

diff --git a/connector/connect/server/pom.xml b/connector/connect/server/pom.xml
index a2aff8f9f31..302f6590fd2 100644
--- a/connector/connect/server/pom.xml
+++ b/connector/connect/server/pom.xml
@@ -155,6 +155,12 @@
   ${protobuf.version}
   compile
 
+
+  com.google.protobuf
+  protobuf-java-util
+  ${protobuf.version}
+  compile
+
 
   io.grpc
   grpc-netty
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
new file mode 100644
index 000..c91075fd127
--- /dev/null
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.Random
+
+import com.google.protobuf.Message
+import com.google.protobuf.util.JsonFormat
+import io.grpc.ForwardingServerCall.SimpleForwardingServerCall
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.Metadata
+import io.grpc.ServerCall
+import io.grpc.ServerCallHandler
+import io.grpc.ServerInterceptor
+
+import org.apache.spark.internal.Logging
+
+/**
+ * A 

[spark] branch master updated: [SPARK-42721][CONNECT] RPC logging interceptor

2023-03-10 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 19cb8d7014e [SPARK-42721][CONNECT] RPC logging interceptor
19cb8d7014e is described below

commit 19cb8d7014e03d828794a637bc67d09fc84650ad
Author: Raghu Angadi 
AuthorDate: Fri Mar 10 19:51:55 2023 -0400

[SPARK-42721][CONNECT] RPC logging interceptor

### What changes were proposed in this pull request?

This adds an gRPC interceptor in spark-connect server. It logs all the 
incoming RPC requests and responses.

 - How to enable: Set interceptor config. e.g.

   ./sbin/start-connect-server.sh --conf 
spark.connect.grpc.interceptor.classes=org.apache.spark.sql.connect.service.LoggingInterceptor
  --jars connector/connect/server/target/spark-connect_*-SNAPSHOT.jar

 - Sample output:

23/03/08 10:54:37 INFO LoggingInterceptor: Received RPC Request 
spark.connect.SparkConnectService/ExecutePlan (id 1868663481):
{
  "client_id": "6844bc44-4411-4481-8109-a10e3a836f97",
  "user_context": {
"user_id": "raghu"
  },
  "plan": {
"root": {
  "common": {
"plan_id": "37"
  },
  "show_string": {
"input": {
  "common": {
"plan_id": "36"
  },
  "read": {
"data_source": {
  "format": "csv",
  "schema": "",
  "paths": ["file:///tmp/x-in"]
}
  }
},
"num_rows": 20,
"truncate": 20
  }
}
  },
  "client_type": "_SPARK_CONNECT_PYTHON"
}

### Why are the changes needed?
This is useful in  development. It might be useful to debug some problems 
in production as well.

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
 - Manually in development
 - Unit test

Closes #40342 from rangadi/logging-interceptor.

Authored-by: Raghu Angadi 
Signed-off-by: Herman van Hovell 
---
 connector/connect/server/pom.xml   |  6 ++
 .../sql/connect/service/LoggingInterceptor.scala   | 75 ++
 .../connect/service/InterceptorRegistrySuite.scala |  9 +++
 3 files changed, 90 insertions(+)

diff --git a/connector/connect/server/pom.xml b/connector/connect/server/pom.xml
index 35b6ae60a16..079d07db362 100644
--- a/connector/connect/server/pom.xml
+++ b/connector/connect/server/pom.xml
@@ -155,6 +155,12 @@
   ${protobuf.version}
   compile
 
+
+  com.google.protobuf
+  protobuf-java-util
+  ${protobuf.version}
+  compile
+
 
   io.grpc
   grpc-netty
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
new file mode 100644
index 000..c91075fd127
--- /dev/null
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.Random
+
+import com.google.protobuf.Message
+import com.google.protobuf.util.JsonFormat
+import io.grpc.ForwardingServerCall.SimpleForwardingServerCall
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.Metadata
+import io.grpc.ServerCall
+import io.grpc.ServerCallHandler
+import io.grpc.ServerInterceptor
+
+import org.apache.spark.internal.Logging
+
+/**
+ * A gRPC interceptor to log RPC requests and responses. It logs the protobufs 
as JSON.
+ * Useful for local development. An 

[spark] branch branch-3.4 updated: [SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by newSession should not share the channel

2023-03-10 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new d79d10291c6 [SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by 
newSession should not share the channel
d79d10291c6 is described below

commit d79d10291c686377468d7f1bf46f866a243d5551
Author: Rui Wang 
AuthorDate: Fri Mar 10 19:38:19 2023 -0400

[SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by newSession should 
not share the channel

### What changes were proposed in this pull request?

SparkSession created by newSession should not share the channel. This is 
because that a SparkSession might be called `stop` in which the channel it uses 
will be shutdown. If the channel is shared, other non-stop SparkSession that is 
sharing this channel will get into trouble.

### Why are the changes needed?

This fixes the issue when one SparkSession is stopped to cause other active 
SparkSession not working in Spark Connect.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT

Closes #40346 from amaliujia/rw-session-do-not-share-channel.

Authored-by: Rui Wang 
Signed-off-by: Herman van Hovell 
(cherry picked from commit e5f56e51dcbffb1f79dc00e8493e946ce1209cdc)
Signed-off-by: Herman van Hovell 
---
 .../spark/sql/connect/client/SparkConnectClient.scala| 16 +---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala   |  2 +-
 .../org/apache/spark/sql/PlanGenerationTestSuite.scala   |  2 +-
 .../org/apache/spark/sql/SQLImplicitsTestSuite.scala |  2 +-
 4 files changed, 12 insertions(+), 10 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 348fc94bb89..736a8af8e38 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.connect.client
 
-import io.grpc.{CallCredentials, CallOptions, Channel, ClientCall, 
ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc, 
InsecureChannelCredentials, ManagedChannel, Metadata, MethodDescriptor, Status, 
TlsChannelCredentials}
+import io.grpc.{CallCredentials, CallOptions, Channel, ClientCall, 
ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc, 
InsecureChannelCredentials, ManagedChannel, ManagedChannelBuilder, Metadata, 
MethodDescriptor, Status, TlsChannelCredentials}
 import java.net.URI
 import java.util.UUID
 import java.util.concurrent.Executor
+import scala.language.existentials
 
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.UserContext
@@ -31,9 +32,11 @@ import 
org.apache.spark.sql.connect.common.config.ConnectCommon
  */
 private[sql] class SparkConnectClient(
 private val userContext: proto.UserContext,
-private val channel: ManagedChannel,
+private val channelBuilder: ManagedChannelBuilder[_],
 private[client] val userAgent: String) {
 
+  private[this] lazy val channel: ManagedChannel = channelBuilder.build()
+
   private[this] val stub = 
proto.SparkConnectServiceGrpc.newBlockingStub(channel)
 
   private[client] val artifactManager: ArtifactManager = new 
ArtifactManager(userContext, channel)
@@ -164,7 +167,7 @@ private[sql] class SparkConnectClient(
   }
 
   def copy(): SparkConnectClient = {
-new SparkConnectClient(userContext, channel, userAgent)
+new SparkConnectClient(userContext, channelBuilder, userAgent)
   }
 
   /**
@@ -208,8 +211,8 @@ private[sql] object SparkConnectClient {
   "Either remove 'token' or set 'use_ssl=true'"
 
   // for internal tests
-  def apply(userContext: UserContext, channel: ManagedChannel): 
SparkConnectClient =
-new SparkConnectClient(userContext, channel, DEFAULT_USER_AGENT)
+  def apply(userContext: UserContext, builder: ManagedChannelBuilder[_]): 
SparkConnectClient =
+new SparkConnectClient(userContext, builder, DEFAULT_USER_AGENT)
 
   def builder(): Builder = new Builder()
 
@@ -394,10 +397,9 @@ private[sql] object SparkConnectClient {
   if (metadata.nonEmpty) {
 channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata))
   }
-  val channel: ManagedChannel = channelBuilder.build()
   new SparkConnectClient(
 userContextBuilder.build(),
-channel,
+channelBuilder,
 userAgent.getOrElse(DEFAULT_USER_AGENT))
 }
   }
diff --git 

[spark] branch master updated (88b91ee10d0 -> e5f56e51dcb)

2023-03-10 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 88b91ee10d0 [SPARK-42718][BUILD] Upgrade rocksdbjni to 7.10.2
 add e5f56e51dcb [SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by 
newSession should not share the channel

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/connect/client/SparkConnectClient.scala| 16 +---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala   |  2 +-
 .../org/apache/spark/sql/PlanGenerationTestSuite.scala   |  2 +-
 .../org/apache/spark/sql/SQLImplicitsTestSuite.scala |  2 +-
 4 files changed, 12 insertions(+), 10 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (061bd92375a -> 88b91ee10d0)

2023-03-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 061bd92375a [SPARK-42398][SQL][FOLLOWUP] DelegatingCatalogExtension 
should override the new createTable method
 add 88b91ee10d0 [SPARK-42718][BUILD] Upgrade rocksdbjni to 7.10.2

No new revisions were added by this update.

Summary of changes:
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +-
 pom.xml   | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.4 updated: [SPARK-42398][SQL][FOLLOWUP] DelegatingCatalogExtension should override the new createTable method

2023-03-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 6d8ea2f0e18 [SPARK-42398][SQL][FOLLOWUP] DelegatingCatalogExtension 
should override the new createTable method
6d8ea2f0e18 is described below

commit 6d8ea2f0e18e758977e3dfe39d524e6b0f0d54c8
Author: Wenchen Fan 
AuthorDate: Fri Mar 10 13:00:14 2023 -0800

[SPARK-42398][SQL][FOLLOWUP] DelegatingCatalogExtension should override the 
new createTable method

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/40049 to fix a 
small issue: `DelegatingCatalogExtension` should also override the new 
`createTable` function and call the session catalog, instead of using the 
default implementation.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A, too trivial.

Closes #40369 from cloud-fan/api.

Authored-by: Wenchen Fan 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 061bd92375ae9232c9b901ab0760f9712790c26f)
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/connector/catalog/DelegatingCatalogExtension.java  | 9 +
 1 file changed, 9 insertions(+)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index 534e1b86eca..f6686d2e4d3 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -102,6 +102,15 @@ public abstract class DelegatingCatalogExtension 
implements CatalogExtension {
 return asTableCatalog().createTable(ident, schema, partitions, properties);
   }
 
+  @Override
+  public Table createTable(
+  Identifier ident,
+  Column[] columns,
+  Transform[] partitions,
+  Map properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException {
+return asTableCatalog().createTable(ident, columns, partitions, 
properties);
+  }
+
   @Override
   public Table alterTable(
   Identifier ident,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-42398][SQL][FOLLOWUP] DelegatingCatalogExtension should override the new createTable method

2023-03-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 061bd92375a [SPARK-42398][SQL][FOLLOWUP] DelegatingCatalogExtension 
should override the new createTable method
061bd92375a is described below

commit 061bd92375ae9232c9b901ab0760f9712790c26f
Author: Wenchen Fan 
AuthorDate: Fri Mar 10 13:00:14 2023 -0800

[SPARK-42398][SQL][FOLLOWUP] DelegatingCatalogExtension should override the 
new createTable method

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/40049 to fix a 
small issue: `DelegatingCatalogExtension` should also override the new 
`createTable` function and call the session catalog, instead of using the 
default implementation.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A, too trivial.

Closes #40369 from cloud-fan/api.

Authored-by: Wenchen Fan 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/connector/catalog/DelegatingCatalogExtension.java  | 9 +
 1 file changed, 9 insertions(+)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index 534e1b86eca..f6686d2e4d3 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -102,6 +102,15 @@ public abstract class DelegatingCatalogExtension 
implements CatalogExtension {
 return asTableCatalog().createTable(ident, schema, partitions, properties);
   }
 
+  @Override
+  public Table createTable(
+  Identifier ident,
+  Column[] columns,
+  Transform[] partitions,
+  Map properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException {
+return asTableCatalog().createTable(ident, columns, partitions, 
properties);
+  }
+
   @Override
   public Table alterTable(
   Identifier ident,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (645d33ea3e8 -> 164db5ba3c3)

2023-03-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 645d33ea3e8 [SPARK-42685][CORE] Optimize Utils.bytesToString routines
 add 164db5ba3c3 Revert "[SPARK-41498] Propagate metadata through Union"

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/analysis/Analyzer.scala |   7 -
 .../plans/logical/basicLogicalOperators.scala  |  58 +
 .../sql/connector/catalog/InMemoryBaseTable.scala  |   2 +-
 .../spark/sql/connector/MetadataColumnSuite.scala  | 283 -
 4 files changed, 14 insertions(+), 336 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.4 updated: Revert "[SPARK-41498] Propagate metadata through Union"

2023-03-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 7e71378764f Revert "[SPARK-41498] Propagate metadata through Union"
7e71378764f is described below

commit 7e71378764fdaba42bf3868414045af920821742
Author: Wenchen Fan 
AuthorDate: Fri Mar 10 10:30:13 2023 -0800

Revert "[SPARK-41498] Propagate metadata through Union"

This reverts commit 827ca9b82476552458e8ba7b01b90001895e8384.

### What changes were proposed in this pull request?

After more thinking, it's a bit fragile to propagate metadata columns 
through Union. We have added quite some new fields in the file source 
`_metadata` metadata column such as `row_index`, `block_start`, etc. Some are 
parquet only. The same thing may happen in other data sources as well. If one 
day one table under Union adds a new metadata column (or add a new field if the 
metadata column is a struct type), but other tables under Union do not have 
this new column, then Union can't pro [...]

To be future-proof, let's revert this support.

### Why are the changes needed?

to make the analysis behavior more robust.

### Does this PR introduce _any_ user-facing change?

Yes, but propagating metadata columns through Union is not released yet.

### How was this patch tested?

N/A

Closes #40371 from cloud-fan/revert.

Authored-by: Wenchen Fan 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 164db5ba3c39614017f5ef6428194a442d79b425)
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/catalyst/analysis/Analyzer.scala |   7 -
 .../plans/logical/basicLogicalOperators.scala  |  58 +
 .../sql/connector/catalog/InMemoryBaseTable.scala  |   2 +-
 .../spark/sql/connector/MetadataColumnSuite.scala  | 283 -
 4 files changed, 14 insertions(+), 336 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e5d78b21f19..ddd26c2efe1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1040,13 +1040,6 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
   child = addMetadataCol(p.child, requiredAttrIds))
 newProj.copyTagsFrom(p)
 newProj
-  case u: Union if u.metadataOutput.exists(a => 
requiredAttrIds.contains(a.exprId)) =>
-u.withNewChildren(u.children.map { child =>
-  // The children of a Union will have the same attributes with 
different expression IDs
-  val exprIdMap = u.metadataOutput.map(_.exprId)
-.zip(child.metadataOutput.map(_.exprId)).toMap
-  addMetadataCol(child, requiredAttrIds.map(a => 
exprIdMap.getOrElse(a, a)))
-})
   case _ => plan.withNewChildren(plan.children.map(addMetadataCol(_, 
requiredAttrIds)))
 }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 74929bf5d79..21bf6419cdd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -450,55 +450,23 @@ case class Union(
   AttributeSet.fromAttributeSets(children.map(_.outputSet)).size
   }
 
-  /**
-   * Merges a sequence of attributes to have a common datatype and updates the
-   * nullability to be consistent with the attributes being merged.
-   */
-  private def mergeAttributes(attributes: Seq[Attribute]): Attribute = {
-val firstAttr = attributes.head
-val nullable = attributes.exists(_.nullable)
-val newDt = attributes.map(_.dataType).reduce(StructType.unionLikeMerge)
-if (firstAttr.dataType == newDt) {
-  firstAttr.withNullability(nullable)
-} else {
-  AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
-firstAttr.exprId, firstAttr.qualifier)
-}
-  }
-
-  override def output: Seq[Attribute] = 
children.map(_.output).transpose.map(mergeAttributes)
-
-  override def metadataOutput: Seq[Attribute] = {
-val childrenMetadataOutput = children.map(_.metadataOutput)
-// This follows similar code in `CheckAnalysis` to check if the output of 
a Union is correct,
-// but just silently doesn't return an output instead of throwing an 
error. It also ensures
-// that the attribute and data type names are the same.
-val refDataTypes = 

[spark] branch branch-3.4 updated: [SPARK-42743][SQL] Support analyze TimestampNTZ columns

2023-03-10 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 24f3d4d1791 [SPARK-42743][SQL] Support analyze TimestampNTZ columns
24f3d4d1791 is described below

commit 24f3d4d17913ec90a48ecf9dd23b4db7c19d10c2
Author: Gengliang Wang 
AuthorDate: Fri Mar 10 14:33:45 2023 +0300

[SPARK-42743][SQL] Support analyze TimestampNTZ columns

### What changes were proposed in this pull request?

Support analyze TimestampNTZ columns
```
ANALYZE TABLE table_name [ PARTITION clause ]
COMPUTE STATISTICS [ NOSCAN | FOR COLUMNS col1 [, ...] | FOR ALL 
COLUMNS ]
```

### Why are the changes needed?

Support computing statistics of TimestmapNTZ columns, which can be used for 
optimizations.

### Does this PR introduce _any_ user-facing change?

No, the TimestampNTZ type is not released yet.

### How was this patch tested?

Update existing UT

Closes #40362 from gengliangwang/analyzeColumn.

Authored-by: Gengliang Wang 
Signed-off-by: Max Gekk 
---
 .../spark/sql/catalyst/catalog/interface.scala |  9 +--
 .../sql/catalyst/util/TimestampFormatter.scala |  8 ++
 .../execution/command/AnalyzeColumnCommand.scala   |  5 ++--
 .../spark/sql/execution/command/CommandUtils.scala |  6 ++---
 .../spark/sql/StatisticsCollectionTestBase.scala   | 29 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala|  3 ++-
 6 files changed, 44 insertions(+), 16 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 829c121c583..6f4c4f27efc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -661,11 +661,13 @@ object CatalogColumnStat extends Logging {
   def getTimestampFormatter(
   isParsing: Boolean,
   format: String = "-MM-dd HH:mm:ss.SS",
-  zoneId: ZoneId = ZoneOffset.UTC): TimestampFormatter = {
+  zoneId: ZoneId = ZoneOffset.UTC,
+  forTimestampNTZ: Boolean = false): TimestampFormatter = {
 TimestampFormatter(
   format = format,
   zoneId = zoneId,
-  isParsing = isParsing)
+  isParsing = isParsing,
+  forTimestampNTZ = forTimestampNTZ)
   }
 
   /**
@@ -702,6 +704,9 @@ object CatalogColumnStat extends Logging {
 val externalValue = dataType match {
   case DateType => DateFormatter().format(v.asInstanceOf[Int])
   case TimestampType => getTimestampFormatter(isParsing = 
false).format(v.asInstanceOf[Long])
+  case TimestampNTZType =>
+getTimestampFormatter(isParsing = false, forTimestampNTZ = true)
+  .format(v.asInstanceOf[Long])
   case BooleanType | _: IntegralType | FloatType | DoubleType => v
   case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
   // This version of Spark does not use min/max for binary/string types so 
we ignore it.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 8ebe77978b5..392e8ebdc6c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -525,6 +525,14 @@ object TimestampFormatter {
 getFormatter(Some(format), zoneId, isParsing = isParsing)
   }
 
+  def apply(
+  format: String,
+  zoneId: ZoneId,
+  isParsing: Boolean,
+  forTimestampNTZ: Boolean): TimestampFormatter = {
+getFormatter(Some(format), zoneId, isParsing = isParsing, forTimestampNTZ 
= forTimestampNTZ)
+  }
+
   def apply(zoneId: ZoneId): TimestampFormatter = {
 getFormatter(None, zoneId, isParsing = false)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index d821b127e06..299f41eb55e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -23,7 +23,7 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableTyp
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{DatetimeType, _}
 
 
 

[spark] branch master updated: [SPARK-42685][CORE] Optimize Utils.bytesToString routines

2023-03-10 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 645d33ea3e8 [SPARK-42685][CORE] Optimize Utils.bytesToString routines
645d33ea3e8 is described below

commit 645d33ea3e8e9ba41409f15b63544ea6b078fba4
Author: Alkis Evlogimenos 
AuthorDate: Fri Mar 10 11:27:41 2023 -0600

[SPARK-42685][CORE] Optimize Utils.bytesToString routines

### What changes were proposed in this pull request?

Optimize `Utils.bytesToString`. Arithmetic ops on `BigInt` and `BigDecimal` 
are order(s) of magnitude slower than the ops on primitive types. Division is 
an especially slow operation and it is used en masse here.

To avoid heating up the Earth while formatting byte counts for human 
consumption we observe that most formatting operations are not in the 10s of 
EiBs but on counts that fit in 64-bits and use (fastpath) 64-bit operations to 
format them.

### Why are the changes needed?
Use of `Utils.bytesToString` is prevalent through the codebase and they are 
mainly used in logs. If the logs are emitted then this becomes a heavyweight 
operation.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #40301 from alkis/faster-byte-to-string.

Authored-by: Alkis Evlogimenos 
Signed-off-by: Sean Owen 
---
 .../main/scala/org/apache/spark/util/Utils.scala   | 43 --
 1 file changed, 16 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 510486bc56b..d81e85ffe08 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1305,41 +1305,30 @@ private[spark] object Utils extends Logging {
 (JavaUtils.byteStringAsBytes(str) / 1024 / 1024).toInt
   }
 
+  private[this] val siByteSizes =
+Array(1L << 60, 1L << 50, 1L << 40, 1L << 30, 1L << 20, 1L << 10, 1)
+  private[this] val siByteSuffixes =
+Array("EiB", "PiB", "TiB", "GiB", "MiB", "KiB", "B")
   /**
* Convert a quantity in bytes to a human-readable string such as "4.0 MiB".
*/
-  def bytesToString(size: Long): String = bytesToString(BigInt(size))
+  def bytesToString(size: Long): String = {
+var i = 0
+while (i < siByteSizes.length - 1 && size < 2 * siByteSizes(i)) i += 1
+"%.1f %s".formatLocal(Locale.US, size.toDouble / siByteSizes(i), 
siByteSuffixes(i))
+  }
 
   def bytesToString(size: BigInt): String = {
 val EiB = 1L << 60
-val PiB = 1L << 50
-val TiB = 1L << 40
-val GiB = 1L << 30
-val MiB = 1L << 20
-val KiB = 1L << 10
-
-if (size >= BigInt(1L << 11) * EiB) {
+if (size.isValidLong) {
+  // Common case, most sizes fit in 64 bits and all ops on BigInt are 
order(s) of magnitude
+  // slower than Long/Double.
+  bytesToString(size.toLong)
+} else if (size < BigInt(2L << 10) * EiB) {
+  "%.1f EiB".formatLocal(Locale.US, BigDecimal(size) / EiB)
+} else {
   // The number is too large, show it in scientific notation.
   BigDecimal(size, new MathContext(3, RoundingMode.HALF_UP)).toString() + 
" B"
-} else {
-  val (value, unit) = {
-if (size >= 2 * EiB) {
-  (BigDecimal(size) / EiB, "EiB")
-} else if (size >= 2 * PiB) {
-  (BigDecimal(size) / PiB, "PiB")
-} else if (size >= 2 * TiB) {
-  (BigDecimal(size) / TiB, "TiB")
-} else if (size >= 2 * GiB) {
-  (BigDecimal(size) / GiB, "GiB")
-} else if (size >= 2 * MiB) {
-  (BigDecimal(size) / MiB, "MiB")
-} else if (size >= 2 * KiB) {
-  (BigDecimal(size) / KiB, "KiB")
-} else {
-  (BigDecimal(size), "B")
-}
-  }
-  "%.1f %s".formatLocal(Locale.US, value, unit)
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.4 updated: [SPARK-42745][SQL] Improved AliasAwareOutputExpression works with DSv2

2023-03-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 4bbdcbcfd95 [SPARK-42745][SQL] Improved AliasAwareOutputExpression 
works with DSv2
4bbdcbcfd95 is described below

commit 4bbdcbcfd955d5ef3144badc4c5c570a9f8c4868
Author: Peter Toth 
AuthorDate: Fri Mar 10 20:58:38 2023 +0800

[SPARK-42745][SQL] Improved AliasAwareOutputExpression works with DSv2

### What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/37525 (SPARK-40086 / 
SPARK-42049) the following, simple subselect expression containing query:
```
select (select sum(id) from t1)
```
fails with:
```
09:48:57.645 ERROR org.apache.spark.executor.Executor: Exception in task 
0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
at 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:47)
at 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:47)
at 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec.hashCode(BatchScanExec.scala:60)
at scala.runtime.Statics.anyHash(Statics.java:122)
...
at 
org.apache.spark.sql.catalyst.trees.TreeNode.hashCode(TreeNode.scala:249)
at scala.runtime.Statics.anyHash(Statics.java:122)
at 
scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
at 
scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
at scala.collection.mutable.HashTable.addEntry(HashTable.scala:149)
at scala.collection.mutable.HashTable.addEntry$(HashTable.scala:148)
at scala.collection.mutable.HashMap.addEntry(HashMap.scala:44)
at scala.collection.mutable.HashTable.init(HashTable.scala:110)
at scala.collection.mutable.HashTable.init$(HashTable.scala:89)
at scala.collection.mutable.HashMap.init(HashMap.scala:44)
at scala.collection.mutable.HashMap.readObject(HashMap.scala:195)
...
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:85)
at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1520)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
when DSv2 is enabled.

This PR proposes to fix `BatchScanExec` as its `equals()` and `hashCode()` 
as those shouldn't throw NPE in any circumstances.

But if we dig deeper we realize that the NPE orrurs since 
https://github.com/apache/spark/pull/37525 and the root cause of the problem is 
changing `AliasAwareOutputExpression.aliasMap` from immutable to mutable. The 
mutable map deserialization invokes the `hashCode()` of the keys while that is 
not the case with immutable maps. In this case the key is a subquery expression 
whose plan contains the `BatchScanExec`.
Please note that the mutability of `aliasMap` shouldn't be an issue as it 
is a `private` field of `AliasAwareOutputExpression` (though adding a simple 
`.toMap` would also help to avoid the NPE).
Based on the above findings this PR also proposes making `aliasMap` to 
transient as it isn't needed on executors.

A side quiestion is if adding any subqery expressions to 
`AliasAwareOutputExpression.aliasMap` makes any sense because 
`AliasAwareOutputExpression.projectExpression()` mainly projects 
`child.outputPartitioning` and `child.outputOrdering` that can't contain 
subquery expressions. But there are a few exceptions (`SortAggregateExec`, 
`TakeOrderedAndProjectExec`) where 
`AliasAwareQueryOutputOrdering.orderingExpressions` doesn't come from the 
`child` and actually leaving those expressions i [...]

### Why are the changes needed?
To fix regression introduced with 
https://github.com/apache/spark/pull/37525.

### Does this PR 

[spark] branch master updated: [SPARK-42745][SQL] Improved AliasAwareOutputExpression works with DSv2

2023-03-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 93d5816b3f1 [SPARK-42745][SQL] Improved AliasAwareOutputExpression 
works with DSv2
93d5816b3f1 is described below

commit 93d5816b3f1460b405c9828ed5ae646adfa236aa
Author: Peter Toth 
AuthorDate: Fri Mar 10 20:58:38 2023 +0800

[SPARK-42745][SQL] Improved AliasAwareOutputExpression works with DSv2

### What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/37525 (SPARK-40086 / 
SPARK-42049) the following, simple subselect expression containing query:
```
select (select sum(id) from t1)
```
fails with:
```
09:48:57.645 ERROR org.apache.spark.executor.Executor: Exception in task 
0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
at 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:47)
at 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:47)
at 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec.hashCode(BatchScanExec.scala:60)
at scala.runtime.Statics.anyHash(Statics.java:122)
...
at 
org.apache.spark.sql.catalyst.trees.TreeNode.hashCode(TreeNode.scala:249)
at scala.runtime.Statics.anyHash(Statics.java:122)
at 
scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
at 
scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
at scala.collection.mutable.HashTable.addEntry(HashTable.scala:149)
at scala.collection.mutable.HashTable.addEntry$(HashTable.scala:148)
at scala.collection.mutable.HashMap.addEntry(HashMap.scala:44)
at scala.collection.mutable.HashTable.init(HashTable.scala:110)
at scala.collection.mutable.HashTable.init$(HashTable.scala:89)
at scala.collection.mutable.HashMap.init(HashMap.scala:44)
at scala.collection.mutable.HashMap.readObject(HashMap.scala:195)
...
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:85)
at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1520)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
when DSv2 is enabled.

This PR proposes to fix `BatchScanExec` as its `equals()` and `hashCode()` 
as those shouldn't throw NPE in any circumstances.

But if we dig deeper we realize that the NPE orrurs since 
https://github.com/apache/spark/pull/37525 and the root cause of the problem is 
changing `AliasAwareOutputExpression.aliasMap` from immutable to mutable. The 
mutable map deserialization invokes the `hashCode()` of the keys while that is 
not the case with immutable maps. In this case the key is a subquery expression 
whose plan contains the `BatchScanExec`.
Please note that the mutability of `aliasMap` shouldn't be an issue as it 
is a `private` field of `AliasAwareOutputExpression` (though adding a simple 
`.toMap` would also help to avoid the NPE).
Based on the above findings this PR also proposes making `aliasMap` to 
transient as it isn't needed on executors.

A side quiestion is if adding any subqery expressions to 
`AliasAwareOutputExpression.aliasMap` makes any sense because 
`AliasAwareOutputExpression.projectExpression()` mainly projects 
`child.outputPartitioning` and `child.outputOrdering` that can't contain 
subquery expressions. But there are a few exceptions (`SortAggregateExec`, 
`TakeOrderedAndProjectExec`) where 
`AliasAwareQueryOutputOrdering.orderingExpressions` doesn't come from the 
`child` and actually leaving those expressions i [...]

### Why are the changes needed?
To fix regression introduced with 
https://github.com/apache/spark/pull/37525.

### Does this PR introduce 

[spark] branch master updated (f8966e7eee1 -> ac30c07dff4)

2023-03-10 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from f8966e7eee1 [SPARK-42702][SPARK-42623][SQL] Support parameterized 
query in subquery and CTE
 add ac30c07dff4 [SPARK-42743][SQL] Support analyze TimestampNTZ columns

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/catalog/interface.scala |  9 +--
 .../sql/catalyst/util/TimestampFormatter.scala |  8 ++
 .../execution/command/AnalyzeColumnCommand.scala   |  5 ++--
 .../spark/sql/execution/command/CommandUtils.scala |  6 ++---
 .../spark/sql/StatisticsCollectionTestBase.scala   | 29 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala|  3 ++-
 6 files changed, 44 insertions(+), 16 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org