openinx commented on a change in pull request #4221: URL: https://github.com/apache/iceberg/pull/4221#discussion_r817541051
########## File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java ########## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import com.emc.object.s3.S3Exception; +import com.emc.object.s3.S3ObjectMetadata; +import com.emc.object.s3.bean.GetObjectResult; +import com.emc.object.s3.bean.ListObjectsResult; +import com.emc.object.s3.bean.S3Object; +import com.emc.object.s3.request.ListObjectsRequest; +import com.emc.object.s3.request.PutObjectRequest; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import 
org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.dell.DellClientFactories; +import org.apache.iceberg.dell.DellProperties; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EcsCatalog extends BaseMetastoreCatalog + implements Closeable, SupportsNamespaces, Configurable<Object> { + + /** + * Suffix of table metadata object + */ + private static final String TABLE_OBJECT_SUFFIX = ".table"; + /** + * Suffix of namespace metadata object + */ + private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace"; + /** + * Key of properties version in ECS object user metadata. + */ + private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version"; + + private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class); + + private S3Client client; + private Object hadoopConf; + private String catalogName; + /** + * Warehouse is unified with other catalog that without delimiter. + */ + private String warehouseLocation; + private DellProperties dellProperties; + private PropertiesSerDes propertiesSerDes; + private FileIO fileIO; + + /** + * No-arg constructor to load the catalog dynamically. + * <p> + * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later. 
+ */ + public EcsCatalog() { + } + + @Override + public void initialize(String name, Map<String, String> properties) { + this.catalogName = name; + this.dellProperties = new DellProperties(properties); + this.warehouseLocation = + cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter()); + this.client = DellClientFactories.from(properties).ecsS3(); + this.propertiesSerDes = PropertiesSerDes.current(); + this.fileIO = initializeFileIO(properties); + } + + private String cleanWarehouse(String path, String delimiter) { + Preconditions.checkArgument( + path != null && path.length() > 0, + "Cannot initialize EcsCatalog because warehousePath must not be null"); + int len = path.length(); + if (path.endsWith(delimiter)) { + return path.substring(0, len - delimiter.length()); + } else { + return path; + } + } + + private FileIO initializeFileIO(Map<String, String> properties) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + if (fileIOImpl == null) { + FileIO io = new EcsFileIO(); + io.initialize(properties); + return io; + } else { + return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf); + } + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + return new EcsTableOperations(catalogName + "." + tableIdentifier, Review comment: Nit: we usually don't use `+` to concat multiple strings in iceberg, it's recommended to use `String.format`. ########## File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java ########## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import com.emc.object.s3.S3Exception; +import com.emc.object.s3.S3ObjectMetadata; +import com.emc.object.s3.bean.GetObjectResult; +import com.emc.object.s3.bean.ListObjectsResult; +import com.emc.object.s3.bean.S3Object; +import com.emc.object.s3.request.ListObjectsRequest; +import com.emc.object.s3.request.PutObjectRequest; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.dell.DellClientFactories; +import org.apache.iceberg.dell.DellProperties; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.Configurable; +import 
org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EcsCatalog extends BaseMetastoreCatalog + implements Closeable, SupportsNamespaces, Configurable<Object> { + + /** + * Suffix of table metadata object + */ + private static final String TABLE_OBJECT_SUFFIX = ".table"; Review comment: Please leave an empty line between the separate variable definitions. ########## File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java ########## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import com.emc.object.s3.S3Exception; +import com.emc.object.s3.S3ObjectMetadata; +import com.emc.object.s3.bean.GetObjectResult; +import com.emc.object.s3.bean.ListObjectsResult; +import com.emc.object.s3.bean.S3Object; +import com.emc.object.s3.request.ListObjectsRequest; +import com.emc.object.s3.request.PutObjectRequest; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.dell.DellClientFactories; +import org.apache.iceberg.dell.DellProperties; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EcsCatalog extends BaseMetastoreCatalog + implements Closeable, SupportsNamespaces, Configurable<Object> { + + /** + * Suffix of table metadata object + */ + private static final String 
TABLE_OBJECT_SUFFIX = ".table"; + /** + * Suffix of namespace metadata object + */ + private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace"; + /** + * Key of properties version in ECS object user metadata. + */ + private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version"; + + private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class); + + private S3Client client; + private Object hadoopConf; + private String catalogName; + /** + * Warehouse is unified with other catalog that without delimiter. + */ + private String warehouseLocation; + private DellProperties dellProperties; + private PropertiesSerDes propertiesSerDes; + private FileIO fileIO; + + /** + * No-arg constructor to load the catalog dynamically. + * <p> + * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later. + */ + public EcsCatalog() { + } + + @Override + public void initialize(String name, Map<String, String> properties) { + this.catalogName = name; + this.dellProperties = new DellProperties(properties); + this.warehouseLocation = + cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter()); + this.client = DellClientFactories.from(properties).ecsS3(); + this.propertiesSerDes = PropertiesSerDes.current(); + this.fileIO = initializeFileIO(properties); + } + + private String cleanWarehouse(String path, String delimiter) { + Preconditions.checkArgument( + path != null && path.length() > 0, + "Cannot initialize EcsCatalog because warehousePath must not be null"); + int len = path.length(); + if (path.endsWith(delimiter)) { + return path.substring(0, len - delimiter.length()); + } else { + return path; + } + } + + private FileIO initializeFileIO(Map<String, String> properties) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + if (fileIOImpl == null) { + FileIO io = new EcsFileIO(); + io.initialize(properties); + return io; + } else { + 
return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf); + } + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + return new EcsTableOperations(catalogName + "." + tableIdentifier, + tableURI(tableIdentifier), fileIO, this); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + StringBuilder builder = new StringBuilder(); + builder.append(warehouseLocation); + for (String level : tableIdentifier.namespace().levels()) { Review comment: All catalogs follow the same default location style (i.e. `<warehouse>/<database>.db/<table>`). Why do we need to introduce a new naming style to define the default location? https://github.com/apache/iceberg/blob/9d9bab1f27c2c0b22d0024766bd062011f0817a4/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java#L186-L190 ########## File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java ########## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import com.emc.object.s3.S3Exception; +import com.emc.object.s3.S3ObjectMetadata; +import com.emc.object.s3.bean.GetObjectResult; +import com.emc.object.s3.bean.ListObjectsResult; +import com.emc.object.s3.bean.S3Object; +import com.emc.object.s3.request.ListObjectsRequest; +import com.emc.object.s3.request.PutObjectRequest; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.dell.DellClientFactories; +import org.apache.iceberg.dell.DellProperties; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EcsCatalog extends BaseMetastoreCatalog + implements Closeable, SupportsNamespaces, Configurable<Object> { + + /** + * Suffix of table metadata object + */ + private static final String 
TABLE_OBJECT_SUFFIX = ".table"; + /** + * Suffix of namespace metadata object + */ + private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace"; + /** + * Key of properties version in ECS object user metadata. + */ + private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version"; + + private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class); + + private S3Client client; + private Object hadoopConf; + private String catalogName; + /** + * Warehouse is unified with other catalog that without delimiter. + */ + private String warehouseLocation; + private DellProperties dellProperties; + private PropertiesSerDes propertiesSerDes; + private FileIO fileIO; + + /** + * No-arg constructor to load the catalog dynamically. + * <p> + * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later. + */ + public EcsCatalog() { + } + + @Override + public void initialize(String name, Map<String, String> properties) { + this.catalogName = name; + this.dellProperties = new DellProperties(properties); + this.warehouseLocation = + cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter()); + this.client = DellClientFactories.from(properties).ecsS3(); + this.propertiesSerDes = PropertiesSerDes.current(); + this.fileIO = initializeFileIO(properties); + } + + private String cleanWarehouse(String path, String delimiter) { + Preconditions.checkArgument( + path != null && path.length() > 0, + "Cannot initialize EcsCatalog because warehousePath must not be null"); + int len = path.length(); + if (path.endsWith(delimiter)) { + return path.substring(0, len - delimiter.length()); + } else { + return path; + } + } + + private FileIO initializeFileIO(Map<String, String> properties) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + if (fileIOImpl == null) { + FileIO io = new EcsFileIO(); + io.initialize(properties); + return io; + } else { + 
return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf); + } + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + return new EcsTableOperations(catalogName + "." + tableIdentifier, + tableURI(tableIdentifier), fileIO, this); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + StringBuilder builder = new StringBuilder(); + builder.append(warehouseLocation); + for (String level : tableIdentifier.namespace().levels()) { + builder.append(dellProperties.ecsCatalogDelimiter()); + builder.append(level); + } + + builder.append(dellProperties.ecsCatalogDelimiter()); + builder.append(tableIdentifier.name()); + return builder.toString(); + } + + /** + * Iterate all table objects with the namespace prefix. + */ + @Override + public List<TableIdentifier> listTables(Namespace namespace) { + if (!namespace.isEmpty() && !namespaceExists(namespace)) { + throw new NoSuchNamespaceException("Namespace %s does not exist", namespace); + } + + String marker = null; + List<TableIdentifier> results = Lists.newArrayList(); + EcsURI prefix = namespacePrefix(namespace); + do { + ListObjectsResult listObjectsResult = client.listObjects( + new ListObjectsRequest(prefix.bucket()) + .withDelimiter(dellProperties.ecsCatalogDelimiter()) + .withPrefix(prefix.name()) + .withMarker(marker)); + marker = listObjectsResult.getNextMarker(); + results.addAll(listObjectsResult.getObjects().stream() + .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX)) + .map(object -> parseTableId(namespace, prefix, object)) + .collect(Collectors.toList())); + } while (marker != null); Review comment: Why not just follow the `HadoopCatalog` approach and list tables based on the inherited path? ########## File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java ########## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import com.emc.object.s3.S3Exception; +import com.emc.object.s3.S3ObjectMetadata; +import com.emc.object.s3.bean.GetObjectResult; +import com.emc.object.s3.bean.ListObjectsResult; +import com.emc.object.s3.bean.S3Object; +import com.emc.object.s3.request.ListObjectsRequest; +import com.emc.object.s3.request.PutObjectRequest; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.dell.DellClientFactories; +import org.apache.iceberg.dell.DellProperties; +import 
org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EcsCatalog extends BaseMetastoreCatalog + implements Closeable, SupportsNamespaces, Configurable<Object> { + + /** + * Suffix of table metadata object + */ + private static final String TABLE_OBJECT_SUFFIX = ".table"; + /** + * Suffix of namespace metadata object + */ + private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace"; + /** + * Key of properties version in ECS object user metadata. + */ + private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version"; + + private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class); + + private S3Client client; + private Object hadoopConf; + private String catalogName; + /** + * Warehouse is unified with other catalog that without delimiter. + */ + private String warehouseLocation; + private DellProperties dellProperties; + private PropertiesSerDes propertiesSerDes; + private FileIO fileIO; + + /** + * No-arg constructor to load the catalog dynamically. + * <p> + * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later. 
+ */ + public EcsCatalog() { + } + + @Override + public void initialize(String name, Map<String, String> properties) { + this.catalogName = name; + this.dellProperties = new DellProperties(properties); + this.warehouseLocation = + cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter()); + this.client = DellClientFactories.from(properties).ecsS3(); + this.propertiesSerDes = PropertiesSerDes.current(); + this.fileIO = initializeFileIO(properties); + } + + private String cleanWarehouse(String path, String delimiter) { + Preconditions.checkArgument( + path != null && path.length() > 0, + "Cannot initialize EcsCatalog because warehousePath must not be null"); + int len = path.length(); + if (path.endsWith(delimiter)) { + return path.substring(0, len - delimiter.length()); + } else { + return path; + } + } + + private FileIO initializeFileIO(Map<String, String> properties) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + if (fileIOImpl == null) { + FileIO io = new EcsFileIO(); + io.initialize(properties); + return io; + } else { + return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf); + } + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + return new EcsTableOperations(catalogName + "." + tableIdentifier, + tableURI(tableIdentifier), fileIO, this); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + StringBuilder builder = new StringBuilder(); + builder.append(warehouseLocation); + for (String level : tableIdentifier.namespace().levels()) { + builder.append(dellProperties.ecsCatalogDelimiter()); + builder.append(level); + } + + builder.append(dellProperties.ecsCatalogDelimiter()); + builder.append(tableIdentifier.name()); + return builder.toString(); + } + + /** + * Iterate all table objects with the namespace prefix. 
+ */ + @Override + public List<TableIdentifier> listTables(Namespace namespace) { + if (!namespace.isEmpty() && !namespaceExists(namespace)) { + throw new NoSuchNamespaceException("Namespace %s does not exist", namespace); + } + + String marker = null; + List<TableIdentifier> results = Lists.newArrayList(); + EcsURI prefix = namespacePrefix(namespace); + do { + ListObjectsResult listObjectsResult = client.listObjects( + new ListObjectsRequest(prefix.bucket()) + .withDelimiter(dellProperties.ecsCatalogDelimiter()) + .withPrefix(prefix.name()) + .withMarker(marker)); + marker = listObjectsResult.getNextMarker(); + results.addAll(listObjectsResult.getObjects().stream() + .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX)) + .map(object -> parseTableId(namespace, prefix, object)) + .collect(Collectors.toList())); + } while (marker != null); + + LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results); + return results; + } + + /** + * Get object prefix of namespace. + */ + private EcsURI namespacePrefix(Namespace namespace) { + String prefix; + if (namespace.isEmpty()) { + prefix = dellProperties.ecsCatalogPrefix().name(); + } else { + prefix = dellProperties.ecsCatalogPrefix().name() + + String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) + + dellProperties.ecsCatalogDelimiter(); Review comment: Similar issue to the comment in line 144 ########## File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsTableOperations.java ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import java.util.Map; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +public class EcsTableOperations extends BaseMetastoreTableOperations { + + public static final String ICEBERG_METADATA_LOCATION = "iceberg_metadata_location"; + + private final String tableName; + private final FileIO fileIO; + private final EcsCatalog catalog; + private final EcsURI tableObject; + + /** + * Cached E-Tag for CAS commit + * + * @see #doRefresh() when reset this field + * @see #doCommit(TableMetadata, TableMetadata) when use this field + */ + private String eTag; + + public EcsTableOperations(String tableName, EcsURI tableObject, FileIO fileIO, EcsCatalog catalog) { + this.tableName = tableName; + this.tableObject = tableObject; + this.fileIO = fileIO; + this.catalog = catalog; + } + + @Override + protected String tableName() { + return tableName; + } + + @Override + public FileIO io() { + return fileIO; + } + + @Override + protected void doRefresh() { + String metadataLocation; + if (!catalog.objectMetadata(tableObject).isPresent()) { + if (currentMetadataLocation() != null) { + throw new NoSuchTableException("Metadata object %s is absent", tableObject); + } 
else { + metadataLocation = null; + } + } else { + EcsCatalog.Properties metadata = catalog.loadProperties(tableObject); + this.eTag = metadata.eTag(); + metadataLocation = metadata.content().get(ICEBERG_METADATA_LOCATION); + Preconditions.checkNotNull(metadataLocation, + "Can't find location from table metadata %s", tableObject); + } + + refreshFromMetadataLocation(metadataLocation); + } + + @Override + protected void doCommit(TableMetadata base, TableMetadata metadata) { + String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); + if (base == null) { + // create a new table, the metadataKey should be absent + if (!catalog.putNewProperties(tableObject, buildProperties(newMetadataLocation))) { Review comment: Is Dell EMC's `putObject` atomic? In this [putNewProperties](https://github.com/apache/iceberg/pull/4221/files#diff-d7308f132665917b84cff30f2688171a11a64e951fd9ed995e3040d0463cda51R465-R471), I think the `putObject` implementation will check the existence of the original key; if the key is present, it will throw a `PreconditionFailed`, otherwise it writes the content into the backend storage. The key thing is: the two operations (checking the existence of the key and writing the content) should be atomic in a single transaction. Otherwise, assume there are two concurrent writers A and B: both A and B pass the existence check, and then try to overwrite the content at the same key. Finally, the result will be unknown because we don't know which one has overwritten the other. If atomicity is not guaranteed, then we cannot use this `putObject` to commit the transaction. Instead, it is recommended to use an external table lock service. ########## File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java ########## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.dell.ecs; + +import com.emc.object.s3.S3Client; +import com.emc.object.s3.S3Exception; +import com.emc.object.s3.S3ObjectMetadata; +import com.emc.object.s3.bean.GetObjectResult; +import com.emc.object.s3.bean.ListObjectsResult; +import com.emc.object.s3.bean.S3Object; +import com.emc.object.s3.request.ListObjectsRequest; +import com.emc.object.s3.request.PutObjectRequest; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.dell.DellClientFactories; +import org.apache.iceberg.dell.DellProperties; +import 
org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EcsCatalog extends BaseMetastoreCatalog + implements Closeable, SupportsNamespaces, Configurable<Object> { + + /** + * Suffix of table metadata object + */ + private static final String TABLE_OBJECT_SUFFIX = ".table"; + /** + * Suffix of namespace metadata object + */ + private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace"; + /** + * Key of properties version in ECS object user metadata. + */ + private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version"; + + private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class); + + private S3Client client; + private Object hadoopConf; + private String catalogName; + /** + * Warehouse location is unified with other catalogs; it is stored without a trailing delimiter. + */ + private String warehouseLocation; + private DellProperties dellProperties; + private PropertiesSerDes propertiesSerDes; + private FileIO fileIO; + + /** + * No-arg constructor to load the catalog dynamically. + * <p> + * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later. 
+ */ + public EcsCatalog() { + } + + @Override + public void initialize(String name, Map<String, String> properties) { + this.catalogName = name; + this.dellProperties = new DellProperties(properties); + this.warehouseLocation = + cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter()); + this.client = DellClientFactories.from(properties).ecsS3(); + this.propertiesSerDes = PropertiesSerDes.current(); + this.fileIO = initializeFileIO(properties); + } + + private String cleanWarehouse(String path, String delimiter) { + Preconditions.checkArgument( + path != null && path.length() > 0, + "Cannot initialize EcsCatalog because warehousePath must not be null"); + int len = path.length(); + if (path.endsWith(delimiter)) { + return path.substring(0, len - delimiter.length()); + } else { + return path; + } + } + + private FileIO initializeFileIO(Map<String, String> properties) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + if (fileIOImpl == null) { + FileIO io = new EcsFileIO(); + io.initialize(properties); + return io; + } else { + return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf); + } + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + return new EcsTableOperations(catalogName + "." + tableIdentifier, + tableURI(tableIdentifier), fileIO, this); + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + StringBuilder builder = new StringBuilder(); + builder.append(warehouseLocation); + for (String level : tableIdentifier.namespace().levels()) { + builder.append(dellProperties.ecsCatalogDelimiter()); + builder.append(level); + } + + builder.append(dellProperties.ecsCatalogDelimiter()); + builder.append(tableIdentifier.name()); + return builder.toString(); + } + + /** + * Iterate all table objects with the namespace prefix. 
+ */ + @Override + public List<TableIdentifier> listTables(Namespace namespace) { + if (!namespace.isEmpty() && !namespaceExists(namespace)) { + throw new NoSuchNamespaceException("Namespace %s does not exist", namespace); + } + + String marker = null; + List<TableIdentifier> results = Lists.newArrayList(); + EcsURI prefix = namespacePrefix(namespace); + do { + ListObjectsResult listObjectsResult = client.listObjects( + new ListObjectsRequest(prefix.bucket()) + .withDelimiter(dellProperties.ecsCatalogDelimiter()) + .withPrefix(prefix.name()) + .withMarker(marker)); + marker = listObjectsResult.getNextMarker(); + results.addAll(listObjectsResult.getObjects().stream() + .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX)) + .map(object -> parseTableId(namespace, prefix, object)) + .collect(Collectors.toList())); + } while (marker != null); + + LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results); + return results; + } + + /** + * Get object prefix of namespace. + */ + private EcsURI namespacePrefix(Namespace namespace) { + String prefix; + if (namespace.isEmpty()) { + prefix = dellProperties.ecsCatalogPrefix().name(); + } else { + prefix = dellProperties.ecsCatalogPrefix().name() + + String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) + + dellProperties.ecsCatalogDelimiter(); + } + + return new EcsURI(dellProperties.ecsCatalogPrefix().bucket(), prefix); + } + + private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) { + String key = s3Object.getKey(); + Preconditions.checkArgument(key.startsWith(prefix.name()), + "List result should have same prefix", key, prefix); + + String tableName = key.substring( + prefix.name().length(), + key.length() - TABLE_OBJECT_SUFFIX.length()); + return TableIdentifier.of(namespace, tableName); + } + + /** + * Remove table object. If the purge flag is set, remove all data objects. 
+ */ + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + if (!tableExists(identifier)) { + throw new NoSuchTableException("Table %s does not exist", identifier); + } + + EcsURI tableObjectURI = tableURI(identifier); + if (purge) { + // if re-use the same instance, current() will throw exception. + TableOperations ops = newTableOps(identifier); + TableMetadata current = ops.current(); + if (current == null) { + return false; + } + + CatalogUtil.dropTableData(ops.io(), current); + } + + client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name()); + return true; + } + + private EcsURI tableURI(TableIdentifier id) { + EcsURI prefix = namespacePrefix(id.namespace()); + return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX); Review comment: If the table identifier doesn't have any namespace, then I think we should add a `delimiter` between `prefix.name` and `id.name`, right? ########## File path: dell/src/main/java/org/apache/iceberg/dell/DellProperties.java ########## @@ -38,6 +40,21 @@ */ public static final String ECS_S3_ENDPOINT = "ecs.s3.endpoint"; + /** + * Catalog prefix is used to store catalog data. If not set, use {@link CatalogProperties#WAREHOUSE_LOCATION}. + * <p> + * The value is an EcsURI which looks like ecs://bucket/prefix. + */ + public static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix"; Review comment: You mean people will want to store their metadata in a separate URI? What's the benefit? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
