This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 536eb6fb51 [#8838][#8837] feat(catalogs): Support create/load/list
table operation for lance table (#9181)
536eb6fb51 is described below
commit 536eb6fb514029c79dab397297ee1a15682b114e
Author: Junda Yang <[email protected]>
AuthorDate: Wed Nov 19 15:22:16 2025 -0800
[#8838][#8837] feat(catalogs): Support create/load/list table operation for
lance table (#9181)
### What changes were proposed in this pull request?
This PR implements create, load, and list table operations for Lance
format tables in the generic lakehouse catalog:
1. Added LanceCatalogOperations and LanceDataTypeConverter for
Lance-specific operations
2. Introduced GenericTable API and GenericTableEntity for generic format
table support
3. Implemented table versioning system with TableVersionMapper and
related SQL providers
4. Extended GenericLakehouseCatalogOperations to delegate to
format-specific implementations
5. Updated database schema to store table storage details (format,
location, version)
### Why are the changes needed?
Lance is a columnar format optimized for ML workloads. This enables
users to manage Lance tables through Gravitino's unified catalog API
alongside other formats like Iceberg and Hudi in a multi-format
lakehouse architecture.
Fix: #8838 #8837
### Does this PR introduce _any_ user-facing change?
Yes:
1. New Lance table support: Users can create/load Lance tables via REST
API with "format": "lance" property
2. New table properties: format (table storage format) and location
(storage path)
Example:
`POST /api/metalakes/test/catalogs/lance_catalog/schemas/schema/tables{
"name": "my_table", "columns": [...], "properties": {"format": "lance",
"location": "/path/to/table"}}`
### How was this patch tested?
Tested locally with manual REST API calls:
1. Created Lance tables with primary keys and default values ✓
2. Loaded existing tables and verified metadata ✓
3. Listed tables in schema ✓
4. Verified on-disk storage structure (_transactions/, _versions/) ✓
Co-authored-by: Mini Yu <[email protected]>
---
api/build.gradle.kts | 2 +
.../org/apache/gravitino/rel/GenericTable.java | 47 ++++++
.../org/apache/gravitino/rel/indexes/Indexes.java | 87 ++++++++++
.../java/org/apache/gravitino/rel/TestIndex.java | 57 +++++++
.../catalog-generic-lakehouse/build.gradle.kts | 1 +
.../GenericLakehouseCatalogOperations.java | 147 ++++++++++++++--
.../lakehouse/LakehouseCatalogOperations.java | 25 +++
.../lakehouse/lance/LanceCatalogOperations.java | 173 +++++++++++++++++++
.../lakehouse/lance/LanceDataTypeConverter.java | 123 ++++++++++++++
.../apache/gravitino/config/ConfigConstants.java | 2 +-
.../catalog/TableOperationDispatcher.java | 74 +++++++-
.../connector/GenericLakehouseColumn.java | 56 +++++++
.../gravitino/connector/GenericLakehouseTable.java | 86 ++++++++++
.../apache/gravitino/meta/GenericTableEntity.java | 186 +++++++++++++++++++++
.../org/apache/gravitino/meta/TableEntity.java | 10 +-
.../relational/mapper/TableVersionMapper.java | 36 ++++
.../mapper/TableVersionSQLProviderFactory.java | 62 +++++++
.../provider/DefaultMapperPackageProvider.java | 4 +-
.../provider/base/TableMetaBaseSQLProvider.java | 41 +++--
.../provider/base/TableVersionBaseSQLProvider.java | 79 +++++++++
.../postgresql/TableVersionPostgreSQLProvider.java | 24 +++
.../gravitino/storage/relational/po/TablePO.java | 46 +++++
.../relational/service/TableMetaService.java | 16 +-
.../storage/relational/utils/POConverters.java | 61 ++++++-
.../storage/relational/utils/SessionUtils.java | 12 ++
25 files changed, 1409 insertions(+), 48 deletions(-)
diff --git a/api/build.gradle.kts b/api/build.gradle.kts
index b4399b13c0..f0fe3ba5ee 100644
--- a/api/build.gradle.kts
+++ b/api/build.gradle.kts
@@ -26,6 +26,8 @@ dependencies {
implementation(libs.commons.lang3)
implementation(libs.commons.collections4)
implementation(libs.guava)
+ implementation(libs.jackson.annotations)
+ implementation(libs.jackson.databind)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
diff --git a/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
b/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
new file mode 100644
index 0000000000..4796421c53
--- /dev/null
+++ b/api/src/main/java/org/apache/gravitino/rel/GenericTable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.rel;
+
+/** A generic table interface that extends the Table interface. */
+public interface GenericTable extends Table {
+
+ /**
+ * Formats the table as a string representation.
+ *
+ * @return the formatted string representation of the table
+ */
+ String format();
+
+ /**
+ * Gets the location of the table.
+ *
+ * @return the location of the table
+ */
+ String location();
+
+ /**
+ * Indicates whether the table is external.
+ *
+ * @return true if the table is external, false otherwise
+ */
+ default boolean external() {
+ return false;
+ }
+}
diff --git a/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
b/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
index ce10fd0a0f..d1b1a1f523 100644
--- a/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
+++ b/api/src/main/java/org/apache/gravitino/rel/indexes/Indexes.java
@@ -18,6 +18,22 @@
*/
package org.apache.gravitino.rel.indexes;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
/** Helper methods to create index to pass into Apache Gravitino. */
public class Indexes {
@@ -73,10 +89,81 @@ public class Indexes {
.build();
}
+ /** Custom JSON serializer for Index objects. */
+ public static class IndexSerializer extends JsonSerializer<Index> {
+ @Override
+ public void serialize(Index value, JsonGenerator gen, SerializerProvider
serializers)
+ throws IOException {
+ gen.writeStartObject();
+ gen.writeStringField("indexType",
value.type().name().toUpperCase(Locale.ROOT));
+ if (null != value.name()) {
+ gen.writeStringField("name", value.name());
+ }
+ gen.writeFieldName("fieldNames");
+ gen.writeObject(value.fieldNames());
+ gen.writeEndObject();
+ }
+ }
+
+ /** Custom JSON deserializer for Index objects. */
+ public static class IndexDeserializer extends JsonDeserializer<Index> {
+
+ @Override
+ public Index deserialize(JsonParser p, DeserializationContext ctxt) throws
IOException {
+ JsonNode node = p.getCodec().readTree(p);
+ Preconditions.checkArgument(
+ node != null && !node.isNull() && node.isObject(),
+ "Index must be a valid JSON object, but found: %s",
+ node);
+
+ IndexImpl.Builder builder = IndexImpl.builder();
+ Preconditions.checkArgument(
+ node.has("indexType"), "Cannot parse index from missing type: %s",
node);
+ String indexType = getString("indexType", node);
+
builder.withIndexType(Index.IndexType.valueOf(indexType.toUpperCase(Locale.ROOT)));
+ if (node.has("name")) {
+ builder.withName(getString("name", node));
+ }
+ Preconditions.checkArgument(
+ node.has("fieldNames"), "Cannot parse index from missing field
names: %s", node);
+ List<String[]> fieldNames = Lists.newArrayList();
+ node.get("fieldNames").forEach(field ->
fieldNames.add(getStringArray((ArrayNode) field)));
+ builder.withFieldNames(fieldNames.toArray(new String[0][0]));
+ return builder.build();
+ }
+
+ private static String[] getStringArray(ArrayNode node) {
+ String[] array = new String[node.size()];
+ for (int i = 0; i < node.size(); i++) {
+ array[i] = node.get(i).asText();
+ }
+ return array;
+ }
+
+ private static String getString(String property, JsonNode node) {
+ Preconditions.checkArgument(node.has(property), "Cannot parse missing
string: %s", property);
+ JsonNode pNode = node.get(property);
+ return convertToString(property, pNode);
+ }
+
+ private static String convertToString(String property, JsonNode pNode) {
+ Preconditions.checkArgument(
+ pNode != null && !pNode.isNull() && pNode.isTextual(),
+ "Cannot parse to a string value %s: %s",
+ property,
+ pNode);
+ return pNode.asText();
+ }
+ }
+
/** The user side implementation of the index. */
+ @JsonSerialize(using = IndexSerializer.class)
+ @JsonDeserialize(using = IndexDeserializer.class)
public static final class IndexImpl implements Index {
private final IndexType indexType;
+
private final String name;
+
private final String[][] fieldNames;
/**
diff --git a/api/src/test/java/org/apache/gravitino/rel/TestIndex.java
b/api/src/test/java/org/apache/gravitino/rel/TestIndex.java
new file mode 100644
index 0000000000..4a807fbb7b
--- /dev/null
+++ b/api/src/test/java/org/apache/gravitino/rel/TestIndex.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.rel;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.cfg.EnumFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestIndex {
+
+ @Test
+ void testIndexSerialization() throws JsonProcessingException {
+ String[][] fields = {{"column1"}, {"column2", "subcolumn"}};
+ Index index = Indexes.unique("test_index", fields);
+
+ JsonMapper jsonMapper =
+ JsonMapper.builder()
+ .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+ .configure(EnumFeature.WRITE_ENUMS_TO_LOWERCASE, true)
+ .enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS)
+ .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+
.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+ .build();
+
+ String json = jsonMapper.writeValueAsString(index);
+
+ Index deserializedIndex = jsonMapper.readValue(json, IndexImpl.class);
+ Assertions.assertEquals(index.type(), deserializedIndex.type());
+ Assertions.assertEquals(index.name(), deserializedIndex.name());
+ Assertions.assertArrayEquals(index.fieldNames(),
deserializedIndex.fieldNames());
+ }
+}
diff --git a/catalogs/catalog-generic-lakehouse/build.gradle.kts
b/catalogs/catalog-generic-lakehouse/build.gradle.kts
index fceac14304..704dbda7e3 100644
--- a/catalogs/catalog-generic-lakehouse/build.gradle.kts
+++ b/catalogs/catalog-generic-lakehouse/build.gradle.kts
@@ -43,6 +43,7 @@ dependencies {
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.hadoop3.client.api)
+ implementation(libs.lance)
annotationProcessor(libs.lombok)
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 1ddb177f17..63a6935544 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -18,11 +18,17 @@
*/
package org.apache.gravitino.catalog.lakehouse;
+import static org.apache.gravitino.Entity.EntityType.TABLE;
+
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
@@ -30,16 +36,20 @@ import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.catalog.ManagedSchemaOperations;
+import org.apache.gravitino.catalog.lakehouse.lance.LanceCatalogOperations;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.SupportsSchemas;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.GenericTableEntity;
+import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
@@ -66,6 +76,11 @@ public class GenericLakehouseCatalogOperations
@SuppressWarnings("unused") // todo: remove this after implementing table
operations
private Optional<Path> catalogLakehouseLocation;
+ private static final Map<String, LakehouseCatalogOperations>
SUPPORTED_FORMATS =
+ Maps.newHashMap();
+
+ private CatalogInfo catalogInfo;
+ private HasPropertyMetadata propertiesMetadata;
/**
* Initializes the generic lakehouse catalog operations with the provided
configuration.
*
@@ -152,10 +167,25 @@ public class GenericLakehouseCatalogOperations
@Override
public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
- // TODO(#8838): Implement table listing
- throw new UnsupportedOperationException(
- "Table operations are not yet implemented. "
- + "This feature is planned for a future release.");
+ EntityStore store = GravitinoEnv.getInstance().entityStore();
+ NameIdentifier identifier = NameIdentifier.of(namespace.levels());
+ try {
+ store.get(identifier, Entity.EntityType.SCHEMA, SchemaEntity.class);
+ } catch (NoSuchTableException e) {
+ throw new NoSuchEntityException(e, "Schema %s does not exist",
namespace);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to get schema " + identifier);
+ }
+
+ try {
+ List<GenericTableEntity> tableEntityList =
+ store.list(namespace, GenericTableEntity.class, TABLE);
+ return tableEntityList.stream()
+ .map(e -> NameIdentifier.of(namespace, e.name()))
+ .toArray(NameIdentifier[]::new);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to list tables under schema " +
namespace, e);
+ }
}
@Override
@@ -177,14 +207,66 @@ public class GenericLakehouseCatalogOperations
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
- // TODO(#8838): Implement table creation
- // This should:
- // 1. Determine table format from properties
- // 2. Delegate to format-specific implementation (e.g.,
LanceCatalogOperations)
- // 3. Store metadata in Gravitino entity store
- throw new UnsupportedOperationException(
- "Table operations are not yet implemented. "
- + "This feature is planned for a future release.");
+ String format = properties.getOrDefault("format", "lance");
+ String tableLocation = calculateTableLocation(ident, properties);
+ Map<String, String> newProperties = Maps.newHashMap(properties);
+ newProperties.put("location", tableLocation);
+
+ LakehouseCatalogOperations lakehouseCatalogOperations =
+ SUPPORTED_FORMATS.compute(
+ format,
+ (k, v) ->
+ v == null
+ ? createLakehouseCatalogOperations(
+ format, properties, catalogInfo, propertiesMetadata)
+ : v);
+
+ return lakehouseCatalogOperations.createTable(
+ ident, columns, comment, newProperties, partitions, distribution,
sortOrders, indexes);
+ }
+
+ private String calculateTableLocation(
+ NameIdentifier tableIdent, Map<String, String> tableProperties) {
+ String tableLocation = tableProperties.get("location");
+ if (StringUtils.isNotBlank(tableLocation)) {
+ return ensureTrailingSlash(tableLocation);
+ }
+
+ String schemaLocation;
+ try {
+ Schema schema =
loadSchema(NameIdentifier.of(tableIdent.namespace().levels()));
+ schemaLocation = schema.properties().get("location");
+ } catch (NoSuchSchemaException e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to load schema for table %s to determine default
location.", tableIdent),
+ e);
+ }
+
+ // If we do not set location in table properties, and schema location is
set, use schema
+ // location
+ // as the base path.
+ if (StringUtils.isNotBlank(schemaLocation)) {
+ return ensureTrailingSlash(schemaLocation) + tableIdent.name() + SLASH;
+ }
+
+ // If the schema location is not set, use catalog lakehouse dir as the
base path. Or else, throw
+ // an exception.
+ if (catalogLakehouseLocation.isEmpty()) {
+ throw new RuntimeException(
+ String.format(
+ "No location specified for table %s, you need to set location
either in catalog, schema, or table properties",
+ tableIdent));
+ }
+
+ String catalogLakehousePath = catalogLakehouseLocation.get().toString();
+ String[] nsLevels = tableIdent.namespace().levels();
+ String schemaName = nsLevels[nsLevels.length - 1];
+ return ensureTrailingSlash(catalogLakehousePath)
+ + schemaName
+ + SLASH
+ + tableIdent.name()
+ + SLASH;
}
@Override
@@ -198,13 +280,46 @@ public class GenericLakehouseCatalogOperations
@Override
public boolean dropTable(NameIdentifier ident) {
- // TODO(#8838): Implement table dropping
- throw new UnsupportedOperationException(
- "Table operations are not yet implemented. "
- + "This feature is planned for a future release.");
+ EntityStore store = GravitinoEnv.getInstance().entityStore();
+ Namespace namespace = ident.namespace();
+ try {
+ GenericTableEntity tableEntity =
+ store.get(ident, Entity.EntityType.TABLE, GenericTableEntity.class);
+ Map<String, String> tableProperties = tableEntity.getProperties();
+ String format = tableProperties.getOrDefault("format", "lance");
+ LakehouseCatalogOperations lakehouseCatalogOperations =
+ SUPPORTED_FORMATS.compute(
+ format,
+ (k, v) ->
+ v == null
+ ? createLakehouseCatalogOperations(
+ format, tableProperties, catalogInfo,
propertiesMetadata)
+ : v);
+ return lakehouseCatalogOperations.dropTable(ident);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to list tables under schema " +
namespace, e);
+ }
}
private String ensureTrailingSlash(String path) {
return path.endsWith(SLASH) ? path : path + SLASH;
}
+
+ private LakehouseCatalogOperations createLakehouseCatalogOperations(
+ String format,
+ Map<String, String> properties,
+ CatalogInfo catalogInfo,
+ HasPropertyMetadata propertiesMetadata) {
+ LakehouseCatalogOperations operations;
+ switch (format.toLowerCase()) {
+ case "lance":
+ operations = new LanceCatalogOperations();
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported lakehouse format:
" + format);
+ }
+
+ operations.initialize(properties, catalogInfo, propertiesMetadata);
+ return operations;
+ }
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
new file mode 100644
index 0000000000..66c7147626
--- /dev/null
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/LakehouseCatalogOperations.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse;
+
+import org.apache.gravitino.connector.CatalogOperations;
+import org.apache.gravitino.rel.TableCatalog;
+
+public interface LakehouseCatalogOperations extends CatalogOperations,
TableCatalog {}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
new file mode 100644
index 0000000000..3e1146b7ad
--- /dev/null
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.lance;
+
+import com.google.common.collect.ImmutableMap;
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.lakehouse.LakehouseCatalogOperations;
+import org.apache.gravitino.connector.CatalogInfo;
+import org.apache.gravitino.connector.GenericLakehouseTable;
+import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class LanceCatalogOperations implements LakehouseCatalogOperations {
+
+ private Map<String, String> lancePropertiesMap;
+
+ @Override
+ public void initialize(
+ Map<String, String> config, CatalogInfo info, HasPropertyMetadata
propertiesMetadata)
+ throws RuntimeException {
+ lancePropertiesMap = ImmutableMap.copyOf(config);
+ }
+
+ @Override
+ public void testConnection(
+ NameIdentifier catalogIdent,
+ Catalog.Type type,
+ String provider,
+ String comment,
+ Map<String, String> properties)
+ throws Exception {}
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
+ return new NameIdentifier[0];
+ }
+
+ @Override
+ public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+ // Should not come here.
+ return null;
+ }
+
+ @Override
+ public Table createTable(
+ NameIdentifier ident,
+ Column[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitions,
+ Distribution distribution,
+ SortOrder[] sortOrders,
+ Index[] indexes)
+ throws NoSuchSchemaException, TableAlreadyExistsException {
+ // Ignore partitions, distributions, sortOrders, and indexes for Lance
tables;
+ String location = properties.get("location");
+ try (Dataset dataset =
+ Dataset.create(
+ new RootAllocator(),
+ location,
+ convertColumnsToSchema(columns),
+ new WriteParams.Builder().build())) {
+ GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
+ return builder
+ .withName(ident.name())
+ .withColumns(columns)
+ .withComment(comment)
+ .withProperties(properties)
+ .withDistribution(distribution)
+ .withIndexes(indexes)
+ .withAuditInfo(
+ AuditInfo.builder()
+ .withCreator(PrincipalUtils.getCurrentUserName())
+ .withCreateTime(Instant.now())
+ .build())
+ .withPartitioning(partitions)
+ .withSortOrders(sortOrders)
+ .withFormat("lance")
+ .build();
+ }
+ }
+
+ private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToSchema(Column[] columns) {
+ LanceDataTypeConverter converter = new LanceDataTypeConverter();
+ List<Field> fields =
+ Arrays.stream(columns)
+ .map(
+ col -> {
+ boolean nullable = col.nullable();
+ if (nullable) {
+ return new org.apache.arrow.vector.types.pojo.Field(
+ col.name(),
+ org.apache.arrow.vector.types.pojo.FieldType.nullable(
+ converter.fromGravitino(col.dataType())),
+ null);
+ }
+
+ // not nullable
+ return new org.apache.arrow.vector.types.pojo.Field(
+ col.name(),
+ org.apache.arrow.vector.types.pojo.FieldType.notNullable(
+ converter.fromGravitino(col.dataType())),
+ null);
+ })
+ .collect(Collectors.toList());
+ return new org.apache.arrow.vector.types.pojo.Schema(fields);
+ }
+
+ @Override
+ public Table alterTable(NameIdentifier ident, TableChange... changes)
+ throws NoSuchTableException, IllegalArgumentException {
+ // Use another PRs to implement alter table for Lance tables
+ return null;
+ }
+
+ @Override
+ public boolean dropTable(NameIdentifier ident) {
+ try {
+ String location = lancePropertiesMap.get("location");
+ // Remove the directory on storage
+ FileSystem fs = FileSystem.get(new Configuration());
+ return fs.delete(new Path(location), true);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to drop Lance table: " +
ident.name(), e);
+ }
+ }
+}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
new file mode 100644
index 0000000000..117863659e
--- /dev/null
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.lakehouse.lance;
+
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
+import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.gravitino.connector.DataTypeConverter;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.rel.types.Types.FixedType;
+
+public class LanceDataTypeConverter implements DataTypeConverter<ArrowType,
ArrowType> {
+
+ @Override
+ public ArrowType fromGravitino(Type type) {
+ switch (type.name()) {
+ case BOOLEAN:
+ return Bool.INSTANCE;
+ case BYTE:
+ return new Int(8, true);
+ case SHORT:
+ return new Int(16, true);
+ case INTEGER:
+ return new Int(32, true);
+ case LONG:
+ return new Int(64, true);
+ case FLOAT:
+ return new FloatingPoint(FloatingPointPrecision.SINGLE);
+ case DOUBLE:
+ return new FloatingPoint(FloatingPointPrecision.DOUBLE);
+ case DECIMAL:
+ // Lance uses FIXED_SIZE_BINARY for decimal types
+ return new ArrowType.FixedSizeBinary(16); // assuming 16 bytes for
decimal
+ case DATE:
+ return new ArrowType.Date(DateUnit.DAY);
+ case TIME:
+ return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+ case TIMESTAMP:
+ return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+ case VARCHAR:
+ case STRING:
+ return new ArrowType.Utf8();
+ case FIXED:
+ FixedType fixedType = (FixedType) type;
+ return new ArrowType.FixedSizeBinary(fixedType.length());
+ case BINARY:
+ return new ArrowType.Binary();
+ default:
+ throw new UnsupportedOperationException("Unsupported Gravitino type: "
+ type.name());
+ }
+ }
+
+ @Override
+ public Type toGravitino(ArrowType arrowType) {
+ if (arrowType instanceof Bool) {
+ return Types.BooleanType.get();
+ } else if (arrowType instanceof Int intType) {
+ switch (intType.getBitWidth()) {
+ case 8 -> {
+ return Types.ByteType.get();
+ }
+ case 16 -> {
+ return Types.ShortType.get();
+ }
+ case 32 -> {
+ return Types.IntegerType.get();
+ }
+ case 64 -> {
+ return Types.LongType.get();
+ }
+ default -> throw new UnsupportedOperationException(
+ "Unsupported Int bit width: " + intType.getBitWidth());
+ }
+ } else if (arrowType instanceof FloatingPoint floatingPoint) {
+ switch (floatingPoint.getPrecision()) {
+ case SINGLE:
+ return Types.FloatType.get();
+ case DOUBLE:
+ return Types.DoubleType.get();
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported FloatingPoint precision: " +
floatingPoint.getPrecision());
+ }
+ } else if (arrowType instanceof ArrowType.FixedSizeBinary) {
+ ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary)
arrowType;
+ return Types.FixedType.of(fixedSizeBinary.getByteWidth());
+ } else if (arrowType instanceof ArrowType.Date) {
+ return Types.DateType.get();
+ } else if (arrowType instanceof ArrowType.Time) {
+ return Types.TimeType.get();
+ } else if (arrowType instanceof ArrowType.Timestamp) {
+ return Types.TimestampType.withoutTimeZone();
+ } else if (arrowType instanceof ArrowType.Utf8) {
+ return Types.StringType.get();
+ } else if (arrowType instanceof ArrowType.Binary) {
+ return Types.BinaryType.get();
+ } else {
+ throw new UnsupportedOperationException("Unsupported Arrow type: " +
arrowType);
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
b/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
index 0eb0faa08a..0ef761a6ca 100644
--- a/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
+++ b/common/src/main/java/org/apache/gravitino/config/ConfigConstants.java
@@ -84,5 +84,5 @@ public final class ConfigConstants {
public static final String VERSION_1_1_0 = "1.1.0";
/** The current version of backend storage initialization script. */
- public static final String CURRENT_SCRIPT_VERSION = VERSION_1_0_0;
+ public static final String CURRENT_SCRIPT_VERSION = VERSION_1_1_0;
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index a777f2a511..1cbab0d6ed 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -27,6 +27,7 @@ import static
org.apache.gravitino.utils.NameIdentifierUtil.getSchemaIdentifier;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
+import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
@@ -36,6 +37,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.Catalog;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
@@ -52,8 +54,10 @@ import org.apache.gravitino.lock.LockType;
import org.apache.gravitino.lock.TreeLockUtils;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.GenericTableEntity;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.GenericTable;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
@@ -487,6 +491,19 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
}
private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ if (isGenericLakehouseCatalog(catalogIdent)) {
+ try {
+ GenericTableEntity tableEntity = store.get(ident, TABLE,
GenericTableEntity.class);
+ if (tableEntity != null) {
+ GenericTable genericTable = tableEntity.toGenericTable();
+ return EntityCombinedTable.of(genericTable).withImported(true);
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to load table entity " + ident,
ioe);
+ }
+ }
+
NameIdentifier catalogIdentifier = getCatalogIdentifier(ident);
Table table =
doWithCatalog(
@@ -597,18 +614,46 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
.mapToObj(i -> ColumnEntity.toColumnEntity(columns[i], i,
idGenerator.nextId(), audit))
.collect(Collectors.toList());
- TableEntity tableEntity =
- TableEntity.builder()
- .withId(uid)
- .withName(ident.name())
- .withNamespace(ident.namespace())
- .withColumns(columnEntityList)
- .withAuditInfo(audit)
- .build();
+ TableEntity tableEntity;
+ if (isGenericLakehouseCatalog(catalogIdent)) {
+ // For generic lakehouse catalog, we only create the table entity with
basic info.
+ GenericTable genericTable = (GenericTable) table;
+ tableEntity =
+ GenericTableEntity.getBuilder()
+ .withId(uid)
+ .withName(ident.name())
+ .withNamespace(ident.namespace())
+ .withFormat(genericTable.format())
+ .withAuditInfo(audit)
+ .withColumns(columnEntityList)
+ .withIndexes(table.index())
+ .withDistribution(table.distribution())
+ .withFormat(genericTable.format())
+ .withPartitions(table.partitioning())
+ .withSortOrder(table.sortOrder())
+ .withProperties(genericTable.properties())
+ .withComment(genericTable.comment())
+ .build();
+ } else {
+ tableEntity =
+ TableEntity.builder()
+ .withId(uid)
+ .withName(ident.name())
+ .withNamespace(ident.namespace())
+ .withColumns(columnEntityList)
+ .withAuditInfo(audit)
+ .build();
+ }
try {
store.put(tableEntity, true /* overwrite */);
} catch (Exception e) {
+ if (isGenericLakehouseCatalog(catalogIdent)) {
+ // Drop table
+ doWithCatalog(
+ catalogIdent, c -> c.doWithTableOps(t -> t.dropTable(ident)),
RuntimeException.class);
+ }
+
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTable.of(table)
.withHiddenProperties(
@@ -616,6 +661,7 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
}
+ // For managed table, we can use table entity to indicate the table is
created successfully.
return EntityCombinedTable.of(table, tableEntity)
.withHiddenProperties(
getHiddenPropertyNames(
@@ -630,6 +676,18 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
.collect(Collectors.toList());
}
+ private boolean isGenericLakehouseCatalog(NameIdentifier catalogIdent) {
+ CatalogManager catalogManager =
GravitinoEnv.getInstance().catalogManager();
+ try {
+ Catalog catalog = catalogManager.loadCatalog(catalogIdent);
+ return catalog.type() == Catalog.Type.RELATIONAL
+ && catalog.provider().equals("generic-lakehouse");
+ } catch (NoSuchEntityException e) {
+ LOG.warn("Catalog not found: {}", catalogIdent, e);
+ return false;
+ }
+ }
+
private boolean isSameColumn(Column left, int columnPosition, ColumnEntity
right) {
return Objects.equal(left.name(), right.name())
&& columnPosition == right.position()
diff --git
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
new file mode 100644
index 0000000000..b84b265256
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseColumn.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.connector;
+
+import org.apache.gravitino.tag.SupportsTags;
+
+public class GenericLakehouseColumn extends BaseColumn {
+ @Override
+ public SupportsTags supportsTags() {
+ return super.supportsTags();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder extends BaseColumnBuilder<Builder,
GenericLakehouseColumn> {
+
+ /** Creates a new instance of {@link Builder}. */
+ private Builder() {}
+
+ /**
+ * Internal method to build a HiveColumn instance using the provided
values.
+ *
+ * @return A new HiveColumn instance with the configured values.
+ */
+ @Override
+ protected GenericLakehouseColumn internalBuild() {
+ GenericLakehouseColumn hiveColumn = new GenericLakehouseColumn();
+
+ hiveColumn.name = name;
+ hiveColumn.comment = comment;
+ hiveColumn.dataType = dataType;
+ hiveColumn.nullable = nullable;
+ hiveColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET :
defaultValue;
+ return hiveColumn;
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
new file mode 100644
index 0000000000..a9379a5b31
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/connector/GenericLakehouseTable.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.connector;
+
+import org.apache.gravitino.rel.GenericTable;
+
+public class GenericLakehouseTable extends BaseTable implements GenericTable {
+ @SuppressWarnings("unused")
+ private String schemaName;
+
+ private String format;
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public String format() {
+ return format;
+ }
+
+ @Override
+ public String location() {
+ return properties.get("location");
+ }
+
+ @Override
+ public boolean external() {
+ return properties.get("external") != null &&
Boolean.parseBoolean(properties.get("external"));
+ }
+
+ @Override
+ protected TableOperations newOps() throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ public static class Builder extends BaseTableBuilder<Builder,
GenericLakehouseTable> {
+
+ private String schemaName;
+ private String format;
+
+ public Builder withSchemaName(String schemaName) {
+ this.schemaName = schemaName;
+ return this;
+ }
+
+ public Builder withFormat(String format) {
+ this.format = format;
+ return this;
+ }
+
+ @Override
+ protected GenericLakehouseTable internalBuild() {
+ GenericLakehouseTable genericLakehouseTable = new
GenericLakehouseTable();
+ genericLakehouseTable.schemaName = this.schemaName;
+ genericLakehouseTable.format = this.format;
+ genericLakehouseTable.columns = this.columns;
+ genericLakehouseTable.comment = this.comment;
+ genericLakehouseTable.properties = this.properties;
+ genericLakehouseTable.auditInfo = this.auditInfo;
+ genericLakehouseTable.distribution = this.distribution;
+ genericLakehouseTable.indexes = this.indexes;
+ genericLakehouseTable.name = this.name;
+ genericLakehouseTable.partitioning = this.partitioning;
+ genericLakehouseTable.sortOrders = this.sortOrders;
+ return genericLakehouseTable;
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
b/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
new file mode 100644
index 0000000000..4b2dd9ad03
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/GenericTableEntity.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.meta;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import lombok.Getter;
+import org.apache.gravitino.Field;
+import org.apache.gravitino.connector.GenericLakehouseColumn;
+import org.apache.gravitino.connector.GenericLakehouseTable;
+import org.apache.gravitino.rel.GenericTable;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.indexes.Index;
+
+@Getter
+public class GenericTableEntity extends TableEntity {
+ public static final Field FORMAT = Field.required("format", Long.class, "The
table's format");
+ public static final Field PROPERTIES =
+ Field.optional("properties", Map.class, "The table's properties");
+
+ public static final Field PARTITIONS =
+ Field.optional("partitions", Transform[].class, "The table's partition");
+
+ public static final Field SORT_ORDER =
+ Field.optional("sortOrders", SortOrder[].class, "The table's sort
order");
+
+ public static final Field DISTRIBUTION =
+ Field.optional("distribution", Distribution.class, "The table's
distribution");
+
+ public static final Field INDEXES =
+ Field.optional("indexes", Index[].class, "The table's indexes");
+
+ public static final Field COMMENT =
+ Field.optional("comment", String.class, "The table's comment");
+
+ public GenericTableEntity() {
+ super();
+ }
+
+ @Override
+ public Map<Field, Object> fields() {
+ Map<Field, Object> superFields = super.fields();
+ Map<Field, Object> result = Maps.newHashMap(superFields);
+ result.put(FORMAT, format);
+ result.put(PROPERTIES, properties);
+ result.put(PARTITIONS, partitions);
+ result.put(SORT_ORDER, sortOrder);
+ result.put(DISTRIBUTION, distribution);
+ result.put(INDEXES, indexes);
+ result.put(COMMENT, comment);
+
+ return result;
+ }
+
+ private String format;
+ @Getter private Map<String, String> properties;
+ private Transform[] partitions;
+ private SortOrder[] sortOrder;
+ private Distribution distribution;
+ private Index[] indexes;
+ private String comment;
+
+ public static class Builder {
+ private final GenericTableEntity tableEntity;
+
+ public Builder() {
+ this.tableEntity = new GenericTableEntity();
+ }
+
+ public Builder withId(Long id) {
+ tableEntity.id = id;
+ return this;
+ }
+
+ public Builder withName(String name) {
+ tableEntity.name = name;
+ return this;
+ }
+
+ public Builder withAuditInfo(AuditInfo auditInfo) {
+ tableEntity.auditInfo = auditInfo;
+ return this;
+ }
+
+ public Builder withColumns(java.util.List<ColumnEntity> columns) {
+ tableEntity.columns = columns;
+ return this;
+ }
+
+ public Builder withNamespace(org.apache.gravitino.Namespace namespace) {
+ tableEntity.namespace = namespace;
+ return this;
+ }
+
+ public Builder withFormat(String format) {
+ tableEntity.format = format;
+ return this;
+ }
+
+ public Builder withProperties(Map<String, String> properties) {
+ tableEntity.properties = properties;
+ return this;
+ }
+
+ public Builder withPartitions(Transform[] partitions) {
+ tableEntity.partitions = partitions;
+ return this;
+ }
+
+ public Builder withSortOrder(SortOrder[] sortOrder) {
+ tableEntity.sortOrder = sortOrder;
+ return this;
+ }
+
+ public Builder withDistribution(Distribution distribution) {
+ tableEntity.distribution = distribution;
+ return this;
+ }
+
+ public Builder withIndexes(Index[] indexes) {
+ tableEntity.indexes = indexes;
+ return this;
+ }
+
+ public Builder withComment(String comment) {
+ tableEntity.comment = comment;
+ return this;
+ }
+
+ public GenericTableEntity build() {
+ return tableEntity;
+ }
+ }
+
+ public static GenericTableEntity.Builder getBuilder() {
+ return new GenericTableEntity.Builder();
+ }
+
+ public GenericTable toGenericTable() {
+ return GenericLakehouseTable.builder()
+ .withFormat(format)
+ .withProperties(properties)
+ .withAuditInfo(auditInfo)
+ .withSortOrders(sortOrder)
+ .withPartitioning(partitions)
+ .withDistribution(distribution)
+ .withColumns(
+ columns.stream()
+ .map(this::toGenericLakehouseColumn)
+ .toArray(GenericLakehouseColumn[]::new))
+ .withIndexes(indexes)
+ .withName(name)
+ .withComment(comment)
+ .build();
+ }
+
+ private GenericLakehouseColumn toGenericLakehouseColumn(ColumnEntity
columnEntity) {
+ return GenericLakehouseColumn.builder()
+ .withName(columnEntity.name())
+ .withComment(columnEntity.comment())
+ .withAutoIncrement(columnEntity.autoIncrement())
+ .withNullable(columnEntity.nullable())
+ .withType(columnEntity.dataType())
+ .withDefaultValue(columnEntity.defaultValue())
+ .build();
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
index 9d15be7df6..595defed08 100644
--- a/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/TableEntity.java
@@ -42,15 +42,15 @@ public class TableEntity implements Entity, Auditable,
HasIdentifier {
public static final Field COLUMNS =
Field.optional("columns", List.class, "The columns of the table");
- private Long id;
+ protected Long id;
- private String name;
+ protected String name;
- private AuditInfo auditInfo;
+ protected AuditInfo auditInfo;
- private Namespace namespace;
+ protected Namespace namespace;
- private List<ColumnEntity> columns;
+ protected List<ColumnEntity> columns;
/**
* Returns a map of the fields and their corresponding values for this table.
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
new file mode 100644
index 0000000000..a723c3db4a
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper;
+
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+
+public interface TableVersionMapper {
+ String TABLE_NAME = "table_version_info";
+
+ @InsertProvider(type = TableVersionSQLProviderFactory.class, method =
"insertTableVersion")
+ void insertTableVersion(@Param("tablePO") TablePO tablePO);
+
+ @InsertProvider(
+ type = TableVersionSQLProviderFactory.class,
+ method = "insertTableVersionOnDuplicateKeyUpdate")
+ void insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") TablePO
tablePO);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
new file mode 100644
index 0000000000..ab27353c00
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableVersionSQLProviderFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.TableVersionBaseSQLProvider;
+import
org.apache.gravitino.storage.relational.mapper.provider.postgresql.TableVersionPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class TableVersionSQLProviderFactory {
+
+ private static final Map<JDBCBackendType, TableVersionBaseSQLProvider>
+ TABLE_VERSION_SQL_PROVIDER_MAP =
+ ImmutableMap.of(
+ JDBCBackendType.MYSQL, new
TableVersionSQLProviderFactory.TableVersionMySQLProvider(),
+ JDBCBackendType.H2, new
TableVersionSQLProviderFactory.TableVersionH2Provider(),
+ JDBCBackendType.POSTGRESQL, new
TableVersionPostgreSQLProvider());
+
+ public static TableVersionBaseSQLProvider getProvider() {
+ String databaseId =
+ SqlSessionFactoryHelper.getInstance()
+ .getSqlSessionFactory()
+ .getConfiguration()
+ .getDatabaseId();
+
+ JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+ return TABLE_VERSION_SQL_PROVIDER_MAP.get(jdbcBackendType);
+ }
+
+ static class TableVersionMySQLProvider extends TableVersionBaseSQLProvider {}
+
+ static class TableVersionH2Provider extends TableVersionBaseSQLProvider {}
+
+ public static String insertTableVersion(@Param("tablePO") TablePO tablePO) {
+ return getProvider().insertTableVersion(tablePO);
+ }
+
+ public static String
insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO") TablePO tablePO) {
+ return getProvider().insertTableVersionOnDuplicateKeyUpdate(tablePO);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
index f214bd1962..aaf22ccda8 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/DefaultMapperPackageProvider.java
@@ -41,6 +41,7 @@ import
org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
import org.apache.gravitino.storage.relational.mapper.TagMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
@@ -78,6 +79,7 @@ public class DefaultMapperPackageProvider implements
MapperPackageProvider {
TagMetaMapper.class,
TopicMetaMapper.class,
UserMetaMapper.class,
- UserRoleRelMapper.class);
+ UserRoleRelMapper.class,
+ TableVersionMapper.class);
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
index 9360e2c354..8065476a61 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
@@ -21,20 +21,29 @@ package
org.apache.gravitino.storage.relational.mapper.provider.base;
import static
org.apache.gravitino.storage.relational.mapper.TableMetaMapper.TABLE_NAME;
import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
import org.apache.gravitino.storage.relational.po.TablePO;
import org.apache.ibatis.annotations.Param;
public class TableMetaBaseSQLProvider {
public String listTablePOsBySchemaId(@Param("schemaId") Long schemaId) {
- return "SELECT table_id as tableId, table_name as tableName,"
- + " metalake_id as metalakeId, catalog_id as catalogId,"
- + " schema_id as schemaId, audit_info as auditInfo,"
- + " current_version as currentVersion, last_version as lastVersion,"
- + " deleted_at as deletedAt"
+ return "SELECT tm.table_id as tableId, tm.table_name as tableName,"
+ + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId,"
+ + " tm.schema_id as schemaId, tm.audit_info as auditInfo,"
+ + " tm.current_version as currentVersion, tm.last_version as
lastVersion,"
+ + " tm.deleted_at as deletedAt,"
+ + " tv.format as format, "
+ + " tv.properties as properties,"
+ + " tv.partitioning as partitions, tv.sort_orders as sortOrders,"
+ + " tv.distribution as distribution, tv.indexes as indexes,"
+ + " tv.comment as comment"
+ " FROM "
+ TABLE_NAME
- + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+ + " tm LEFT JOIN "
+ + TableVersionMapper.TABLE_NAME
+ + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version AND tv.deleted_at = 0"
+ + " WHERE tm.schema_id = #{schemaId} AND tm.deleted_at = 0";
}
public String listTablePOsByTableIds(List<Long> tableIds) {
@@ -65,14 +74,22 @@ public class TableMetaBaseSQLProvider {
public String selectTableMetaBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("tableName") String name) {
- return "SELECT table_id as tableId, table_name as tableName,"
- + " metalake_id as metalakeId, catalog_id as catalogId,"
- + " schema_id as schemaId, audit_info as auditInfo,"
- + " current_version as currentVersion, last_version as lastVersion,"
- + " deleted_at as deletedAt"
+ return "SELECT tm.table_id as tableId, tm.table_name as tableName,"
+ + " tm.metalake_id as metalakeId, tm.catalog_id as catalogId,"
+ + " tm.schema_id as schemaId, tm.audit_info as auditInfo,"
+ + " tm.current_version as currentVersion, tm.last_version as
lastVersion,"
+ + " tm.deleted_at as deletedAt,"
+ + " tv.format as format, "
+ + " tv.properties as properties,"
+ + " tv.partitioning as partitions, tv.sort_orders as sortOrders,"
+ + " tv.distribution as distribution, tv.indexes as indexes,"
+ + " tv.comment as comment"
+ " FROM "
+ TABLE_NAME
- + " WHERE schema_id = #{schemaId} AND table_name = #{tableName} AND
deleted_at = 0";
+ + " tm LEFT JOIN "
+ + TableVersionMapper.TABLE_NAME
+ + " tv ON tm.table_id = tv.table_id AND tm.current_version =
tv.version AND tv.deleted_at = 0"
+ + " WHERE tm.schema_id = #{schemaId} AND tm.table_name = #{tableName}
AND tm.deleted_at = 0";
}
public String selectTableMetaById(@Param("tableId") Long tableId) {
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
new file mode 100644
index 0000000000..3501abe10c
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableVersionBaseSQLProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static
org.apache.gravitino.storage.relational.mapper.TableVersionMapper.TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.po.TablePO;
+import org.apache.ibatis.annotations.Param;
+
+public class TableVersionBaseSQLProvider {
+
+ public String insertTableVersion(@Param("tablePO") TablePO tablePO) {
+ return "INSERT INTO "
+ + TABLE_NAME
+ + " (table_id, format, properties, partitioning"
+ + " distribution, sort_orders, indexes, comment,"
+ + " version, last_version, deleted_at)"
+ + " VALUES ("
+ + " #{tablePO.tableId},"
+ + " #{tablePO.format},"
+ + " #{tablePO.properties},"
+ + " #{tablePO.partitions},"
+ + " #{tablePO.distribution},"
+ + " #{tablePO.sortOrders},"
+ + " #{tablePO.indexes},"
+ + " #{tablePO.comment},"
+ + " #{tablePO.currentVersion},"
+ + " #{tablePO.lastVersion},"
+ + " #{tablePO.deletedAt}"
+ + " )";
+ }
+
+ public String insertTableVersionOnDuplicateKeyUpdate(@Param("tablePO")
TablePO tablePO) {
+ return "INSERT INTO "
+ + TABLE_NAME
+ + " (table_id, format, properties, partitioning,"
+ + " distribution, sort_orders, indexes, comment,"
+ + " version, deleted_at)"
+ + " VALUES ("
+ + " #{tablePO.tableId},"
+ + " #{tablePO.format},"
+ + " #{tablePO.properties},"
+ + " #{tablePO.partitions},"
+ + " #{tablePO.distribution},"
+ + " #{tablePO.sortOrders},"
+ + " #{tablePO.indexes},"
+ + " #{tablePO.comment},"
+ + " #{tablePO.currentVersion},"
+ + " #{tablePO.deletedAt}"
+ + " )"
+ + " ON DUPLICATE KEY UPDATE"
+ + " format = #{tablePO.format},"
+ + " properties = #{tablePO.properties},"
+ + " partitioning = #{tablePO.partitions},"
+ + " distribution = #{tablePO.distribution},"
+ + " sort_orders = #{tablePO.sortOrders},"
+ + " indexes = #{tablePO.indexes},"
+ + " comment = #{tablePO.comment},"
+ + " version = #{tablePO.currentVersion},"
+ + " deleted_at = #{tablePO.deletedAt}";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
new file mode 100644
index 0000000000..e0a7413b1c
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/TableVersionPostgreSQLProvider.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import
org.apache.gravitino.storage.relational.mapper.provider.base.TableVersionBaseSQLProvider;
+
+public class TableVersionPostgreSQLProvider extends
TableVersionBaseSQLProvider {}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
b/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
index 693105e772..56fea38337 100644
--- a/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/TablePO.java
@@ -20,7 +20,9 @@ package org.apache.gravitino.storage.relational.po;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import lombok.Getter;
+@Getter
public class TablePO {
private Long tableId;
private String tableName;
@@ -32,6 +34,15 @@ public class TablePO {
private Long lastVersion;
private Long deletedAt;
+ private String format;
+
+ private String properties;
+ private String partitions;
+ private String sortOrders;
+ private String distribution;
+ private String indexes;
+ private String comment;
+
public Long getTableId() {
return tableId;
}
@@ -154,6 +165,41 @@ public class TablePO {
return this;
}
+ public Builder withFormat(String format) {
+ tablePO.format = format;
+ return this;
+ }
+
+ public Builder withProperties(String properties) {
+ tablePO.properties = properties;
+ return this;
+ }
+
+ public Builder withPartitions(String partitions) {
+ tablePO.partitions = partitions;
+ return this;
+ }
+
+ public Builder withSortOrders(String sortOrders) {
+ tablePO.sortOrders = sortOrders;
+ return this;
+ }
+
+ public Builder withDistribution(String distribution) {
+ tablePO.distribution = distribution;
+ return this;
+ }
+
+ public Builder withIndexes(String indexes) {
+ tablePO.indexes = indexes;
+ return this;
+ }
+
+ public Builder withComment(String comment) {
+ tablePO.comment = comment;
+ return this;
+ }
+
private void validate() {
Preconditions.checkArgument(tablePO.tableId != null, "Table id is
required");
Preconditions.checkArgument(tablePO.tableName != null, "Table name is
required");
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index 326ba63b5f..f4bbf7a6f6 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -40,6 +40,7 @@ import
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMap
import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableVersionMapper;
import
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.po.ColumnPO;
import org.apache.gravitino.storage.relational.po.TablePO;
@@ -118,12 +119,12 @@ public class TableMetaService {
fillTablePOBuilderParentEntityId(builder, tableEntity.namespace());
AtomicReference<TablePO> tablePORef = new AtomicReference<>();
+ TablePO po = POConverters.initializeTablePOWithVersion(tableEntity,
builder);
SessionUtils.doMultipleWithCommit(
() ->
SessionUtils.doWithoutCommit(
TableMetaMapper.class,
mapper -> {
- TablePO po =
POConverters.initializeTablePOWithVersion(tableEntity, builder);
tablePORef.set(po);
if (overwrite) {
mapper.insertTableMetaOnDuplicateKeyUpdate(po);
@@ -131,6 +132,18 @@ public class TableMetaService {
mapper.insertTableMeta(po);
}
}),
+ () ->
+ SessionUtils.doWithCommit(
+ TableVersionMapper.class,
+ mapper -> {
+ if (po.getFormat() != null) {
+ if (overwrite) {
+ mapper.insertTableVersionOnDuplicateKeyUpdate(po);
+ } else {
+ mapper.insertTableVersion(po);
+ }
+ }
+ }),
() -> {
// We need to delete the columns first if we want to overwrite the
table.
if (overwrite) {
@@ -292,7 +305,6 @@ public class TableMetaService {
SessionUtils.getWithoutCommit(
TableMetaMapper.class,
mapper -> mapper.selectTableMetaBySchemaIdAndName(schemaId,
tableName));
-
if (tablePO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 127cb022e8..62bc11f891 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -45,6 +45,7 @@ import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.meta.ColumnEntity;
import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.GenericTableEntity;
import org.apache.gravitino.meta.GroupEntity;
import org.apache.gravitino.meta.ModelEntity;
import org.apache.gravitino.meta.ModelVersionEntity;
@@ -60,6 +61,7 @@ import org.apache.gravitino.policy.Policy;
import org.apache.gravitino.policy.PolicyContent;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.Expression;
+import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
import org.apache.gravitino.rel.types.Type;
import org.apache.gravitino.storage.relational.po.CatalogPO;
import org.apache.gravitino.storage.relational.po.ColumnPO;
@@ -390,14 +392,44 @@ public class POConverters {
public static TablePO initializeTablePOWithVersion(
TableEntity tableEntity, TablePO.Builder builder) {
try {
- return builder
+ builder
.withTableId(tableEntity.id())
.withTableName(tableEntity.name())
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(tableEntity.auditInfo()))
.withCurrentVersion(INIT_VERSION)
.withLastVersion(INIT_VERSION)
- .withDeletedAt(DEFAULT_DELETED_AT)
- .build();
+ .withDeletedAt(DEFAULT_DELETED_AT);
+
+ if (tableEntity instanceof GenericTableEntity genericTable) {
+ builder.withFormat(genericTable.getFormat());
+ builder.withComment(genericTable.getComment());
+ builder.withProperties(
+ genericTable.getProperties() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
+
+ // TODO store the following information to databases;
+ /**
+ * builder.withDistribution( genericTable.getDistribution() == null ?
null :
+ *
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getDistribution()));
+ * builder.withPartitions( genericTable.getPartitions() == null ? null
:
+ *
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getPartitions()));
+ */
+ builder.withIndexes(
+ genericTable.getIndexes() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getIndexes()));
+ builder.withProperties(
+ genericTable.getProperties() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getProperties()));
+ builder.withSortOrders(
+ genericTable.getSortOrder() == null
+ ? null
+ :
JsonUtils.anyFieldMapper().writeValueAsString(genericTable.getSortOrder()));
+ }
+
+ return builder.build();
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize json object:", e);
}
@@ -455,6 +487,29 @@ public class POConverters {
public static TableEntity fromTableAndColumnPOs(
TablePO tablePO, List<ColumnPO> columnPOs, Namespace namespace) {
try {
+ if (tablePO.getFormat() != null) {
+ return GenericTableEntity.getBuilder()
+ .withId(tablePO.getTableId())
+ .withName(tablePO.getTableName())
+ .withNamespace(namespace)
+ .withColumns(fromColumnPOs(columnPOs))
+ .withAuditInfo(
+ JsonUtils.anyFieldMapper().readValue(tablePO.getAuditInfo(),
AuditInfo.class))
+ // TODO add field partition, distribution and sort order;
+ .withIndexes(
+ StringUtils.isBlank(tablePO.getIndexes())
+ ? null
+ :
JsonUtils.anyFieldMapper().readValue(tablePO.getIndexes(), IndexImpl[].class))
+ .withFormat(tablePO.getFormat())
+ .withComment(tablePO.getComment())
+ .withProperties(
+ StringUtils.isBlank(tablePO.getProperties())
+ ? null
+ :
JsonUtils.anyFieldMapper().readValue(tablePO.getProperties(), Map.class))
+ .withColumns(fromColumnPOs(columnPOs))
+ .build();
+ }
+
return TableEntity.builder()
.withId(tablePO.getTableId())
.withName(tablePO.getTableName())
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
index 752d89533d..0482bfecfd 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/SessionUtils.java
@@ -106,4 +106,16 @@ public class SessionUtils {
throw e;
}
}
+
+ public static void beginTransaction() {
+ SqlSessions.getSqlSession();
+ }
+
+ public static void commitTransaction() {
+ SqlSessions.commitAndCloseSqlSession();
+ }
+
+ public static void rollbackTransaction() {
+ SqlSessions.rollbackAndCloseSqlSession();
+ }
}