This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 19d7f66a0 [spark] Introduce View Support to SparkCatalog (#4538)
19d7f66a0 is described below
commit 19d7f66a006d02ee3ad233937fc6116a07acefbb
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Nov 19 11:02:16 2024 +0800
[spark] Introduce View Support to SparkCatalog (#4538)
---
.../extensions/RewritePaimonViewCommands.scala | 80 ++++++++++++++
.../apache/paimon/spark/sql/PaimonViewTest.scala} | 18 +---
.../extensions/RewritePaimonViewCommands.scala | 79 ++++++++++++++
.../apache/paimon/spark/sql/PaimonViewTest.scala} | 18 +---
.../apache/paimon/spark/sql/PaimonViewTest.scala} | 18 +---
.../apache/paimon/spark/sql/PaimonViewTest.scala} | 18 +---
.../apache/paimon/spark/sql/PaimonViewTest.scala} | 18 +---
.../java/org/apache/paimon/spark/SparkCatalog.java | 49 ++-------
.../org/apache/paimon/spark/SparkTypeUtils.java | 4 +
.../apache/paimon/spark/catalog/SupportView.java | 86 +++++++++++++++
.../apache/paimon/spark/utils/CatalogUtils.java} | 26 +++--
.../catalyst/analysis/PaimonViewResolver.scala | 85 +++++++++++++++
.../catalyst/plans/logical/PaimonViewCommand.scala | 74 +++++++++++++
.../paimon/spark/execution/PaimonStrategy.scala | 37 ++++++-
.../paimon/spark/execution/PaimonViewExec.scala | 117 +++++++++++++++++++++
.../extensions/PaimonSparkSessionExtensions.scala | 3 +-
.../org/apache/paimon/spark/leafnode/package.scala | 7 +-
.../PaimonSparkSqlExtensionsParser.scala | 4 +-
.../extensions/PaimonSqlExtensionsAstBuilder.scala | 13 ++-
.../extensions/RewritePaimonViewCommands.scala | 77 ++++++++++++++
.../spark/sql/DDLWithHiveCatalogTestBase.scala | 4 +-
.../paimon/spark/sql/PaimonViewTestBase.scala | 96 +++++++++++++++++
22 files changed, 794 insertions(+), 137 deletions(-)
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
new file mode 100644
index 000000000..e759edd0c
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView,
DropPaimonView, ResolvedIdentifier, ShowPaimonViews}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{CTESubstitution,
ResolvedNamespace, UnresolvedView}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+
+/**
+ * Rule that rewrites Spark's view statements into the Paimon view commands when the target
+ * catalog implements [[SupportView]].
+ *
+ * A statement whose identifier does not resolve to a [[SupportView]] catalog matches none of the
+ * cases below and is therefore returned unchanged.
+ */
+case class RewritePaimonViewCommands(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+
+    // Only statements that carry the original SQL text (Some(originalText)) are rewritten.
+    case CreateViewStatement(
+          ResolvedIdent(resolved),
+          userSpecifiedColumns,
+          comment,
+          properties,
+          Some(originalText),
+          child,
+          allowExisting,
+          replace,
+          _) =>
+      CreatePaimonView(
+        child = resolved,
+        queryText = originalText,
+        query = CTESubstitution.apply(child),
+        columnAliases = userSpecifiedColumns.map(_._1),
+        // The second tuple element is passed through as-is; the previous
+        // `.orElse(Option.empty)` returned the very same Option.
+        columnComments = userSpecifiedColumns.map(_._2),
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace
+      )
+
+    case DropView(ResolvedIdent(resolved), ifExists: Boolean) =>
+      DropPaimonView(resolved, ifExists)
+
+    // NOTE(review): the first ShowViews field (presumably the requested namespace) is ignored;
+    // the current catalog and current namespace are always used - confirm this is intended.
+    case ShowViews(_, pattern, output)
+        if catalogManager.currentCatalog.isInstanceOf[SupportView] =>
+      ShowPaimonViews(
+        ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace),
+        pattern,
+        output)
+  }
+
+  /**
+   * Extractor that yields a [[ResolvedIdentifier]] when the given name parts (either passed
+   * directly or wrapped in an [[UnresolvedView]]) resolve to a [[SupportView]] catalog.
+   */
+  private object ResolvedIdent {
+    def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match {
+      case CatalogAndIdentifier(viewCatalog: SupportView, ident) =>
+        Some(ResolvedIdentifier(viewCatalog, ident))
+      case UnresolvedView(CatalogAndIdentifier(viewCatalog: SupportView, ident), _, _, _) =>
+        Some(ResolvedIdentifier(viewCatalog, ident))
+      case _ =>
+        None
+    }
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
similarity index 60%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to
paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
index 5befb88da..6ab8a2671 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -16,20 +16,6 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand,
LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
-
-package object leafnode {
-
- trait PaimonLeafParsedStatement extends LeafParsedStatement
-
- trait PaimonLeafRunnableCommand extends LeafRunnableCommand
-
- trait PaimonLeafCommand extends LeafCommand
-
- trait PaimonLeafV2CommandExec extends LeafV2CommandExec
-
-}
+/** Runs the view tests inherited from PaimonViewTestBase; adds no tests of its own. */
+class PaimonViewTest extends PaimonViewTestBase
diff --git
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
new file mode 100644
index 000000000..5d57cda2f
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView,
DropPaimonView, ResolvedIdentifier, ShowPaimonViews}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{CTESubstitution,
ResolvedNamespace, UnresolvedDBObjectName, UnresolvedView}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+
+/**
+ * Rule that rewrites Spark's view commands into the Paimon view commands when the target
+ * catalog implements [[SupportView]].
+ *
+ * A plan whose identifier does not resolve to a [[SupportView]] catalog matches none of the
+ * cases below and is therefore returned unchanged.
+ */
+case class RewritePaimonViewCommands(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+
+    // Only plans that carry the original SQL text (Some(queryText)) are rewritten.
+    case CreateView(
+          ResolvedIdent(resolved),
+          userSpecifiedColumns,
+          comment,
+          properties,
+          Some(queryText),
+          query,
+          allowExisting,
+          replace) =>
+      CreatePaimonView(
+        child = resolved,
+        queryText = queryText,
+        query = CTESubstitution.apply(query),
+        columnAliases = userSpecifiedColumns.map(_._1),
+        // The second tuple element is passed through as-is; the previous
+        // `.orElse(Option.empty)` returned the very same Option.
+        columnComments = userSpecifiedColumns.map(_._2),
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace
+      )
+
+    case DropView(ResolvedIdent(resolved), ifExists: Boolean) =>
+      DropPaimonView(resolved, ifExists)
+
+    // NOTE(review): the first ShowViews field (presumably the requested namespace) is ignored;
+    // the current catalog and current namespace are always used - confirm this is intended.
+    case ShowViews(_, pattern, output)
+        if catalogManager.currentCatalog.isInstanceOf[SupportView] =>
+      ShowPaimonViews(
+        ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace),
+        pattern,
+        output)
+  }
+
+  /**
+   * Extractor that yields a [[ResolvedIdentifier]] when the name parts wrapped in an
+   * [[UnresolvedDBObjectName]] or an [[UnresolvedView]] resolve to a [[SupportView]] catalog.
+   */
+  private object ResolvedIdent {
+    def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match {
+      case UnresolvedDBObjectName(CatalogAndIdentifier(viewCatalog: SupportView, ident), _) =>
+        Some(ResolvedIdentifier(viewCatalog, ident))
+      case UnresolvedView(CatalogAndIdentifier(viewCatalog: SupportView, ident), _, _, _) =>
+        Some(ResolvedIdentifier(viewCatalog, ident))
+      case _ =>
+        None
+    }
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
similarity index 60%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to
paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
index 5befb88da..6ab8a2671 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -16,20 +16,6 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand,
LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
-
-package object leafnode {
-
- trait PaimonLeafParsedStatement extends LeafParsedStatement
-
- trait PaimonLeafRunnableCommand extends LeafRunnableCommand
-
- trait PaimonLeafCommand extends LeafCommand
-
- trait PaimonLeafV2CommandExec extends LeafV2CommandExec
-
-}
+/** Runs the view tests inherited from PaimonViewTestBase; adds no tests of its own. */
+class PaimonViewTest extends PaimonViewTestBase
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
similarity index 60%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to
paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
index 5befb88da..6ab8a2671 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -16,20 +16,6 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand,
LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
-
-package object leafnode {
-
- trait PaimonLeafParsedStatement extends LeafParsedStatement
-
- trait PaimonLeafRunnableCommand extends LeafRunnableCommand
-
- trait PaimonLeafCommand extends LeafCommand
-
- trait PaimonLeafV2CommandExec extends LeafV2CommandExec
-
-}
+/** Runs the view tests inherited from PaimonViewTestBase; adds no tests of its own. */
+class PaimonViewTest extends PaimonViewTestBase
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
similarity index 60%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to
paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
index 5befb88da..6ab8a2671 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -16,20 +16,6 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand,
LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
-
-package object leafnode {
-
- trait PaimonLeafParsedStatement extends LeafParsedStatement
-
- trait PaimonLeafRunnableCommand extends LeafRunnableCommand
-
- trait PaimonLeafCommand extends LeafCommand
-
- trait PaimonLeafV2CommandExec extends LeafV2CommandExec
-
-}
+/** Runs the view tests inherited from PaimonViewTestBase; adds no tests of its own. */
+class PaimonViewTest extends PaimonViewTestBase
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
similarity index 60%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to
paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
index 5befb88da..6ab8a2671 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTest.scala
@@ -16,20 +16,6 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand,
LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
-
-package object leafnode {
-
- trait PaimonLeafParsedStatement extends LeafParsedStatement
-
- trait PaimonLeafRunnableCommand extends LeafRunnableCommand
-
- trait PaimonLeafCommand extends LeafCommand
-
- trait PaimonLeafV2CommandExec extends LeafV2CommandExec
-
-}
+/** Runs the view tests inherited from PaimonViewTestBase; adds no tests of its own. */
+class PaimonViewTest extends PaimonViewTestBase
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 89448c1f4..3b9af1694 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -27,6 +27,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.spark.catalog.SparkBaseCatalog;
import org.apache.paimon.spark.catalog.SupportFunction;
+import org.apache.paimon.spark.catalog.SupportView;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.FormatTableOptions;
@@ -72,10 +73,12 @@ import static
org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
+import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
+import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Spark {@link TableCatalog} for paimon. */
-public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
+public class SparkCatalog extends SparkBaseCatalog implements SupportFunction,
SupportView {
private static final Logger LOG =
LoggerFactory.getLogger(SparkCatalog.class);
@@ -126,10 +129,7 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
@Override
public void createNamespace(String[] namespace, Map<String, String>
metadata)
throws NamespaceAlreadyExistsException {
- checkArgument(
- isValidateNamespace(namespace),
- "Namespace %s is not valid",
- Arrays.toString(namespace));
+ checkNamespace(namespace);
try {
catalog.createDatabase(namespace[0], false, metadata);
} catch (Catalog.DatabaseAlreadyExistException e) {
@@ -152,9 +152,7 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
if (namespace.length == 0) {
return listNamespaces();
}
- if (!isValidateNamespace(namespace)) {
- throw new NoSuchNamespaceException(namespace);
- }
+ checkNamespace(namespace);
try {
catalog.getDatabase(namespace[0]);
return new String[0][];
@@ -166,10 +164,7 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace)
throws NoSuchNamespaceException {
- checkArgument(
- isValidateNamespace(namespace),
- "Namespace %s is not valid",
- Arrays.toString(namespace));
+ checkNamespace(namespace);
String dataBaseName = namespace[0];
try {
return catalog.getDatabase(dataBaseName).options();
@@ -207,10 +202,7 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
*/
public boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException {
- checkArgument(
- isValidateNamespace(namespace),
- "Namespace %s is not valid",
- Arrays.toString(namespace));
+ checkNamespace(namespace);
try {
catalog.dropDatabase(namespace[0], false, cascade);
return true;
@@ -224,10 +216,7 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
@Override
public Identifier[] listTables(String[] namespace) throws
NoSuchNamespaceException {
- checkArgument(
- isValidateNamespace(namespace),
- "Missing database in namespace: %s",
- Arrays.toString(namespace));
+ checkNamespace(namespace);
try {
return catalog.listTables(namespace[0]).stream()
.map(table -> Identifier.of(namespace, table))
@@ -239,10 +228,7 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
@Override
public void invalidateTable(Identifier ident) {
- try {
- catalog.invalidateTable(toIdentifier(ident));
- } catch (NoSuchTableException ignored) {
- }
+ catalog.invalidateTable(toIdentifier(ident));
}
@Override
@@ -347,7 +333,7 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
try {
catalog.dropTable(toIdentifier(ident), false);
return true;
- } catch (Catalog.TableNotExistException | NoSuchTableException e) {
+ } catch (Catalog.TableNotExistException e) {
return false;
}
}
@@ -454,10 +440,6 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
}
}
- private boolean isValidateNamespace(String[] namespace) {
- return namespace.length == 1;
- }
-
@Override
public void renameTable(Identifier oldIdent, Identifier newIdent)
throws NoSuchTableException, TableAlreadyExistsException {
@@ -472,15 +454,6 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
// --------------------- tools ------------------------------------------
- protected org.apache.paimon.catalog.Identifier toIdentifier(Identifier
ident)
- throws NoSuchTableException {
- if (!isValidateNamespace(ident.namespace())) {
- throw new NoSuchTableException(ident);
- }
-
- return new org.apache.paimon.catalog.Identifier(ident.namespace()[0],
ident.name());
- }
-
protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
Identifier ident, Map<String, String> extraOptions) throws
NoSuchTableException {
try {
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
index 8bba67620..f6643f758 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
@@ -81,6 +81,10 @@ public class SparkTypeUtils {
return type.accept(PaimonToSparkTypeVisitor.INSTANCE);
}
+ public static org.apache.paimon.types.RowType toPaimonRowType(StructType
type) {
+ return (RowType) toPaimonType(type);
+ }
+
public static org.apache.paimon.types.DataType toPaimonType(DataType
dataType) {
return SparkToPaimonTypeVisitor.visit(dataType);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
new file mode 100644
index 000000000..b8ce86e89
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportView.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalog;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.spark.SparkTypeUtils;
+import org.apache.paimon.view.View;
+import org.apache.paimon.view.ViewImpl;
+
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
+import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
+
+/** Catalog methods for working with Views. */
+public interface SupportView extends WithPaimonCatalog {
+
+    /**
+     * Lists the views of the database named by the single-element {@code namespace}.
+     *
+     * @throws NoSuchNamespaceException if the Paimon catalog reports that the database does not
+     *     exist
+     */
+    default List<String> listViews(String[] namespace) throws NoSuchNamespaceException {
+        // The namespace check cannot raise DatabaseNotExistException, so it sits outside the try.
+        checkNamespace(namespace);
+        try {
+            return paimonCatalog().listViews(namespace[0]);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(namespace);
+        }
+    }
+
+    /**
+     * Loads the view identified by {@code ident} from the Paimon catalog.
+     *
+     * @throws Catalog.ViewNotExistException propagated from the Paimon catalog
+     */
+    default View loadView(Identifier ident) throws Catalog.ViewNotExistException {
+        return paimonCatalog().getView(toIdentifier(ident));
+    }
+
+    /**
+     * Creates a view in the Paimon catalog from a Spark schema, query text, comment and
+     * properties.
+     *
+     * @throws NoSuchNamespaceException if the Paimon catalog reports that the database does not
+     *     exist
+     * @throws RuntimeException wrapping {@code Catalog.ViewAlreadyExistException}
+     */
+    default void createView(
+            Identifier ident,
+            StructType schema,
+            String queryText,
+            String comment,
+            Map<String, String> properties,
+            Boolean ignoreIfExists)
+            throws NoSuchNamespaceException {
+        org.apache.paimon.catalog.Identifier paimonIdent = toIdentifier(ident);
+        try {
+            View view =
+                    new ViewImpl(
+                            paimonIdent,
+                            SparkTypeUtils.toPaimonRowType(schema),
+                            queryText,
+                            comment,
+                            properties);
+            paimonCatalog().createView(paimonIdent, view, ignoreIfExists);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new NoSuchNamespaceException(ident.namespace());
+        } catch (Catalog.ViewAlreadyExistException e) {
+            throw new RuntimeException("view already exists: " + ident, e);
+        }
+    }
+
+    /**
+     * Drops the view identified by {@code ident} from the Paimon catalog.
+     *
+     * <p>NOTE(review): {@code ignoreIfExists} is forwarded unchanged as the second argument of the
+     * catalog's {@code dropView}; it presumably means "ignore if the view does NOT exist" -
+     * confirm and consider renaming.
+     *
+     * @throws RuntimeException wrapping {@code Catalog.ViewNotExistException}
+     */
+    default void dropView(Identifier ident, Boolean ignoreIfExists) {
+        org.apache.paimon.catalog.Identifier paimonIdent = toIdentifier(ident);
+        try {
+            paimonCatalog().dropView(paimonIdent, ignoreIfExists);
+        } catch (Catalog.ViewNotExistException e) {
+            throw new RuntimeException("view not exists: " + ident, e);
+        }
+    }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
similarity index 53%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
copy to
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
index 5befb88da..fca9df210 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
@@ -16,20 +16,26 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.utils;
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand,
LeafParsedStatement}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+import org.apache.spark.sql.connector.catalog.Identifier;
-package object leafnode {
+import java.util.Arrays;
- trait PaimonLeafParsedStatement extends LeafParsedStatement
+import static org.apache.paimon.utils.Preconditions.checkArgument;
- trait PaimonLeafRunnableCommand extends LeafRunnableCommand
+/** Utils of catalog. */
+public class CatalogUtils {
- trait PaimonLeafCommand extends LeafCommand
-
- trait PaimonLeafV2CommandExec extends LeafV2CommandExec
+ public static void checkNamespace(String[] namespace) {
+ checkArgument(
+ namespace.length == 1,
+ "Paimon only support single namespace, but got %s",
+ Arrays.toString(namespace));
+ }
+ public static org.apache.paimon.catalog.Identifier toIdentifier(Identifier
ident) {
+ checkNamespace(ident.namespace());
+ return new org.apache.paimon.catalog.Identifier(ident.namespace()[0],
ident.name());
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
new file mode 100644
index 000000000..a375a2965
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.catalog.Catalog.ViewNotExistException
+import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.view.View
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal,
UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, UpCast}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.parser.extensions.{CurrentOrigin, Origin}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project,
SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.PaimonLookupCatalog
+
+/**
+ * Replaces an [[UnresolvedRelation]] that names a view in a [[SupportView]] catalog with the
+ * parsed view query, projected onto the columns of the view's stored row type.
+ *
+ * If the catalog reports that no such view exists, the relation is left untouched.
+ */
+case class PaimonViewResolver(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with PaimonLookupCatalog {
+
+  protected lazy val catalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    case u @ UnresolvedRelation(parts @ CatalogAndIdentifier(catalog: SupportView, ident), _, _) =>
+      try {
+        val view = catalog.loadView(ident)
+        createViewRelation(parts, view)
+      } catch {
+        // Not a view: keep the original unresolved relation.
+        case _: ViewNotExistException =>
+          u
+      }
+  }
+
+  /**
+   * Builds the plan for a view: the parsed query, wrapped in a projection that up-casts each
+   * output column (by ordinal) to the stored type and name, aliased under `nameParts`.
+   */
+  private def createViewRelation(nameParts: Seq[String], view: View): LogicalPlan = {
+    val parsedPlan = parseViewText(nameParts.mkString("."), view.query)
+
+    val aliases = SparkTypeUtils.fromPaimonRowType(view.rowType()).fields.zipWithIndex.map {
+      case (expected, pos) =>
+        val attr = GetColumnByOrdinal(pos, expected.dataType)
+        Alias(UpCast(attr, expected.dataType), expected.name)(
+          explicitMetadata = Some(expected.metadata))
+    }
+
+    SubqueryAlias(nameParts, Project(aliases, parsedPlan))
+  }
+
+  /**
+   * Parses the stored view SQL with the session parser, under an origin tagged with the view
+   * name.
+   *
+   * @throws RuntimeException
+   *   if the text cannot be parsed; the underlying ParseException is attached as the cause
+   */
+  private def parseViewText(name: String, viewText: String): LogicalPlan = {
+    val origin = Origin(
+      objectType = Some("VIEW"),
+      objectName = Some(name)
+    )
+    try {
+      CurrentOrigin.withOrigin(origin) {
+        try {
+          spark.sessionState.sqlParser.parseQuery(viewText)
+        } catch {
+          // For compatibility with Spark 3.2 and below
+          case _: NoSuchMethodError =>
+            spark.sessionState.sqlParser.parsePlan(viewText)
+        }
+      }
+    } catch {
+      // Chain the parser error so its position and message are not lost.
+      case e: ParseException =>
+        throw new RuntimeException("Failed to parse view text: " + viewText, e)
+    }
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonViewCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonViewCommand.scala
new file mode 100644
index 000000000..24b27bb0e
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonViewCommand.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.spark.leafnode.{PaimonBinaryCommand,
PaimonUnaryCommand}
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan,
ShowViews, Statistics}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
+
+/**
+ * Logical command describing the creation of a Paimon view.
+ *
+ * `child` is exposed as the left child and `query` as the right child of the binary command.
+ */
+case class CreatePaimonView(
+    child: LogicalPlan,
+    queryText: String,
+    query: LogicalPlan,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String] = Seq.empty,
+    comment: Option[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean)
+  extends PaimonBinaryCommand {
+
+  override def left: LogicalPlan = child
+
+  override def right: LogicalPlan = query
+
+  /** Returns a copy whose `child` and `query` are replaced by the given plans. */
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan,
+      newRight: LogicalPlan): LogicalPlan = {
+    this.copy(child = newLeft, query = newRight)
+  }
+}
+
+/** Logical command describing the removal of a Paimon view identified by `child`. */
+case class DropPaimonView(child: LogicalPlan, ifExists: Boolean) extends PaimonUnaryCommand {
+
+  /** Returns a copy whose `child` is replaced by the given plan. */
+  override protected def withNewChildInternal(newChild: LogicalPlan): DropPaimonView = {
+    this.copy(child = newChild)
+  }
+}
+
+/**
+ * Logical command describing the listing of Paimon views.
+ *
+ * The `namespace` plan doubles as the single child of this unary command; `output` defaults to
+ * the attributes of Spark's `ShowViews`.
+ */
+case class ShowPaimonViews(
+    namespace: LogicalPlan,
+    pattern: Option[String],
+    override val output: Seq[Attribute] = ShowViews.getOutputAttrs)
+  extends PaimonUnaryCommand {
+
+  override def child: LogicalPlan = namespace
+
+  /** Returns a copy whose `namespace` is replaced by the given plan. */
+  override protected def withNewChildInternal(newChild: LogicalPlan): ShowPaimonViews = {
+    this.copy(namespace = newChild)
+  }
+}
+
+/**
+ * Leaf plan node that carries a catalog together with an identifier; it produces no output
+ * attributes and reports dummy statistics. Copy from spark 3.4+.
+ */
+case class ResolvedIdentifier(catalog: CatalogPlugin, identifier: Identifier) extends LeafNode {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override def stats: Statistics = Statistics.DUMMY
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 0c3d3e6b6..0c3865f7d 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -19,10 +19,12 @@
package org.apache.paimon.spark.execution
import org.apache.paimon.spark.{SparkCatalog, SparkUtils}
-import
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand,
DeleteTagCommand, PaimonCallCommand, RenameTagCommand, ShowTagsCommand}
+import org.apache.paimon.spark.catalog.SupportView
+import
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand,
CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand,
RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand}
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace
import org.apache.spark.sql.catalyst.expressions.{Expression,
GenericInternalRow, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect,
LogicalPlan}
import org.apache.spark.sql.connector.catalog.{Identifier,
PaimonLookupCatalog, TableCatalog}
@@ -65,6 +67,39 @@ case class PaimonStrategy(spark: SparkSession)
case RenameTagCommand(PaimonCatalogAndIdentifier(catalog, ident),
sourceTag, targetTag) =>
RenameTagExec(catalog, ident, sourceTag, targetTag) :: Nil
+ case CreatePaimonView(
+ ResolvedIdentifier(viewCatalog: SupportView, ident),
+ queryText,
+ query,
+ columnAliases,
+ columnComments,
+ queryColumnNames,
+ comment,
+ properties,
+ allowExisting,
+ replace) =>
+ CreatePaimonViewExec(
+ viewCatalog,
+ ident,
+ queryText,
+ query.schema,
+ columnAliases,
+ columnComments,
+ queryColumnNames,
+ comment,
+ properties,
+ allowExisting,
+ replace) :: Nil
+
+ case DropPaimonView(ResolvedIdentifier(viewCatalog: SupportView, ident),
ifExists) =>
+ DropPaimonViewExec(viewCatalog, ident, ifExists) :: Nil
+
+ // A new member was added to ResolvedNamespace since spark4.0,
+ // unapply pattern matching is not used here to ensure compatibility
across multiple spark versions.
+ case ShowPaimonViews(r: ResolvedNamespace, pattern, output)
+ if r.catalog.isInstanceOf[SupportView] =>
+ ShowPaimonViewsExec(output, r.catalog.asInstanceOf[SupportView],
r.namespace, pattern) :: Nil
+
case _ => Nil
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
new file mode 100644
index 000000000..7a4b907c7
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonViewExec.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
GenericInternalRow}
+import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Physical command that creates a view through a [[SupportView]] catalog.
+ *
+ * Column aliases, column comments and query column names are rejected as unsupported. When
+ * `replace` is set, the view is dropped before the new one is created.
+ */
+case class CreatePaimonViewExec(
+    catalog: SupportView,
+    ident: Identifier,
+    queryText: String,
+    viewSchema: StructType,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String],
+    comment: Option[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean
+) extends PaimonLeafV2CommandExec {
+
+  override def output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    val hasUnsupportedOption =
+      Seq(columnAliases, columnComments, queryColumnNames).exists(_.nonEmpty)
+    if (hasUnsupportedOption) {
+      throw new UnsupportedOperationException(
+        "columnAliases, columnComments and queryColumnNames are not supported now")
+    }
+
+    // Note: replace is implemented as drop-then-create, so the operation is non-atomic.
+    if (replace) {
+      catalog.dropView(ident, true)
+    }
+
+    val viewComment = comment.orNull
+    val viewProperties = properties.asJava
+    catalog.createView(ident, viewSchema, queryText, viewComment, viewProperties, allowExisting)
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = s"CreatePaimonViewExec: $ident"
+}
+
+/** Physical command that drops a view through a [[SupportView]] catalog and emits no rows. */
+case class DropPaimonViewExec(catalog: SupportView, ident: Identifier, ifExists: Boolean)
+  extends PaimonLeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.dropView(ident, ifExists)
+    Seq.empty
+  }
+
+  override def simpleString(maxFields: Int): String = s"DropPaimonViewExec: $ident"
+}
+
+/**
+ * Physical command that lists the views of a namespace in a [[SupportView]] catalog.
+ *
+ * Each emitted row holds the dot-joined namespace, the view name and the constant `false`. When
+ * `pattern` is defined, only names accepted by `StringUtils.filterPattern` are kept.
+ */
+case class ShowPaimonViewsExec(
+    output: Seq[Attribute],
+    catalog: SupportView,
+    namespace: Seq[String],
+    pattern: Option[String])
+  extends PaimonLeafV2CommandExec {
+
+  override protected def run(): Seq[InternalRow] = {
+    // The namespace column is the same for every row, so encode it once.
+    val namespaceName = UTF8String.fromString(namespace.mkString("."))
+
+    // With no pattern every view is kept; otherwise the name must survive the pattern filter.
+    def matchesPattern(viewName: String): Boolean =
+      pattern.forall(p => StringUtils.filterPattern(Seq(viewName), p).nonEmpty)
+
+    catalog
+      .listViews(namespace.toArray)
+      .asScala
+      .filter(matchesPattern)
+      .map(viewName =>
+        new GenericInternalRow(Array[Any](namespaceName, UTF8String.fromString(viewName), false)))
+      .toSeq
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"ShowPaimonViewsExec: $namespace"
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 4fe217ee0..6f47a77ef 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.extensions
-import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis,
PaimonDeleteTable, PaimonIncompatiblePHRRules,
PaimonIncompatibleResolutionRules, PaimonMergeInto,
PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable}
+import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis,
PaimonDeleteTable, PaimonIncompatiblePHRRules,
PaimonIncompatibleResolutionRules, PaimonMergeInto,
PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable,
PaimonViewResolver}
import
org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable,
MergePaimonScalarSubqueries}
import
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.paimon.spark.execution.PaimonStrategy
@@ -37,6 +37,7 @@ class PaimonSparkSessionExtensions extends
(SparkSessionExtensions => Unit) {
// analyzer extensions
extensions.injectResolutionRule(spark => new PaimonAnalysis(spark))
extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark))
+ extensions.injectResolutionRule(spark => PaimonViewResolver(spark))
extensions.injectResolutionRule(spark =>
PaimonIncompatibleResolutionRules(spark))
extensions.injectPostHocResolutionRule(spark =>
PaimonPostHocResolutionRules(spark))
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
index 5befb88da..6ebab0384 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/leafnode/package.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand,
LeafParsedStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{BinaryCommand,
LeafCommand, LeafParsedStatement, UnaryCommand}
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
@@ -30,6 +30,9 @@ package object leafnode {
trait PaimonLeafCommand extends LeafCommand
- trait PaimonLeafV2CommandExec extends LeafV2CommandExec
+ trait PaimonUnaryCommand extends UnaryCommand
+
+ trait PaimonBinaryCommand extends BinaryCommand
+ trait PaimonLeafV2CommandExec extends LeafV2CommandExec
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
index dd0a48159..9ece18693 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
@@ -23,7 +23,7 @@ import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
import org.antlr.v4.runtime.tree.TerminalNodeImpl
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
@@ -61,7 +61,7 @@ class PaimonSparkSqlExtensionsParser(val delegate:
ParserInterface)
parse(sqlTextAfterSubstitution)(parser =>
astBuilder.visit(parser.singleStatement()))
.asInstanceOf[LogicalPlan]
} else {
- delegate.parsePlan(sqlText)
+
RewritePaimonViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText))
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index b864894e7..a1289a5f0 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.catalyst.parser.extensions
import org.apache.paimon.spark.catalyst.plans.logical
-import
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand,
DeleteTagCommand, PaimonCallArgument, PaimonCallStatement, PaimonNamedArgument,
PaimonPositionalArgument, RenameTagCommand, ShowTagsCommand, TagOptions}
+import org.apache.paimon.spark.catalyst.plans.logical._
import org.apache.paimon.utils.TimeUtils
import org.antlr.v4.runtime._
@@ -212,5 +212,16 @@ object CurrentOrigin {
def get: Origin = value.get()
def set(o: Origin): Unit = value.set(o)
def reset(): Unit = value.set(Origin())
+
+  /**
+   * Evaluates `f` with `o` installed as the current origin, then puts back the origin that was
+   * current before the call, even if `f` throws. Restoring the previous value is what allows
+   * calls to nest.
+   */
+  def withOrigin[A](o: Origin)(f: => A): A = {
+    val saved = get
+    set(o)
+    try f
+    finally set(saved)
+  }
}
/* Apache Spark copy end */
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
new file mode 100644
index 000000000..f69e5d920
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonViewCommands.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView,
DropPaimonView, ResolvedIdentifier, ShowPaimonViews}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{CTESubstitution,
ResolvedNamespace, UnresolvedIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+
+/**
+ * Rewrites Spark's parsed view commands into their Paimon counterparts when they target a catalog
+ * implementing [[SupportView]]:
+ *   - `CreateView` becomes [[CreatePaimonView]]
+ *   - `DropView` becomes [[DropPaimonView]]
+ *   - `ShowViews` becomes [[ShowPaimonViews]] when the current catalog supports views
+ *
+ * Commands that do not match are returned unchanged.
+ */
+case class RewritePaimonViewCommands(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+
+    // A new member was added to CreateView since spark4.0,
+    // unapply pattern matching is not used here to ensure compatibility across multiple spark
+    // versions.
+    case c: CreateView =>
+      ResolvedIdent
+        .unapply(c.child)
+        .map {
+          resolved =>
+            // Fail with an explicit message rather than a bare Option.get when the statement
+            // carries no query text.
+            val viewText = c.originalText.getOrElse(
+              throw new NoSuchElementException(
+                s"CREATE VIEW ${resolved.identifier} has no original query text"))
+            CreatePaimonView(
+              child = resolved,
+              queryText = viewText,
+              query = CTESubstitution.apply(c.query),
+              columnAliases = c.userSpecifiedColumns.map(_._1),
+              columnComments = c.userSpecifiedColumns.map(_._2.orElse(None)),
+              comment = c.comment,
+              properties = c.properties,
+              allowExisting = c.allowExisting,
+              replace = c.replace
+            )
+        }
+        .getOrElse(c)
+
+    case DropView(ResolvedIdent(resolved), ifExists: Boolean) =>
+      DropPaimonView(resolved, ifExists)
+
+    // NOTE(review): the namespace given in the statement is ignored (`_`); the views of the
+    // current catalog and namespace are always listed. Confirm whether `SHOW VIEWS IN <ns>`
+    // should be honoured here.
+    case ShowViews(_, pattern, output) if catalogManager.currentCatalog.isInstanceOf[SupportView] =>
+      ShowPaimonViews(
+        ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace),
+        pattern,
+        output)
+  }
+
+  /**
+   * Extractor that turns an `UnresolvedIdentifier` pointing into a [[SupportView]] catalog into a
+   * [[ResolvedIdentifier]]; anything else yields `None`.
+   */
+  private object ResolvedIdent {
+    def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match {
+      case UnresolvedIdentifier(CatalogAndIdentifier(viewCatalog: SupportView, ident), _) =>
+        Some(ResolvedIdentifier(viewCatalog, ident))
+      case _ =>
+        None
+    }
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index 33b993160..56922ae2a 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -241,12 +241,12 @@ abstract class DDLWithHiveCatalogTestBase extends
PaimonHiveTestBase {
spark.sql(s"CREATE DATABASE paimon_db")
spark.sql(s"USE paimon_db")
spark.sql(s"CREATE TABLE paimon_tbl (id int, name string, dt string)
using paimon")
- // Currently, only spark_catalog supports create other table or view
+ // Only spark_catalog supports create other table
if (catalogName.equals(sparkCatalogName)) {
spark.sql(s"CREATE TABLE parquet_tbl (id int, name string, dt
string) using parquet")
spark.sql(s"CREATE VIEW parquet_tbl_view AS SELECT * FROM
parquet_tbl")
- spark.sql(s"CREATE VIEW paimon_tbl_view AS SELECT * FROM
paimon_tbl")
}
+ spark.sql(s"CREATE VIEW paimon_tbl_view AS SELECT * FROM paimon_tbl")
spark.sql(s"USE default")
spark.sql(s"DROP DATABASE paimon_db CASCADE")
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala
new file mode 100644
index 000000000..39ed8e8a7
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonViewTestBase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonHiveTestBase
+
+import org.apache.spark.sql.Row
+
+/** View DDL and query tests, run against both the Spark session catalog and the Paimon Hive catalog. */
+abstract class PaimonViewTestBase extends PaimonHiveTestBase {
+
+  /**
+   * For each catalog under test: switches to it, creates and enters `test_db` inside
+   * `withDatabase("test_db")`, then runs `body`.
+   */
+  private def inTestDbOfEachCatalog(body: => Unit): Unit = {
+    for (catalogName <- Seq(sparkCatalogName, paimonHiveCatalogName)) {
+      sql(s"USE $catalogName")
+      withDatabase("test_db") {
+        sql("CREATE DATABASE test_db")
+        sql("USE test_db")
+        body
+      }
+    }
+  }
+
+  test("Paimon View: create and drop view") {
+    inTestDbOfEachCatalog {
+      withTable("t") {
+        withView("v1") {
+          sql("CREATE TABLE t (id INT) USING paimon")
+          sql("INSERT INTO t VALUES (1), (2)")
+
+          // a freshly created view is listed and can be queried, including from a subquery
+          sql("CREATE VIEW v1 AS SELECT * FROM t")
+          checkAnswer(sql("SHOW VIEWS"), Seq(Row("test_db", "v1", false)))
+          checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1), Row(2)))
+          checkAnswer(
+            sql("SELECT * FROM v1 WHERE id >= (SELECT max(id) FROM v1)"),
+            Seq(Row(2)))
+
+          // test drop view
+          sql("DROP VIEW IF EXISTS v1")
+          checkAnswer(sql("SHOW VIEWS"), Seq.empty[Row])
+          sql("CREATE VIEW v1 AS SELECT * FROM t WHERE id > 1")
+          checkAnswer(sql("SHOW VIEWS"), Seq(Row("test_db", "v1", false)))
+          checkAnswer(sql("SELECT * FROM v1"), Seq(Row(2)))
+
+          // test create or replace view
+          intercept[Exception] {
+            sql("CREATE VIEW v1 AS SELECT * FROM t WHERE id < 2")
+          }
+          sql("CREATE OR REPLACE VIEW v1 AS SELECT * FROM t WHERE id < 2")
+          checkAnswer(sql("SELECT * FROM v1"), Seq(Row(1)))
+        }
+      }
+    }
+  }
+
+  test("Paimon View: show views") {
+    inTestDbOfEachCatalog {
+      withTable("t") {
+        withView("va", "vab", "vc") {
+          sql("CREATE TABLE t (id INT) USING paimon")
+          sql("CREATE VIEW va AS SELECT * FROM t")
+          sql("CREATE VIEW vab AS SELECT * FROM t")
+          sql("CREATE VIEW vc AS SELECT * FROM t")
+
+          val allViews = Seq(
+            Row("test_db", "va", false),
+            Row("test_db", "vab", false),
+            Row("test_db", "vc", false))
+          checkAnswer(sql("SHOW VIEWS"), allViews)
+
+          val viewsMatchingVa = Seq(Row("test_db", "va", false), Row("test_db", "vab", false))
+          checkAnswer(sql("SHOW VIEWS LIKE 'va*'"), viewsMatchingVa)
+        }
+      }
+    }
+  }
+}