gh-yzou commented on code in PR #1862:
URL: https://github.com/apache/polaris/pull/1862#discussion_r2190961940
##########
plugins/spark/v3.5/integration/src/intTest/resources/logback.xml:
##########
@@ -32,6 +32,9 @@ out the configuration if you would like ot see all spark debug log during the ru
</encoder>
</appender>
+ <!-- Hudi-specific loggers for test -->
+ <logger name="org.apache.hudi" level="INFO"/>
Review Comment:
We can if the WARN-level log output doesn't significantly slow the test, but based on my previous testing there are still quite a lot of Spark logs even at WARN; that was part of the reason we set the level to ERROR. If a test fails, people can always tune the log level to see more logs, so I wouldn't worry too much about the restricted log level.
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -263,25 +279,39 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceExcep
@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace)
throws NoSuchNamespaceException {
- return this.icebergsSparkCatalog.loadNamespaceMetadata(namespace);
+ Map<String, String> metadata = this.icebergsSparkCatalog.loadNamespaceMetadata(namespace);
+ if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
+ HudiCatalogUtils.loadNamespaceMetadata(namespace, metadata);
+ }
+ return metadata;
}
@Override
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
this.icebergsSparkCatalog.createNamespace(namespace, metadata);
+ if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
+ HudiCatalogUtils.createNamespace(namespace, metadata);
+ }
}
@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
this.icebergsSparkCatalog.alterNamespace(namespace, changes);
+ if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
+ HudiCatalogUtils.alterNamespace(namespace, changes);
Review Comment:
@rahil-c I tried out the current client without your change for Hudi usage; although it fails on loadTable, it doesn't really complain about namespaces, so I strongly believe we don't need any namespace-specific changes here. I will also check out the current change to verify.
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -263,25 +279,39 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceExcep
@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace)
throws NoSuchNamespaceException {
- return this.icebergsSparkCatalog.loadNamespaceMetadata(namespace);
+ Map<String, String> metadata = this.icebergsSparkCatalog.loadNamespaceMetadata(namespace);
+ if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
+ HudiCatalogUtils.loadNamespaceMetadata(namespace, metadata);
+ }
+ return metadata;
}
@Override
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
this.icebergsSparkCatalog.createNamespace(namespace, metadata);
+ if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
+ HudiCatalogUtils.createNamespace(namespace, metadata);
+ }
}
@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
this.icebergsSparkCatalog.alterNamespace(namespace, changes);
+ if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
+ HudiCatalogUtils.alterNamespace(namespace, changes);
Review Comment:
After checking out your code and testing it, I believe the reason you need those namespace operations is that you are calling the following code when loading the Hudi table:
```
catalogTable = sparkSession.sessionState().catalog().getTableMetadata(tableIdentifier);
```
This triggers Spark's internal session catalog, which actually checks whether the database exists. However, I don't think we need to do that for loading Hudi tables. I think we should just create a V1Table and let the HudiCatalog take care of the final table format. To construct the V1Table, you can either do it manually by filling in all the required fields, or do it in a similar way as Unity Catalog does here:
https://github.com/unitycatalog/unitycatalog/blob/main/connectors/spark/src/main/scala/io/unitycatalog/spark/UCSingleCatalog.scala#L283
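For illustration only, here is a minimal Java sketch of that first option (build the V1Table directly from the Polaris table metadata instead of going through the Spark session catalog). The class and the `buildCatalogTable` helper are hypothetical, not part of this PR:
```java
import java.util.Map;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.V1Table;

final class HudiV1TableSketch {

  /** Builds the returned table without touching the Spark session catalog. */
  static Table loadAsV1Table(Identifier identifier, Map<String, String> properties) {
    // No getTableMetadata() call here, so no database-existence check is triggered.
    CatalogTable catalogTable = buildCatalogTable(identifier, properties);
    // V1Table is declared private[sql] in Scala, but that qualifier is not enforced in
    // bytecode, so it can be constructed from Java code outside the Spark packages.
    return new V1Table(catalogTable);
  }

  /**
   * Hypothetical helper: fill the CatalogTable fields (identifier, EXTERNAL table type,
   * storage with the table location, provider = "hudi", table properties) manually,
   * similar to what the Unity Catalog connector does in the UCSingleCatalog code above.
   */
  private static CatalogTable buildCatalogTable(
      Identifier identifier, Map<String, String> properties) {
    throw new UnsupportedOperationException("field-by-field construction omitted in this sketch");
  }
}
```
Either way, the V1Table only needs to carry enough metadata (location, provider, properties) for the Hudi catalog to resolve the actual table afterwards.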
##########
plugins/spark/v3.5/integration/build.gradle.kts:
##########
@@ -60,12 +60,51 @@ dependencies {
exclude("org.apache.logging.log4j", "log4j-core")
exclude("org.slf4j", "jul-to-slf4j")
}
+
+ // Add spark-hive for Hudi integration - provides HiveExternalCatalog that Hudi needs
+ testImplementation("org.apache.spark:spark-hive_${scalaVersion}:${spark35Version}") {
+ // exclude log4j dependencies to match spark-sql exclusions
Review Comment:
Sorry, what I mean is: does spark-hive contain spark-sql? If yes, can we remove
```
testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}")
{
// exclude log4j dependencies. Explicit dependencies for the log4j libraries are
// enforced below to ensure the version compatibility
exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
exclude("org.apache.logging.log4j", "log4j-1.2-api")
exclude("org.apache.logging.log4j", "log4j-core")
exclude("org.slf4j", "jul-to-slf4j")
}
```
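For reference, spark-hive does declare spark-sql as a transitive dependency, so if consolidation is the goal the block could look roughly like this (a sketch only, reusing the exclusions quoted above, not the PR's actual change):
```kotlin
// Sketch: a single Spark test dependency, relying on spark-hive to pull in spark-sql.
testImplementation("org.apache.spark:spark-hive_${scalaVersion}:${spark35Version}") {
  // exclude log4j dependencies; explicit log4j versions are enforced below for compatibility
  exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
  exclude("org.apache.logging.log4j", "log4j-1.2-api")
  exclude("org.apache.logging.log4j", "log4j-core")
  exclude("org.slf4j", "jul-to-slf4j")
}
```
The exclusions on spark-hive also apply to its transitive spark-sql dependency, so the log4j setup enforced elsewhere in the file is preserved.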
##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -87,6 +119,32 @@ public static Table loadSparkTable(GenericTable genericTable) {
provider, new CaseInsensitiveStringMap(tableProperties),
scala.Option.empty());
}
+ public static Table loadHudiSparkTable(GenericTable genericTable, Identifier identifier) {
+ SparkSession sparkSession = SparkSession.active();
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.putAll(genericTable.getProperties());
+ tableProperties.put(
+ TABLE_PATH_KEY, genericTable.getProperties().get(TableCatalog.PROP_LOCATION));
+ String namespacePath = String.join(".", identifier.namespace());
+ TableIdentifier tableIdentifier =
+ new TableIdentifier(identifier.name(), Option.apply(namespacePath));
+ CatalogTable catalogTable = null;
+ try {
+ catalogTable = sparkSession.sessionState().catalog().getTableMetadata(tableIdentifier);
+ } catch (NoSuchDatabaseException e) {
+ throw new RuntimeException(
+ "No database found for the given tableIdentifier:" +
tableIdentifier, e);
+ } catch (NoSuchTableException e) {
+ LOG.debug("No table currently exists, as an initial create table");
+ }
+ return new HoodieInternalV2Table(
Review Comment:
I took a look at how HoodieCatalog.scala works
https://github.com/apache/hudi/blob/3369b09fd8fca1db1f8e655f79ecb7a97c57367b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala#L123,
and it actually works very similarly to DeltaCatalog,
https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L230.
The approach I tried before was to have PolarisSparkCatalog return a V1Table object, and in SparkCatalog we just call DeltaCatalog.loadTable, which returns the DeltaV2Table; this way we don't introduce any format-specific dependency on the client side. I didn't end up doing that because Delta already provides DataSourceV2 support, and the loadTableUtils automatically loads the DeltaV2Table.
Since Hudi doesn't support DataSourceV2 yet, I think you can go with my original approach: have PolarisSparkCatalog return a V1Table on load, and let HudiCatalog.loadTable take care of the final returned table format.
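To make the suggested flow concrete, here is a minimal, hypothetical Java sketch (the class and field names are made up, not the PR's code): PolarisSparkCatalog returns a plain V1Table, and a Hudi-provided TableCatalog (for example org.apache.spark.sql.hudi.catalog.HoodieCatalog) is asked for the final table form only when the table's provider is hudi, mirroring the DeltaCatalog wrapping described above:
```java
import java.util.Map;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;

final class HudiLoadDelegationSketch {
  private final TableCatalog polarisSparkCatalog; // returns a plain V1Table for generic tables
  private final TableCatalog hudiCatalog; // e.g. Hudi's HoodieCatalog, wired as the delegate

  HudiLoadDelegationSketch(TableCatalog polarisSparkCatalog, TableCatalog hudiCatalog) {
    this.polarisSparkCatalog = polarisSparkCatalog;
    this.hudiCatalog = hudiCatalog;
  }

  Table loadTable(Identifier identifier) throws NoSuchTableException {
    Table table = polarisSparkCatalog.loadTable(identifier);
    Map<String, String> properties = table.properties();
    if ("hudi".equalsIgnoreCase(properties.get(TableCatalog.PROP_PROVIDER))) {
      // Let Hudi's own catalog decide the final returned table representation,
      // so the Polaris client does not need any Hudi-specific table classes.
      return hudiCatalog.loadTable(identifier);
    }
    return table;
  }
}
```
This keeps format-specific table handling behind the Hudi catalog, which is the same reasoning given for the Delta path above.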
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]