cloud-fan commented on code in PR #50875:
URL: https://github.com/apache/spark/pull/50875#discussion_r2095768058
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -1215,4 +1215,129 @@ class SparkSqlAstBuilder extends AstBuilder {
withIdentClause(ctx.identifierReference(), procIdentifier =>
DescribeProcedureCommand(UnresolvedIdentifier(procIdentifier)))
}
+
+ override def visitCreatePipelineInsertIntoFlow(
+ ctx: CreatePipelineInsertIntoFlowContext): LogicalPlan = withOrigin(ctx)
{
+ val createPipelineFlowHeaderCtx = ctx.createPipelineFlowHeader()
+ val ident =
UnresolvedIdentifier(visitMultipartIdentifier(createPipelineFlowHeaderCtx.flowName))
+ val commentOpt =
Option(createPipelineFlowHeaderCtx.commentSpec()).map(visitCommentSpec)
+ val flowOperation = withInsertInto(ctx.insertInto(),
visitQuery(ctx.query()))
+ CreateFlowCommand(
+ name = ident,
+ flowOperation = flowOperation,
+ comment = commentOpt
+ )
+ }
+
+ override def visitCreatePipelineDataset(
+ ctx: CreatePipelineDatasetContext): LogicalPlan = withOrigin(ctx) {
+ val createPipelineDatasetHeaderCtx = ctx.createPipelineDatasetHeader()
+
+ val syntaxTypeErrorStr = if
(createPipelineDatasetHeaderCtx.materializedView() != null) {
+ "MATERIALIZED VIEW"
+ } else if (createPipelineDatasetHeaderCtx.streamingTable() != null) {
+ "STREAMING TABLE"
+ } else {
+ // Should never be possible based on grammar definition.
+ throw invalidStatement(ctx.getText, ctx)
+ }
+
+ val ifNotExists = createPipelineDatasetHeaderCtx.EXISTS() != null
+ val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
+ val (colDefs, colConstraints) =
Option(ctx.tableElementList()).map(visitTableElementList)
+ .getOrElse((Nil, Nil))
+
+ if (colConstraints.nonEmpty) {
+ throw operationNotAllowed("Pipeline datasets do not currently support
column constraints. " +
+ "Please remove and CHECK, UNIQUE, PK, and FK constraints specified on
the pipeline " +
+ "dataset.", ctx)
+ }
+
+ val (partTransforms, partCols, bucketSpec,
+ properties, options, location, comment, collation, serdeInfoOpt,
+ clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
+
+ val partitioning =
+ partitionExpressions(partTransforms, partCols, ctx) ++
+ clusterBySpec.map(_.asTransform)
+
+ // Because the createTableClauses grammar is reused for
createPipelineDataset but pipeline
+ // datasets don't support bucketing, options, storage location, or Hive
SerDe, validate they
+ // are not set.
+ if (bucketSpec.isDefined) {
+ throw operationNotAllowed(s"Bucketing is not supported for CREATE
$syntaxTypeErrorStr " +
+ "statements. Please remove any bucket spec specified in the
statement.", ctx)
+ }
+ if (options.options.nonEmpty) {
+ throw operationNotAllowed(s"Options are not supported for CREATE
$syntaxTypeErrorStr " +
+ "statements. Please remove any OPTIONS lists specified in the
statement.", ctx)
+ }
+ serdeInfoOpt.map(serdeInfo => if (serdeInfo.storedAs.nonEmpty) {
+ throw operationNotAllowed(s"The STORED AS syntax is not supported for
CREATE " +
+ s"$syntaxTypeErrorStr statements. Consider using the Data Source based
USING clause "
+ + "instead.", ctx)
+ } else {
+ throw operationNotAllowed(s"Hive SerDe format options are not supported
for CREATE " +
+ s"$syntaxTypeErrorStr statements.", ctx)
+ })
+ if (location.nonEmpty) {
+ throw operationNotAllowed(s"Specifying location is not supported for
CREATE " +
+ s"$syntaxTypeErrorStr statements. The storage location for a pipeline
dataset is " +
+ "managed by the pipeline itself.", ctx)
+ }
+
+ val spec = TableSpec(
+ properties = properties,
+ provider = provider,
+ options = Map.empty,
+ location = location,
+ comment = comment,
+ collation = collation,
+ serde = None,
+ external = false,
+ constraints = Seq.empty
+ )
+
+ withIdentClause(createPipelineDatasetHeaderCtx.identifierReference, ident
=> {
Review Comment:
Given that the `CreatePipelineDataset#name` field is a `LogicalPlan`, we can just
do
```
CreateMaterializedViewAsSelect(
name = withIdentClause(...),
...)
```
instead of
```
withIdentClause... {
CreateMaterializedViewAsSelect(...)
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]