adutra commented on code in PR #1303:
URL: https://github.com/apache/polaris/pull/1303#discussion_r2028482708
##########
plugins/spark/v3.5/build.gradle.kts:
##########
@@ -41,18 +41,37 @@ val scalaVersion = getAndUseScalaVersionForProject()
val icebergVersion = pluginlibs.versions.iceberg.get()
val spark35Version = pluginlibs.versions.spark35.get()
+val scalaLibraryVersion =
+ if (scalaVersion.equals("2.12")) {
+ pluginlibs.versions.scala212.get()
+ } else {
+ pluginlibs.versions.scala213.get()
+ }
+
dependencies {
implementation(project(":polaris-api-iceberg-service")) {
- // exclude the iceberg and jackson dependencies, use the
- // dependencies packed in the iceberg-spark dependency
+ // exclude the iceberg dependencies, use the ones pulled
+ // by iceberg-core
exclude("org.apache.iceberg", "*")
- exclude("com.fasterxml.jackson.core", "*")
}
+ implementation(project(":polaris-api-catalog-service"))
+ implementation(project(":polaris-core")) { exclude("org.apache.iceberg",
"*") }
+
+ implementation("org.apache.iceberg:iceberg-core:${icebergVersion}")
+ implementation("com.fasterxml.jackson.core:jackson-annotations")
+ implementation("com.fasterxml.jackson.core:jackson-core")
+ implementation("com.fasterxml.jackson.core:jackson-databind")
implementation(
"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
- )
+ ) {
+ // exclude the iceberg rest dependencies, use the ones pulled
+ // with iceberg-core dependency
+ exclude("org.apache.iceberg.rest", "*")
Review Comment:
Hmm there is no module in Iceberg with group ID `org.apache.iceberg.rest` or
`org.apache.iceberg.hadoop`. All Iceberg modules have group ID
`org.apache.iceberg`.
I think you are mixing package names and group IDs? These exclusions imho
are not necessary.
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.collect.ImmutableMap;
Review Comment:
Some classes are using the unshaded Guava classes, like this one. But some
others, e.g. `PolarisInMemoryCatalog` are using the relocated Guava ones
(`org.apache.iceberg.relocated.com.google.common.*`).
Since we are depending at compile time on `iceberg-spark-runtime`, I think
it would make sense to use the relocated classes and actually exclude/ban the
unshaded Guava classes. Wdyt?
The same is valid for other relocated libraries, like Jackson or Caffeine.
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.polaris.service.types.GenericTable;
+
+/** InMemory implementation for the Polaris Catalog. This class is mainly used
by testing. */
+public class PolarisInMemoryCatalog implements PolarisCatalog {
+ private final ConcurrentMap<TableIdentifier, GenericTable> genericTables;
+
+ public PolarisInMemoryCatalog() {
+ this.genericTables = Maps.newConcurrentMap();
+ }
+
+ @Override
+ public List<TableIdentifier> listGenericTables(Namespace ns) {
+ return this.genericTables.keySet().stream()
+ .filter(t -> t.namespace().equals(ns))
+ .sorted(Comparator.comparing(TableIdentifier::toString))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public GenericTable loadGenericTable(TableIdentifier identifier) {
+ GenericTable table = this.genericTables.get(identifier);
+ if (table == null) {
+ throw new NoSuchTableException("Generic table does not exist: %s",
identifier);
+ }
+
+ return table;
+ }
+
+ @Override
+ public boolean dropGenericTable(TableIdentifier identifier) {
+ synchronized (this) {
+ if (null == this.genericTables.remove(identifier)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public GenericTable createGenericTable(
+ TableIdentifier identifier, String format, Map<String, String> props) {
+ synchronized (this) {
Review Comment:
The synchronization pattern in this class needs to be refactored:
* You need a `ConcurrentMap` because you need fail-safe iteration over the
map keys;
* But you also need to synchronize on `this` because the way insertion is
implemented is not atomic.
I'd suggest using `putIfAbsent` here and getting rid of the
`synchronized(this)` blocks:
```java
GenericTable previous = this.genericTables.putIfAbsent(
identifier,
GenericTable.builder()
.setName(identifier.name())
.setFormat(format)
.setProperties(props)
.build());
if (previous != null) {
throw new AlreadyExistsException(
"Generic table already exists: %s", identifier);
}
```
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
Review Comment:
Another option would be to do the opposite:
1. Ban all Iceberg relocated classes at compile time;
2. In the Shadow Jar configuration, add a relocation rule:
```kotlin
// Same relocation as Iceberg does
relocate("com.fasterxml.jackson",
"org.apache.iceberg.shaded.com.fasterxml.jackson")
```
The advantage is that the IDE would be happier since it would be dealing
with unshaded classes.
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import java.util.*;
Review Comment:
nit: star import.
##########
plugins/spark/v3.5/build.gradle.kts:
##########
@@ -41,18 +41,37 @@ val scalaVersion = getAndUseScalaVersionForProject()
val icebergVersion = pluginlibs.versions.iceberg.get()
val spark35Version = pluginlibs.versions.spark35.get()
+val scalaLibraryVersion =
+ if (scalaVersion.equals("2.12")) {
Review Comment:
nit:
```suggestion
if (scalaVersion == "2.12") {
```
##########
plugins/spark/v3.5/build.gradle.kts:
##########
@@ -95,7 +114,10 @@ tasks.register<ShadowJar>("createPolarisSparkJar") {
// Optimization: Minimize the JAR (remove unused classes from dependencies)
// The iceberg-spark-runtime plugin is always packaged along with our
polaris-spark plugin,
// therefore excluded from the optimization.
- minimize {
exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*")) }
+ minimize {
+ exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*"))
Review Comment:
That's surprising: do we want to include the _entire_ Iceberg Spark runtime
jar in this plugin jar?
Why not use both jars side by side, e.g.
```shell
spark-sql \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.polaris:polaris-spark-3.5_2.12
```
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.*;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.core.rest.PolarisResourcePaths;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
+
+public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+ public static final String REST_PAGE_SIZE = "rest-page-size";
+
+ private RESTClient restClient = null;
+ private CloseableGroup closeables = null;
+ private Set<Endpoint> endpoints;
+ private OAuth2Util.AuthSession catalogAuth = null;
+ private PolarisResourcePaths paths = null;
+ private Integer pageSize = null;
+
+ // the default endpoints to config if server doesn't specify the 'endpoints'
configuration.
+ private static final Set<Endpoint> DEFAULT_ENDPOINTS =
PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
+
+ public void initialize(Map<String, String> unresolved,
OAuth2Util.AuthSession catalogAuth) {
+ Preconditions.checkArgument(unresolved != null, "Invalid configuration:
null");
+
+ // resolve any configuration that is supplied by environment variables
+ Map<String, String> props = EnvironmentUtil.resolveAll(unresolved);
+
+ // TODO: switch to use authManager once iceberg dependency is updated to
1.9.0
+ this.catalogAuth = catalogAuth;
+
+ this.restClient =
Review Comment:
This first client needs to be closed.
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.*;
Review Comment:
nit: star import.
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
Review Comment:
You _probably_ should avoid importing unshaded Jackson and use
`org.apache.iceberg.shaded.com.fasterxml.jackson.annotation.*` instead.
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.polaris.service.types.GenericTable;
+
+/** InMemory implementation for the Polaris Catalog. This class is mainly used
by testing. */
Review Comment:
Can we move it to the `test` folder then?
##########
plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTSessionCatalog;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.execution.datasources.DataSource;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class PolarisCatalogUtils {
+ public static final String TABLE_PROVIDER_KEY = "provider";
+ public static final String TABLE_PATH_KEY = "path";
+
+ // whether enable the inMemory catalogs, used by testing.
+ public static final String ENABLE_IN_MEMORY_CATALOG_KEY =
"enable_in_memory_catalog";
+
+ public static void checkIdentifierIsValid(TableIdentifier tableIdentifier) {
+ if (tableIdentifier.namespace().isEmpty()) {
+ throw new NoSuchTableException("Invalid table identifier: %s",
tableIdentifier);
+ }
+ }
+
+ /** Check whether the table provider is iceberg. */
+ public static boolean useIceberg(String provider) {
+ return provider == null || "iceberg".equalsIgnoreCase(provider);
+ }
+
+ /** Check whether the table provider is delta. */
+ public static boolean useDelta(String provider) {
+ return "delta".equalsIgnoreCase(provider);
+ }
+
+ /**
+ * Load spark table using DataSourceV2.
+ *
+ * @return V2Table if DataSourceV2 is available for the table format. For
delta table, it returns
+ * DeltaTableV2.
+ */
+ public static Table loadSparkTable(GenericTable genericTable) {
+ SparkSession sparkSession = SparkSession.active();
+ TableProvider provider =
+ DataSource.lookupDataSourceV2(genericTable.getFormat(),
sparkSession.sessionState().conf())
+ .get();
+ Map<String, String> properties = genericTable.getProperties();
+ boolean hasLocationClause = properties.get(TableCatalog.PROP_LOCATION) !=
null;
+ boolean hasPathClause = properties.get(TABLE_PATH_KEY) != null;
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.putAll(properties);
+ if (!hasPathClause && hasLocationClause) {
+ // DataSourceV2 requires the path property on table loading. However,
spark today
+ // doesn't create the corresponding path property if the path keyword is
not
+ // provided by user when location is provided. Here, we duplicate the
location
+ // property as path to make sure the table can be loaded.
+ tableProperties.put(TABLE_PATH_KEY,
properties.get(TableCatalog.PROP_LOCATION));
+ }
+ CaseInsensitiveStringMap property_map = new
CaseInsensitiveStringMap(tableProperties);
+ return DataSourceV2Utils.getTableFromProvider(
+ provider, property_map, scala.Option$.MODULE$.<StructType>empty());
+ }
+
+ /**
+ * Get the catalogAuth field inside the RESTSessionCatalog used by Iceberg
Spark Catalog use
+ * reflection. TODO: Deprecate this function once the iceberg client is
updated to 1.9.0 to use
+ * AuthManager and the capability of injecting an AuthManger is available.
Related iceberg PR:
+ * https://github.com/apache/iceberg/pull/12655
+ */
+ public static OAuth2Util.AuthSession getAuthSession(SparkCatalog
sparkCatalog) {
+ try {
+ Field icebergCatalogField =
sparkCatalog.getClass().getDeclaredField("icebergCatalog");
Review Comment:
I don't think we'll have the ability to share auth sessions in 1.9
unfortunately.
##########
plugins/spark/v3.5/build.gradle.kts:
##########
@@ -95,7 +114,10 @@ tasks.register<ShadowJar>("createPolarisSparkJar") {
// Optimization: Minimize the JAR (remove unused classes from dependencies)
Review Comment:
Not related to this PR, but why are we excluding the entire `META-INF/**`
path? This is probably not OK from a legal standpoint. Also, there is no point
in calling `mergeServiceFiles()` if in the end we exclude the entire
`META-INF/services/` folder.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]