This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 6b4356b19 Spark: Setup repository code structure and build (#1190)
6b4356b19 is described below
commit 6b4356b193e7673b7b2880995dc1b3ad7ac1a785
Author: gh-yzou <[email protected]>
AuthorDate: Tue Apr 1 15:41:24 2025 -0700
Spark: Setup repository code structure and build (#1190)
---
plugins/pluginlibs.versions.toml | 22 +++
plugins/spark/README.md | 40 +++++
plugins/spark/spark-scala.properties | 22 +++
plugins/spark/v3.5/build.gradle.kts | 101 +++++++++++++
.../org/apache/polaris/spark/SparkCatalog.java | 161 +++++++++++++++++++++
.../org/apache/polaris/spark/SparkCatalogTest.java | 97 +++++++++++++
settings.gradle.kts | 20 +++
7 files changed, 463 insertions(+)
diff --git a/plugins/pluginlibs.versions.toml b/plugins/pluginlibs.versions.toml
new file mode 100644
index 000000000..0a4a515e5
--- /dev/null
+++ b/plugins/pluginlibs.versions.toml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[versions]
+iceberg = "1.8.1"
+spark35 = "3.5.5"
diff --git a/plugins/spark/README.md b/plugins/spark/README.md
new file mode 100644
index 000000000..6386a914c
--- /dev/null
+++ b/plugins/spark/README.md
@@ -0,0 +1,40 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Polaris Spark Plugin
+
+The Polaris Spark plugin provides a SparkCatalog class that communicates with the Polaris
+REST endpoints and provides implementations of Apache Spark's
+[TableCatalog](https://github.com/apache/spark/blob/v3.5.5/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java),
+[SupportsNamespaces](https://github.com/apache/spark/blob/v3.5.5/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java), and
+[ViewCatalog](https://github.com/apache/spark/blob/v3.5.5/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java) interfaces.
+
+Currently, the plugin supports only Spark 3.5 with Scala 2.12 and 2.13,
+and depends on iceberg-spark-runtime 1.8.1.
+
+# Build Plugin Jar
+A task `createPolarisSparkJar` is added to build a jar for the Polaris Spark plugin. The jar is named
+`polaris-iceberg-<iceberg_version>-spark-runtime-<spark_major_version>_<scala_version>.jar`.
+
+Building the Polaris project produces client jars for both Scala 2.12 and 2.13, and CI runs the Spark
+client tests for both Scala versions as well.
+
+The jar can also be built on its own for a specific Scala version using the target `:polaris-spark-3.5_<scala_version>`. For example:
+- `./gradlew :polaris-spark-3.5_2.12:createPolarisSparkJar` - builds the jar for the Polaris Spark plugin with Scala version 2.12.
+The resulting jar is located at `plugins/spark/build/<scala_version>/libs` after the build.
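For orientation, a Spark catalog plugin of this kind is normally registered through `spark.sql.catalog.<name>` session properties. Below is a minimal, hypothetical Java sketch of how the SparkCatalog described in this README might eventually be wired into a Spark session; the catalog name `polaris` and the `uri` option are illustrative assumptions, since this commit does not yet define the plugin's configuration keys and every catalog method currently throws `UnsupportedOperationException`.

```java
import org.apache.spark.sql.SparkSession;

public class PolarisSparkUsageSketch {
  public static void main(String[] args) {
    // Hypothetical wiring: register the plugin's SparkCatalog under the name "polaris".
    // The "uri" key is an assumption for illustration; the plugin's real options
    // are not defined by this commit.
    SparkSession spark =
        SparkSession.builder()
            .appName("polaris-spark-usage-sketch")
            .master("local[*]")
            .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
            .config("spark.sql.catalog.polaris.uri", "http://localhost:8181/api/catalog")
            .getOrCreate();

    // With this commit, any catalog operation still throws UnsupportedOperationException.
    spark.sql("SHOW NAMESPACES IN polaris").show();
  }
}
```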
diff --git a/plugins/spark/spark-scala.properties b/plugins/spark/spark-scala.properties
new file mode 100644
index 000000000..2ed71b574
--- /dev/null
+++ b/plugins/spark/spark-scala.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+sparkVersions=3.5
+
+scalaVersions=2.12,2.13
diff --git a/plugins/spark/v3.5/build.gradle.kts b/plugins/spark/v3.5/build.gradle.kts
new file mode 100644
index 000000000..36ca6d528
--- /dev/null
+++ b/plugins/spark/v3.5/build.gradle.kts
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+
+plugins {
+ id("polaris-client")
+ alias(libs.plugins.jandex)
+}
+
+fun getAndUseScalaVersionForProject(): String {
+ val sparkScala = project.name.split("-").last().split("_")
+
+ val scalaVersion = sparkScala[1]
+
+ // direct the build to build/<scalaVersion> to avoid potential collision problem
+ project.layout.buildDirectory.set(layout.buildDirectory.dir(scalaVersion).get())
+
+ return scalaVersion
+}
+
+// get version information
+val sparkMajorVersion = "3.5"
+val scalaVersion = getAndUseScalaVersionForProject()
+val icebergVersion = pluginlibs.versions.iceberg.get()
+val spark35Version = pluginlibs.versions.spark35.get()
+
+dependencies {
+ implementation(project(":polaris-api-iceberg-service")) {
+ // exclude the iceberg and jackson dependencies, use the
+ // dependencies packed in the iceberg-spark dependency
+ exclude("org.apache.iceberg", "*")
+ exclude("com.fasterxml.jackson.core", "*")
+ }
+
+ implementation(
+ "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
+ )
+
+ compileOnly("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
+ // exclude log4j dependencies
+ exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
+ exclude("org.apache.logging.log4j", "log4j-api")
+ exclude("org.apache.logging.log4j", "log4j-1.2-api")
+ exclude("org.slf4j", "jul-to-slf4j")
+ }
+
+ testImplementation(platform(libs.junit.bom))
+ testImplementation("org.junit.jupiter:junit-jupiter")
+ testImplementation(libs.assertj.core)
+ testImplementation(libs.mockito.core)
+
+ testImplementation(
+ "org.apache.iceberg:iceberg-spark-runtime-3.5_${scalaVersion}:${icebergVersion}"
+ )
+ testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
+ // exclude log4j dependencies
+ exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
+ exclude("org.apache.logging.log4j", "log4j-api")
+ exclude("org.apache.logging.log4j", "log4j-1.2-api")
+ exclude("org.slf4j", "jul-to-slf4j")
+ }
+}
+
+tasks.register<ShadowJar>("createPolarisSparkJar") {
+ archiveClassifier = null
+ archiveBaseName =
+ "polaris-iceberg-${icebergVersion}-spark-runtime-${sparkMajorVersion}_${scalaVersion}"
+ isZip64 = true
+
+ dependencies { exclude("META-INF/**") }
+
+ // pack both the source code and dependencies
+ from(sourceSets.main.get().output)
+ configurations = listOf(project.configurations.runtimeClasspath.get())
+
+ mergeServiceFiles()
+
+ // Optimization: Minimize the JAR (remove unused classes from dependencies)
+ // The iceberg-spark-runtime plugin is always packaged along with our polaris-spark plugin,
+ // therefore excluded from the optimization.
+ minimize { exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*")) }
+}
+
+tasks.withType(Jar::class).named("sourcesJar") { dependsOn("createPolarisSparkJar") }
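The `getAndUseScalaVersionForProject()` helper in the build script above derives the Scala version solely from the Gradle project name and then redirects the build directory per Scala version. A small standalone Java sketch of the same name-parsing step, using the hypothetical project name `polaris-spark-3.5_2.12`:

```java
public class ScalaVersionParsingSketch {
  public static void main(String[] args) {
    // Same idea as the Kotlin helper: take the last '-'-separated segment ("3.5_2.12")
    // and split it on '_' into the Spark and Scala versions.
    String projectName = "polaris-spark-3.5_2.12";
    String[] sparkScala = projectName.substring(projectName.lastIndexOf('-') + 1).split("_");

    String sparkVersion = sparkScala[0]; // "3.5"
    String scalaVersion = sparkScala[1]; // "2.12"

    // The build script then directs outputs to build/<scalaVersion> so that the
    // 2.12 and 2.13 variants of the same project do not collide.
    System.out.println("spark=" + sparkVersion + ", scala=" + scalaVersion);
  }
}
```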
diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
new file mode 100644
index 000000000..2ec0450a0
--- /dev/null
+++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.*;
+import org.apache.spark.sql.connector.catalog.*;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkCatalog implements TableCatalog, SupportsNamespaces, ViewCatalog {
+ private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
+ private String catalogName = null;
+ private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
+
+ // TODO: Add Polaris Specific REST Catalog
+
+ @Override
+ public String name() {
+ return catalogName;
+ }
+
+ @Override
+ public void initialize(String name, CaseInsensitiveStringMap options) {
+ this.catalogName = name;
+ }
+
+ @Override
+ public Table loadTable(Identifier ident) throws NoSuchTableException {
+ throw new UnsupportedOperationException("loadTable");
+ }
+
+ @Override
+ public Table createTable(
+ Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
+ throws TableAlreadyExistsException {
+ throw new UnsupportedOperationException("createTable");
+ }
+
+ @Override
+ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+ throw new UnsupportedOperationException("alterTable");
+ }
+
+ @Override
+ public boolean dropTable(Identifier ident) {
+ throw new UnsupportedOperationException("dropTable");
+ }
+
+ @Override
+ public void renameTable(Identifier from, Identifier to)
+ throws NoSuchTableException, TableAlreadyExistsException {
+ throw new UnsupportedOperationException("renameTable");
+ }
+
+ @Override
+ public Identifier[] listTables(String[] namespace) {
+ throw new UnsupportedOperationException("listTables");
+ }
+
+ @Override
+ public String[] defaultNamespace() {
+ throw new UnsupportedOperationException("defaultNamespace");
+ }
+
+ @Override
+ public String[][] listNamespaces() {
+ throw new UnsupportedOperationException("listNamespaces");
+ }
+
+ @Override
+ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
+ throw new UnsupportedOperationException("listNamespaces");
+ }
+
+ @Override
+ public Map<String, String> loadNamespaceMetadata(String[] namespace)
+ throws NoSuchNamespaceException {
+ throw new UnsupportedOperationException("loadNamespaceMetadata");
+ }
+
+ @Override
+ public void createNamespace(String[] namespace, Map<String, String> metadata)
+ throws NamespaceAlreadyExistsException {
+ throw new UnsupportedOperationException("createNamespace");
+ }
+
+ @Override
+ public void alterNamespace(String[] namespace, NamespaceChange... changes)
+ throws NoSuchNamespaceException {
+ throw new UnsupportedOperationException("alterNamespace");
+ }
+
+ @Override
+ public boolean dropNamespace(String[] namespace, boolean cascade)
+ throws NoSuchNamespaceException {
+ throw new UnsupportedOperationException("dropNamespace");
+ }
+
+ @Override
+ public Identifier[] listViews(String... namespace) {
+ throw new UnsupportedOperationException("listViews");
+ }
+
+ @Override
+ public View loadView(Identifier ident) throws NoSuchViewException {
+ throw new UnsupportedOperationException("loadView");
+ }
+
+ @Override
+ public View createView(
+ Identifier ident,
+ String sql,
+ String currentCatalog,
+ String[] currentNamespace,
+ StructType schema,
+ String[] queryColumnNames,
+ String[] columnAliases,
+ String[] columnComments,
+ Map<String, String> properties)
+ throws ViewAlreadyExistsException, NoSuchNamespaceException {
+ throw new UnsupportedOperationException("createView");
+ }
+
+ @Override
+ public View alterView(Identifier ident, ViewChange... changes)
+ throws NoSuchViewException, IllegalArgumentException {
+ throw new UnsupportedOperationException("alterView");
+ }
+
+ @Override
+ public boolean dropView(Identifier ident) {
+ throw new UnsupportedOperationException("dropView");
+ }
+
+ @Override
+ public void renameView(Identifier fromIdentifier, Identifier toIdentifier)
+ throws NoSuchViewException, ViewAlreadyExistsException {
+ throw new UnsupportedOperationException("renameView");
+ }
+}
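The unused `icebergsSparkCatalog` field and the TODO above hint at the intended direction: once the Polaris-specific REST catalog exists, this class can delegate Iceberg table handling to Iceberg's own `org.apache.iceberg.spark.SparkCatalog`. A hedged sketch of what `initialize` could look like under that assumption, as a fragment inside the SparkCatalog class above (illustrative only; this commit deliberately leaves all methods unimplemented):

```java
// Hypothetical follow-up inside the SparkCatalog class above; not part of this commit.
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
  this.catalogName = name;
  // Delegate Iceberg table handling to Iceberg's Spark catalog implementation.
  this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog();
  this.icebergsSparkCatalog.initialize(name, options);
}
```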
diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
new file mode 100644
index 000000000..50c1e645a
--- /dev/null
+++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class SparkCatalogTest {
+ private SparkCatalog catalog;
+ private String catalogName;
+
+ @BeforeEach
+ public void setup() {
+ catalogName = "test_" + UUID.randomUUID();
+ Map<String, String> catalogConfig = Maps.newHashMap();
+ catalogConfig.put(CATALOG_IMPL, "org.apache.iceberg.inmemory.InMemoryCatalog");
+ catalogConfig.put("cache-enabled", "false");
+ catalog = new SparkCatalog();
+ catalog.initialize(catalogName, new CaseInsensitiveStringMap(catalogConfig));
+ }
+
+ @Test
+ public void testUnsupportedOperations() {
+ String[] namespace = new String[] {"ns1"};
+ Identifier identifier = Identifier.of(namespace, "table1");
+ Identifier new_identifier = Identifier.of(namespace, "table2");
+ // table methods
+ assertThatThrownBy(() -> catalog.loadTable(identifier))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(
+ () -> catalog.createTable(identifier, Mockito.mock(StructType.class), null, null))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.alterTable(identifier))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.dropTable(identifier))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.renameTable(identifier, new_identifier))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.listTables(namespace))
+ .isInstanceOf(UnsupportedOperationException.class);
+
+ // namespace methods
+ assertThatThrownBy(() -> catalog.loadNamespaceMetadata(namespace))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.listNamespaces())
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.listNamespaces(namespace))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.createNamespace(namespace, null))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.alterNamespace(namespace))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.dropNamespace(namespace, false))
+ .isInstanceOf(UnsupportedOperationException.class);
+
+ // view methods
+ assertThatThrownBy(() -> catalog.listViews(namespace))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.loadView(identifier))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(
+ () -> catalog.createView(identifier, null, null, null, null, null, null, null, null))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.alterView(identifier))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.dropView(identifier))
+ .isInstanceOf(UnsupportedOperationException.class);
+ assertThatThrownBy(() -> catalog.renameView(identifier, new_identifier))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 707592ead..3884bea5b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -57,6 +57,21 @@
loadProperties(file("gradle/projects.main.properties")).forEach { name, directory ->
polarisProject(name as String, file(directory as String))
}
+// load the polaris spark plugin projects
+val polarisSparkDir = "plugins/spark"
+val sparkScalaVersions = loadProperties(file("${polarisSparkDir}/spark-scala.properties"))
+val sparkVersions = sparkScalaVersions["sparkVersions"].toString().split(",").map { it.trim() }
+
+for (sparkVersion in sparkVersions) {
+ val scalaVersions = sparkScalaVersions["scalaVersions"].toString().split(",").map { it.trim() }
+ for (scalaVersion in scalaVersions) {
+ polarisProject(
+ "polaris-spark-${sparkVersion}_${scalaVersion}",
+ file("${polarisSparkDir}/v${sparkVersion}"),
+ )
+ }
+}
+
pluginManagement {
repositories {
mavenCentral() // prefer Maven Central, in case Gradle's repo has issues
@@ -72,6 +87,11 @@ dependencyResolutionManagement {
}
}
+dependencyResolutionManagement {
+ // version catalog used by the polaris plugin code, such as polaris-spark-3.5
+ versionCatalogs { create("pluginlibs") { from(files("plugins/pluginlibs.versions.toml")) } }
+}
+
gradle.beforeProject {
version = baseVersion
group = "org.apache.polaris"
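The nested loops added to settings.gradle.kts register one Gradle project per (Spark, Scala) combination from spark-scala.properties, all rooted at plugins/spark/v3.5. A small standalone Java sketch of the resulting project names, for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class SparkProjectNamesSketch {
  public static void main(String[] args) {
    // Values from plugins/spark/spark-scala.properties in this commit.
    List<String> sparkVersions = List.of("3.5");
    List<String> scalaVersions = List.of("2.12", "2.13");

    List<String> projectNames = new ArrayList<>();
    for (String spark : sparkVersions) {
      for (String scala : scalaVersions) {
        projectNames.add("polaris-spark-" + spark + "_" + scala);
      }
    }

    // Prints: [polaris-spark-3.5_2.12, polaris-spark-3.5_2.13]
    System.out.println(projectNames);
  }
}
```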