This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7d6654c1d0ac00d01f322d4c9cd80fdfc5a03828
Author: ForwardXu <forwardxu...@gmail.com>
AuthorDate: Wed Nov 9 10:41:03 2022 +0800

    [HUDI-5178] Add Call show_table_properties for spark sql (#7161)

    (cherry picked from commit 1d1181a4410154ff0615f374cfee97630b425e88)
---
 .../hudi/command/procedures/BaseProcedure.scala    |  4 +-
 .../hudi/command/procedures/HoodieProcedures.scala |  1 +
 .../procedures/ShowTablePropertiesProcedure.scala  | 71 ++++++++++++++++++++++
 .../TestShowTablePropertiesProcedure.scala         | 45 ++++++++++++++
 4 files changed, 119 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
index d0404664f4..67930cb3ed 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.model.HoodieRecordPayload
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.HoodieClusteringException
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SparkSession
@@ -111,7 +111,7 @@ abstract class BaseProcedure extends Procedure {
       t => HoodieCLIUtils.getHoodieCatalogTable(sparkSession, t.asInstanceOf[String]).tableLocation)
       .getOrElse(
         tablePath.map(p => p.asInstanceOf[String]).getOrElse(
-          throw new HoodieClusteringException("Table name or table path must be given one"))
+          throw new HoodieException("Table name or table path must be given one"))
       )
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index fabfda9367..d6131353c5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -83,6 +83,7 @@ object HoodieProcedures {
       ,(BackupInvalidParquetProcedure.NAME, BackupInvalidParquetProcedure.builder)
       ,(CopyToTempView.NAME, CopyToTempView.builder)
       ,(ShowCommitExtraMetadataProcedure.NAME, ShowCommitExtraMetadataProcedure.builder)
+      ,(ShowTablePropertiesProcedure.NAME, ShowTablePropertiesProcedure.builder)
     )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala
new file mode 100644
index 0000000000..d75df07fc9
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowTablePropertiesProcedure.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+
+class ShowTablePropertiesProcedure() extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("key", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("value", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val tableProps = metaClient.getTableConfig.getProps
+
+    val rows = new util.ArrayList[Row]
+    tableProps.foreach(p => rows.add(Row(p._1, p._2)))
+    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ShowTablePropertiesProcedure()
+
+}
+
+object ShowTablePropertiesProcedure {
+  val NAME = "show_table_properties"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowTablePropertiesProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTablePropertiesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTablePropertiesProcedure.scala
new file mode 100644
index 0000000000..0488920458
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTablePropertiesProcedure.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+class TestShowTablePropertiesProcedure extends HoodieSparkProcedureTestBase {
+  test("Test Call show_table_properties Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      val result = spark.sql(s"""call show_table_properties(path => '$tablePath')""").collect()
+      assertResult(true) {result.length > 0}
+    }
+  }
+}
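
For reference, a minimal sketch of invoking the new procedure from a Spark shell or job follows. The table name `hudi_tbl` and the session configuration are illustrative assumptions, not part of this commit; the procedure itself accepts either `table` or `path`, plus an optional `limit` (default 10), as declared in ShowTablePropertiesProcedure above.

    // Minimal usage sketch (assumed table name `hudi_tbl`; requires the Hudi Spark SQL extension on the classpath).
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("show-table-properties-example")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()

    // Look up the table by name; alternatively pass `path => '/path/to/table'` as the test above does.
    spark.sql("call show_table_properties(table => 'hudi_tbl', limit => 20)").show(false)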