rahil-c commented on code in PR #1862:
URL: https://github.com/apache/polaris/pull/1862#discussion_r2232176108


##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -91,6 +109,85 @@ public static Table loadSparkTable(GenericTable 
genericTable) {
         provider, new CaseInsensitiveStringMap(tableProperties), 
scala.Option.empty());
   }
 
+  /**
+   * Extract catalog name from Spark session configuration. Looks for 
configuration like:
+   * spark.sql.catalog.<CATALOG_NAME>=org.apache.polaris.spark.SparkCatalog
+   */
+  private static String getCatalogName() {
+    SparkSession spark = SparkSession.active();
+    String catalogPrefix = "spark.sql.catalog.";
+    String polarisSparkCatalog = "org.apache.polaris.spark.SparkCatalog";
+
+    scala.collection.Iterator<scala.Tuple2<String, String>> configIterator =
+        spark.conf().getAll().iterator();
+    while (configIterator.hasNext()) {
+      scala.Tuple2<String, String> config = configIterator.next();
+      String key = config._1();
+      String value = config._2();
+
+      if (key.startsWith(catalogPrefix) && polarisSparkCatalog.equals(value)) {
+        return key.substring(catalogPrefix.length());
+      }
+    }
+
+    throw new IllegalStateException(
+        "Could not obtain Polaris catalog identifier."
+            + "Expected following configuration to be set in session: 
spark.sql.catalog.<CATALOG_NAME>=org.apache.polaris.spark.SparkCatalog");
+  }
+
+  public static Table loadV1SparkHudiTable(GenericTable genericTable, 
Identifier identifier) {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.putAll(genericTable.getProperties());
+    tableProperties.put(
+        TABLE_PATH_KEY, 
genericTable.getProperties().get(TableCatalog.PROP_LOCATION));
+
+    // Need full identifier in order to construct CatalogTable correctly for 
Hudi
+    String namespacePath = String.join(".", identifier.namespace());
+    TableIdentifier tableIdentifier =
+        new TableIdentifier(
+            identifier.name(), Option.apply(namespacePath), 
Option.apply(getCatalogName()));
+
+    scala.collection.immutable.Map<String, String> scalaOptions =
+        (scala.collection.immutable.Map<String, String>)
+            scala.collection.immutable.Map$.MODULE$.apply(
+                
scala.collection.JavaConverters.mapAsScalaMap(tableProperties).toSeq());
+
+    org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat storage =
+        DataSource.buildStorageFormatFromOptions(scalaOptions);
+
+    // Currently Polaris generic table does not contain any schema 
information, partition columns,
+    // stats, etc
+    // for now we will just use fill the parameters we have from catalog, and 
let underlying client
+    // resolve the rest within its catalog implementation
+    org.apache.spark.sql.types.StructType emptySchema = new 
org.apache.spark.sql.types.StructType();
+    scala.collection.immutable.Seq<String> emptyStringSeq =
+        scala.collection.JavaConverters.asScalaBuffer(new 
java.util.ArrayList<String>()).toList();
+    CatalogTable catalogTable =
+        new CatalogTable(
+            tableIdentifier,
+            CatalogTableType.EXTERNAL(),
+            storage,
+            emptySchema,
+            Option.apply(genericTable.getProperties().get("provider")),

Review Comment:
   I believe these values should be the same, but can try using 
`genericTable.format`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to