This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f5101319c1a [SPARK-41046][CONNECT] Support CreateView in Connect DSL f5101319c1a is described below commit f5101319c1a83a754d899e10a367356af069ca66 Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Thu Nov 10 13:02:45 2022 +0800 [SPARK-41046][CONNECT] Support CreateView in Connect DSL ### What changes were proposed in this pull request? This PR supports creating global temp view or local temp view in Connect DSL. In proto, it is modeled as a command which will be executed immediately on the server side. ### Why are the changes needed? Improve API coverage. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #38566 from amaliujia/create_view_api. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../src/main/protobuf/spark/connect/commands.proto | 19 ++++++++ .../command/SparkConnectCommandPlanner.scala | 32 +++++++++++++ .../org/apache/spark/sql/connect/dsl/package.scala | 13 ++++++ .../planner/SparkConnectCommandPlannerSuite.scala | 19 ++++++++ python/pyspark/sql/connect/proto/commands_pb2.py | 30 ++++++------ python/pyspark/sql/connect/proto/commands_pb2.pyi | 54 +++++++++++++++++++++- 6 files changed, 152 insertions(+), 15 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/commands.proto b/connector/connect/src/main/protobuf/spark/connect/commands.proto index 79c6cffdf60..086d4d0cc92 100644 --- a/connector/connect/src/main/protobuf/spark/connect/commands.proto +++ b/connector/connect/src/main/protobuf/spark/connect/commands.proto @@ -31,6 +31,7 @@ message Command { oneof command_type { CreateScalarFunction create_function = 1; WriteOperation write_operation = 2; + CreateDataFrameViewCommand create_dataframe_view = 3; } } @@ -65,6 +66,24 @@ message CreateScalarFunction { } } +// A command that can create DataFrame global temp view or local temp view. 
+message CreateDataFrameViewCommand { + // Required. The relation that this view will be built on. + Relation input = 1; + + // Required. View name. + string name = 2; + + // Required. Whether this is global temp view or local temp view. + bool is_global = 3; + + // Required. + // + // If true, and if the view already exists, updates it; if false, and if the view + // already exists, throws exception. + bool replace = 4; +} + // As writes are not directly handled during analysis and planning, they are modeled as commands. message WriteOperation { // The output of the `input` relation will be persisted according to the options. diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala index 80c36a4773e..11090976c7f 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala @@ -25,7 +25,11 @@ import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction} import org.apache.spark.connect.proto import org.apache.spark.connect.proto.WriteOperation import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView} +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.connect.planner.{DataTypeProtoConverter, SparkConnectPlanner} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.command.CreateViewCommand import org.apache.spark.sql.execution.python.UserDefinedPythonFunction import org.apache.spark.sql.types.StringType @@ -45,6 +49,8 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command) handleCreateScalarFunction(command.getCreateFunction) case 
proto.Command.CommandTypeCase.WRITE_OPERATION => handleWriteOperation(command.getWriteOperation) + case proto.Command.CommandTypeCase.CREATE_DATAFRAME_VIEW => + handleCreateViewCommand(command.getCreateDataframeView) case _ => throw new UnsupportedOperationException(s"$command not supported.") } } @@ -79,6 +85,32 @@ class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command) session.udf.registerPython(cf.getPartsList.asScala.head, udf) } + def handleCreateViewCommand(createView: proto.CreateDataFrameViewCommand): Unit = { + val viewType = if (createView.getIsGlobal) GlobalTempView else LocalTempView + + val tableIdentifier = + try { + session.sessionState.sqlParser.parseTableIdentifier(createView.getName) + } catch { + case _: ParseException => + throw QueryCompilationErrors.invalidViewNameError(createView.getName) + } + + val plan = CreateViewCommand( + name = tableIdentifier, + userSpecifiedColumns = Nil, + comment = None, + properties = Map.empty, + originalText = None, + plan = new SparkConnectPlanner(createView.getInput, session).transform(), + allowExisting = false, + replace = createView.getReplace, + viewType = viewType, + isAnalyzed = true) + + Dataset.ofRows(session, plan).queryExecution.commandExecuted + } + /** * Transforms the write operation and executes it. 
* diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 416b20f2ba9..ec14333fdc3 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -182,6 +182,19 @@ package object dsl { writeOp.setInput(logicalPlan) Command.newBuilder().setWriteOperation(writeOp.build()).build() } + + def createView(name: String, global: Boolean, replace: Boolean): Command = { + Command + .newBuilder() + .setCreateDataframeView( + CreateDataFrameViewCommand + .newBuilder() + .setName(name) + .setIsGlobal(global) + .setReplace(replace) + .setInput(logicalPlan)) + .build() + } } } diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala index e5ca670e4dd..8ab8e0599fc 100644 --- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala +++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectCommandPlannerSuite.scala @@ -138,4 +138,23 @@ class SparkConnectCommandPlannerSuite } } + test("Test CreateView") { + withView("view1", "view2", "view3", "view4") { + transform(localRelation.createView("view1", global = true, replace = true)) + assert(spark.catalog.tableExists("global_temp.view1")) + + transform(localRelation.createView("view2", global = false, replace = true)) + assert(spark.catalog.tableExists("view2")) + + transform(localRelation.createView("view3", global = true, replace = false)) + assertThrows[AnalysisException] { + transform(localRelation.createView("view3", global = true, replace = false)) + } + + transform(localRelation.createView("view4", global = false, replace = false)) + 
assertThrows[AnalysisException] { + transform(localRelation.createView("view4", global = false, replace = false)) + } + } + } } diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py index fa05b6ff76c..11f53322ce7 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.py +++ b/python/pyspark/sql/connect/proto/commands_pb2.py @@ -33,7 +33,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xb3\x01\n\x07\x43ommand\x12N\n\x0f\x63reate_function\x18\x01 \x01(\x0b\x32#.spark.connect.CreateScalarFunctionH\x00R\x0e\x63reateFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperationB\x0e\n\x0c\x63ommand_type"\x97\x04\n\x14\x43reateScalarFunction\x12\x14\n\x05parts\x18\x01 \x03(\tR\x05parts\x12P\n\x0 [...] + b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x94\x02\n\x07\x43ommand\x12N\n\x0f\x63reate_function\x18\x01 \x01(\x0b\x32#.spark.connect.CreateScalarFunctionH\x00R\x0e\x63reateFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateD [...] 
) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) @@ -45,17 +45,19 @@ if _descriptor._USE_C_DESCRIPTORS == False: _WRITEOPERATION_OPTIONSENTRY._options = None _WRITEOPERATION_OPTIONSENTRY._serialized_options = b"8\001" _COMMAND._serialized_start = 106 - _COMMAND._serialized_end = 285 - _CREATESCALARFUNCTION._serialized_start = 288 - _CREATESCALARFUNCTION._serialized_end = 823 - _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_start = 661 - _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_end = 800 - _WRITEOPERATION._serialized_start = 826 - _WRITEOPERATION._serialized_end = 1568 - _WRITEOPERATION_OPTIONSENTRY._serialized_start = 1264 - _WRITEOPERATION_OPTIONSENTRY._serialized_end = 1322 - _WRITEOPERATION_BUCKETBY._serialized_start = 1324 - _WRITEOPERATION_BUCKETBY._serialized_end = 1415 - _WRITEOPERATION_SAVEMODE._serialized_start = 1418 - _WRITEOPERATION_SAVEMODE._serialized_end = 1555 + _COMMAND._serialized_end = 382 + _CREATESCALARFUNCTION._serialized_start = 385 + _CREATESCALARFUNCTION._serialized_end = 920 + _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_start = 758 + _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_end = 897 + _CREATEDATAFRAMEVIEWCOMMAND._serialized_start = 923 + _CREATEDATAFRAMEVIEWCOMMAND._serialized_end = 1073 + _WRITEOPERATION._serialized_start = 1076 + _WRITEOPERATION._serialized_end = 1818 + _WRITEOPERATION_OPTIONSENTRY._serialized_start = 1514 + _WRITEOPERATION_OPTIONSENTRY._serialized_end = 1572 + _WRITEOPERATION_BUCKETBY._serialized_start = 1574 + _WRITEOPERATION_BUCKETBY._serialized_end = 1665 + _WRITEOPERATION_SAVEMODE._serialized_start = 1668 + _WRITEOPERATION_SAVEMODE._serialized_end = 1805 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi index 0ac8da335b5..9b9880e0b93 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.pyi +++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi @@ -60,21 
+60,27 @@ class Command(google.protobuf.message.Message): CREATE_FUNCTION_FIELD_NUMBER: builtins.int WRITE_OPERATION_FIELD_NUMBER: builtins.int + CREATE_DATAFRAME_VIEW_FIELD_NUMBER: builtins.int @property def create_function(self) -> global___CreateScalarFunction: ... @property def write_operation(self) -> global___WriteOperation: ... + @property + def create_dataframe_view(self) -> global___CreateDataFrameViewCommand: ... def __init__( self, *, create_function: global___CreateScalarFunction | None = ..., write_operation: global___WriteOperation | None = ..., + create_dataframe_view: global___CreateDataFrameViewCommand | None = ..., ) -> None: ... def HasField( self, field_name: typing_extensions.Literal[ "command_type", b"command_type", + "create_dataframe_view", + b"create_dataframe_view", "create_function", b"create_function", "write_operation", @@ -86,6 +92,8 @@ class Command(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "command_type", b"command_type", + "create_dataframe_view", + b"create_dataframe_view", "create_function", b"create_function", "write_operation", @@ -94,7 +102,9 @@ class Command(google.protobuf.message.Message): ) -> None: ... def WhichOneof( self, oneof_group: typing_extensions.Literal["command_type", b"command_type"] - ) -> typing_extensions.Literal["create_function", "write_operation"] | None: ... + ) -> typing_extensions.Literal[ + "create_function", "write_operation", "create_dataframe_view" + ] | None: ... 
global___Command = Command @@ -210,6 +220,48 @@ class CreateScalarFunction(google.protobuf.message.Message): global___CreateScalarFunction = CreateScalarFunction +class CreateDataFrameViewCommand(google.protobuf.message.Message): + """A command that can create DataFrame global temp view or local temp view.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + INPUT_FIELD_NUMBER: builtins.int + NAME_FIELD_NUMBER: builtins.int + IS_GLOBAL_FIELD_NUMBER: builtins.int + REPLACE_FIELD_NUMBER: builtins.int + @property + def input(self) -> pyspark.sql.connect.proto.relations_pb2.Relation: + """Required. The relation that this view will be built on.""" + name: builtins.str + """Required. View name.""" + is_global: builtins.bool + """Required. Whether this is global temp view or local temp view.""" + replace: builtins.bool + """Required. + + If true, and if the view already exists, updates it; if false, and if the view + already exists, throws exception. + """ + def __init__( + self, + *, + input: pyspark.sql.connect.proto.relations_pb2.Relation | None = ..., + name: builtins.str = ..., + is_global: builtins.bool = ..., + replace: builtins.bool = ..., + ) -> None: ... + def HasField( + self, field_name: typing_extensions.Literal["input", b"input"] + ) -> builtins.bool: ... + def ClearField( + self, + field_name: typing_extensions.Literal[ + "input", b"input", "is_global", b"is_global", "name", b"name", "replace", b"replace" + ], + ) -> None: ... + +global___CreateDataFrameViewCommand = CreateDataFrameViewCommand + class WriteOperation(google.protobuf.message.Message): """As writes are not directly handled during analysis and planning, they are modeled as commands.""" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org