This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 3088b77  HIVE-25613: Port Iceberg Hive fixes to the iceberg module (#2721)
3088b77 is described below

commit 3088b77ab1d0bd9fd69addf5f859b3c175e28dba
Author: pvary <pv...@cloudera.com>
AuthorDate: Fri Oct 22 14:13:18 2021 +0200

    HIVE-25613: Port Iceberg Hive fixes to the iceberg module (#2721)

    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Hive: Add table-level JVM lock on commits (#2547)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Core: Move ClientPool and ClientPoolImpl to core (#2491)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Hive: Improve code style (#2641)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Style: Delete blank line of CachedClientPool.java (#2787)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Hive: Fix some message typos in HiveCatalog: Matastore => Metastore (#2950)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Hive: Fix toString NPE with recommended constructor (#3021)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Build: Upgrade to Gradle 7.x (#2826)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Hive: Ensure tableLevelMutex is unlocked when uncommitted metadata delete fails (#3264)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Doc: refactor Hive documentation with catalog loading examples (#2544)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Hive: unify catalog experience across engines (#2565)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Core: Add HadoopConfigurable interface to serialize custom FileIO (#2678)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Move Assert.assertTrue(..) instance checks to AssertJ assertions (#2756)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Core: Support multiple specs in OutputFileFactory (#2858)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - MR: Use SerializableTable in IcebergSplit (#2988)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Fix exception message in IcebergInputFormat (#3153)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Data: Fix equality deletes with date/time types (#3135)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Build: Fix ErrorProne UnnecessarilyQualified warnings (#3262)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Build: Fix ErrorProne NewHashMapInt warnings (#3260)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Core: Fail if both Catalog type and catalog-impl are configured (#3162)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - MR: Support imported data in InputFormat using name mapping (#3312)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Hive: Fix Catalog initialization without Configuration (#3252)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Hive: Switch to RetryingHMSClient (allows configuration of retryDelays and retries) (#3099)
    * HIVE-25613: Port Iceberg Hive fixes to the iceberg module
      Source Iceberg PR - Hive: Fix Catalogs.hiveCatalog method for default catalogs (#3338)
---
 iceberg/iceberg-catalog/pom.xml                    |   6 +-
 .../org/apache/iceberg/hive/CachedClientPool.java  |  16 +-
 .../java/org/apache/iceberg/hive/HiveCatalog.java  |  38 +--
 .../org/apache/iceberg/hive/HiveClientPool.java    |  32 +--
 .../org/apache/iceberg/hive/HiveSchemaUtil.java    |   6 +-
 .../apache/iceberg/hive/HiveTableOperations.java   |  40 ++-
 .../org/apache/iceberg/hive/TestHiveCatalog.java   |  30 +++
 .../apache/iceberg/hive/TestHiveClientPool.java    |  98 ++++++++
 .../apache/iceberg/hive/TestHiveCommitLocks.java   |  48 +++-
 iceberg/iceberg-handler/pom.xml                    |  13 +-
 .../main/java/org/apache/iceberg/mr/Catalogs.java  |  89 ++++---
 .../org/apache/iceberg/mr/InputFormatConfig.java   |  33 +++
 .../org/apache/iceberg/mr/hive/Deserializer.java   |   3 +-
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    |   2 +-
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |  18 +-
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   |   4 +-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |  26 +-
 .../apache/iceberg/mr/mapreduce/IcebergSplit.java  |  51 ++--
 .../java/org/apache/iceberg/mr/TestCatalogs.java   | 189 +++++++++-----
 .../apache/iceberg/mr/TestIcebergInputFormats.java |   9 +-
 .../iceberg/mr/TestInputFormatReaderDeletes.java   |   2 +
 .../iceberg/mr/hive/TestHiveIcebergCTAS.java       |  11 +-
 .../iceberg/mr/hive/TestHiveIcebergInserts.java    |   6 +-
 .../iceberg/mr/hive/TestHiveIcebergMigration.java  |  28 ++-
 .../mr/hive/TestHiveIcebergOutputCommitter.java    |  16 +-
 .../iceberg/mr/hive/TestHiveIcebergSelects.java    |   7 +-
 .../iceberg/mr/hive/TestHiveIcebergStatistics.java |  12 +-
 .../TestHiveIcebergStorageHandlerLocalScan.java    |   3 +-
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  |  32 +--
 ...eIcebergStorageHandlerWithMultipleCatalogs.java |   8 +-
 .../org/apache/iceberg/mr/hive/TestTables.java     |  30 ++-
 iceberg/patched-iceberg-core/pom.xml               |  18 +-
 .../main/java/org/apache/iceberg/CatalogUtil.java  | 276 +++++++++++++++++++++
 .../main/java/org/apache/iceberg}/ClientPool.java  |   4 +-
 .../java/org/apache/iceberg}/ClientPoolImpl.java   |  17 +-
 .../org/apache/iceberg/jdbc/JdbcClientPool.java    |  71 ++++++
 .../java/org/apache/iceberg/jdbc/JdbcUtil.java     | 102 ++++++++
 iceberg/pom.xml                                    |  13 +
 38 files changed, 1109 insertions(+), 298 deletions(-)

diff --git a/iceberg/iceberg-catalog/pom.xml b/iceberg/iceberg-catalog/pom.xml
index a3af94d..a425eec 100644
--- a/iceberg/iceberg-catalog/pom.xml
+++ b/iceberg/iceberg-catalog/pom.xml
@@ -64,6 +64,10 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
-
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
index eca505c..e2dc990 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CachedClientPool.java
@@ -25,13 +25,14 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.thrift.TException;
 
-public class CachedClientPool implements ClientPool<HiveMetaStoreClient, TException> {
+public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> {
 
   private static Cache<String, HiveClientPool> clientPoolCache;
 
@@ -40,7 +41,7 @@ public class CachedClientPool implements ClientPool<HiveMetaStoreClient, TExcept
   private final int clientPoolSize;
   private final long evictionInterval;
 
-  CachedClientPool(Configuration conf, Map<String, String> properties) {
+  public CachedClientPool(Configuration conf, Map<String, String> properties) {
     this.conf = conf;
     this.metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
     this.clientPoolSize = PropertyUtil.propertyAsInt(properties,
@@ -57,7 +58,6 @@ public class CachedClientPool implements ClientPool<HiveMetaStoreClient, TExcept
     return clientPoolCache.get(metastoreUri, k -> new HiveClientPool(clientPoolSize, conf));
   }
 
-
   private synchronized void init() {
     if (clientPoolCache == null) {
       clientPoolCache = Caffeine.newBuilder().expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
@@ -72,7 +72,13 @@ public class CachedClientPool implements ClientPool<HiveMetaStoreClient, TExcept
   }
 
   @Override
-  public <R> R run(Action<R, HiveMetaStoreClient, TException> action) throws TException, InterruptedException {
+  public <R> R run(Action<R, IMetaStoreClient, TException> action) throws TException, InterruptedException {
     return clientPool().run(action);
   }
+
+  @Override
+  public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
+      throws TException, InterruptedException {
+    return clientPool().run(action, retry);
+  }
 }
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 477e591..880d60d 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -38,6 +38,7 @@ import org.apache.iceberg.BaseMetastoreCatalog;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Namespace;
@@ -63,7 +64,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
   private String name;
   private Configuration conf;
   private FileIO fileIO;
-  private ClientPool<HiveMetaStoreClient, TException> clients;
+  private ClientPool<IMetaStoreClient, TException> clients;
 
   public HiveCatalog() {
   }
@@ -71,9 +72,9 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
   /**
    * Hive Catalog constructor.
    *
+   * @param conf Hadoop Configuration
    * @deprecated please use the no-arg constructor, setConf and initialize to construct the catalog. Will be removed in
    *             v0.13.0
-   * @param conf Hadoop Configuration
    */
   @Deprecated
   public HiveCatalog(Configuration conf) {
@@ -94,6 +95,11 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
   @Override
   public void initialize(String inputName, Map<String, String> properties) {
     this.name = inputName;
+    if (conf == null) {
+      LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
+      this.conf = new Configuration();
+    }
+
     if (properties.containsKey(CatalogProperties.URI)) {
       this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get(CatalogProperties.URI));
     }
@@ -235,7 +241,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
         !namespace.isEmpty(), "Cannot create namespace with invalid name: %s", namespace);
     Preconditions.checkArgument(isValidateNamespace(namespace),
-        "Cannot support multi part namespace in Hive MetaStore: %s", namespace);
+        "Cannot support multi part namespace in Hive Metastore: %s", namespace);
 
     try {
       clients.run(client -> {
@@ -250,12 +256,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
           namespace);
 
     } catch (TException e) {
-      throw new RuntimeException("Failed to create namespace " + namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Failed to create namespace " + namespace + " in Hive Metastore", e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(
-          "Interrupted in call to createDatabase(name) " + namespace + " in Hive MataStore", e);
+          "Interrupted in call to createDatabase(name) " + namespace + " in Hive Metastore", e);
     }
   }
@@ -268,7 +274,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
       return ImmutableList.of();
     }
     try {
-      List<Namespace> namespaces = clients.run(HiveMetaStoreClient::getAllDatabases)
+      List<Namespace> namespaces = clients.run(IMetaStoreClient::getAllDatabases)
           .stream()
           .map(Namespace::of)
           .collect(Collectors.toList());
@@ -277,12 +283,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
       return namespaces;
 
     } catch (TException e) {
-      throw new RuntimeException("Failed to list all namespace: " + namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Failed to list all namespace: " + namespace + " in Hive Metastore", e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(
-          "Interrupted in call to getAllDatabases() " + namespace + " in Hive MataStore", e);
+          "Interrupted in call to getAllDatabases() " + namespace + " in Hive Metastore", e);
     }
   }
@@ -311,12 +317,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
       return false;
 
     } catch (TException e) {
-      throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive Metastore", e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(
-          "Interrupted in call to drop dropDatabase(name) " + namespace + " in Hive MataStore", e);
+          "Interrupted in call to drop dropDatabase(name) " + namespace + " in Hive Metastore", e);
     }
   }
@@ -362,11 +368,11 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
 
     } catch (TException e) {
       throw new RuntimeException(
-          "Failed to list namespace under namespace: " + namespace + " in Hive MataStore", e);
+          "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      throw new RuntimeException("Interrupted in call to getDatabase(name) " + namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e);
     }
   }
@@ -386,12 +392,12 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
       throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);
 
     } catch (TException e) {
-      throw new RuntimeException("Failed to list namespace under namespace: " + namespace + " in Hive MataStore", e);
+      throw new RuntimeException("Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(
-          "Interrupted in call to getDatabase(name) " + namespace + " in Hive MataStore", e);
+          "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e);
     }
   }
@@ -514,7 +520,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
   public String toString() {
     return MoreObjects.toStringHelper(this)
         .add("name", name)
-        .add("uri", this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname))
+        .add("uri", this.conf == null ? "" : this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname))
        .toString();
  }
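
The change above makes the catalog usable before setConf() is called, matching the deprecation note that recommends the no-arg constructor plus initialize(). A minimal usage sketch (catalog name and property values are illustrative, not taken from the commit):

    import java.util.Map;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.iceberg.relocated.com.google.common.collect.Maps;

    public class HiveCatalogInitExample {
      public static void main(String[] args) {
        // no-arg constructor; initialize() now falls back to a default Configuration
        HiveCatalog catalog = new HiveCatalog();
        Map<String, String> properties = Maps.newHashMap();
        properties.put(CatalogProperties.URI, "thrift://examplehost:9083"); // becomes hive.metastore.uris
        properties.put(CatalogProperties.WAREHOUSE_LOCATION, "/user/hive/testwarehouse");
        catalog.initialize("hive", properties);
        System.out.println(catalog); // toString() no longer NPEs when no conf was set
      }
    }
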
"" : this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname)) .toString(); } diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java index ef31e3b..e322792 100644 --- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java +++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveClientPool.java @@ -21,35 +21,39 @@ package org.apache.iceberg.hive; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.ClientPoolImpl; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; -public class HiveClientPool extends ClientPoolImpl<HiveMetaStoreClient, TException> { +public class HiveClientPool extends ClientPoolImpl<IMetaStoreClient, TException> { - // use appropriate ctor depending on whether we're working with Hive2 or Hive3 dependencies - // we need to do this because there is a breaking API change between Hive2 and Hive3 - private static final DynConstructors.Ctor<HiveMetaStoreClient> CLIENT_CTOR = DynConstructors.builder() - .impl(HiveMetaStoreClient.class, HiveConf.class) - .impl(HiveMetaStoreClient.class, Configuration.class) - .build(); + // use appropriate ctor depending on whether we're working with Hive1, Hive2, or Hive3 dependencies + // we need to do this because there is a breaking API change between Hive1, Hive2, and Hive3 + private static final DynMethods.StaticMethod GET_CLIENT = DynMethods.builder("getProxy") + .impl(RetryingMetaStoreClient.class, HiveConf.class) + .impl(RetryingMetaStoreClient.class, HiveConf.class, Boolean.TYPE) + .impl(RetryingMetaStoreClient.class, Configuration.class, Boolean.TYPE) + .buildStatic(); private final HiveConf hiveConf; public HiveClientPool(int poolSize, Configuration conf) { - super(poolSize, TTransportException.class); + // Do not allow retry by default as we rely on RetryingHiveClient + super(poolSize, TTransportException.class, false); this.hiveConf = new HiveConf(conf, HiveClientPool.class); this.hiveConf.addResource(conf); } @Override - protected HiveMetaStoreClient newClient() { + protected IMetaStoreClient newClient() { try { try { - return CLIENT_CTOR.newInstance(hiveConf); + return GET_CLIENT.invoke(hiveConf, true); } catch (RuntimeException e) { // any MetaException would be wrapped into RuntimeException during reflection, so let's double-check type here if (e.getCause() instanceof MetaException) { @@ -71,7 +75,7 @@ public class HiveClientPool extends ClientPoolImpl<HiveMetaStoreClient, TExcepti } @Override - protected HiveMetaStoreClient reconnect(HiveMetaStoreClient client) { + protected IMetaStoreClient reconnect(IMetaStoreClient client) { try { client.close(); client.reconnect(); @@ -88,7 +92,7 @@ public class HiveClientPool extends ClientPoolImpl<HiveMetaStoreClient, TExcepti } @Override - protected void close(HiveMetaStoreClient client) { + protected void close(IMetaStoreClient client) { client.close(); } diff --git 
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index e6c6cf5..57123e1 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -262,6 +262,8 @@ public final class HiveSchemaUtil {
       case DATE:
         return "date";
       case TIME:
+      case STRING:
+      case UUID:
         return "string";
       case TIMESTAMP:
         Types.TimestampType timestampType = (Types.TimestampType) type;
@@ -269,11 +271,7 @@ public final class HiveSchemaUtil {
           return "timestamp with local time zone";
         }
         return "timestamp";
-      case STRING:
-      case UUID:
-        return "string";
       case FIXED:
-        return "binary";
       case BINARY:
         return "binary";
       case DECIMAL:
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 386d9ff..0ade33f 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.hive;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
@@ -28,12 +30,14 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
@@ -48,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.TableMetadata;
@@ -82,14 +87,18 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
   private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
   private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
+  private static final String HIVE_TABLE_LEVEL_LOCK_EVICT_MS = "iceberg.hive.table-level-lock-evict-ms";
   private static final long HIVE_ACQUIRE_LOCK_TIMEOUT_MS_DEFAULT = 3 * 60 * 1000; // 3 minutes
   private static final long HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT = 50; // 50 milliseconds
   private static final long HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT = 5 * 1000; // 5 seconds
+  private static final long HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT = TimeUnit.MINUTES.toMillis(10);
   private static final DynMethods.UnboundMethod ALTER_TABLE = DynMethods.builder("alter_table")
-      .impl(HiveMetaStoreClient.class, "alter_table_with_environmentContext",
+      .impl(IMetaStoreClient.class, "alter_table_with_environmentContext",
           String.class, String.class, Table.class, EnvironmentContext.class)
-      .impl(HiveMetaStoreClient.class, "alter_table",
+      .impl(IMetaStoreClient.class, "alter_table",
           String.class, String.class, Table.class, EnvironmentContext.class)
+      .impl(IMetaStoreClient.class, "alter_table",
+          String.class, String.class, Table.class)
       .build();
   private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = ImmutableBiMap.of(
       // gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things but with different names
@@ -99,6 +108,16 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   // Should be in org.apache.iceberg.hadoop.ConfigProperties, but that is not ported to Hive codebase
   public static final String KEEP_HIVE_STATS = "iceberg.hive.keep.stats";
 
+  private static Cache<String, ReentrantLock> commitLockCache;
+
+  private static synchronized void initTableLevelLockCache(long evictionTimeout) {
+    if (commitLockCache == null) {
+      commitLockCache = Caffeine.newBuilder()
+          .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS)
+          .build();
+    }
+  }
+
   /**
    * Provides key translation where necessary between Iceberg and HMS props. This translation is needed because some
    * properties control the same behaviour but are named differently in Iceberg and Hive. Therefore changes to these
@@ -130,7 +149,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   private final long lockCheckMinWaitTime;
   private final long lockCheckMaxWaitTime;
   private final FileIO fileIO;
-  private final ClientPool<HiveMetaStoreClient, TException> metaClients;
+  private final ClientPool<IMetaStoreClient, TException> metaClients;
 
   protected HiveTableOperations(Configuration conf, ClientPool metaClients, FileIO fileIO,
                                 String catalogName, String database, String table) {
@@ -146,6 +165,9 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
         conf.getLong(HIVE_LOCK_CHECK_MIN_WAIT_MS, HIVE_LOCK_CHECK_MIN_WAIT_MS_DEFAULT);
     this.lockCheckMaxWaitTime =
         conf.getLong(HIVE_LOCK_CHECK_MAX_WAIT_MS, HIVE_LOCK_CHECK_MAX_WAIT_MS_DEFAULT);
+    long tableLevelLockCacheEvictionTimeout =
+        conf.getLong(HIVE_TABLE_LEVEL_LOCK_EVICT_MS, HIVE_TABLE_LEVEL_LOCK_EVICT_MS_DEFAULT);
+    initTableLevelLockCache(tableLevelLockCacheEvictionTimeout);
   }
 
   @Override
@@ -195,6 +217,10 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     CommitStatus commitStatus = CommitStatus.FAILURE;
     boolean updateHiveTable = false;
     Optional<Long> lockId = Optional.empty();
+    // getting a process-level lock per table to avoid concurrent commit attempts to the same table from the same
+    // JVM process, which would result in unnecessary and costly HMS lock acquisition requests
+    ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
+    tableLevelMutex.lock();
     try {
       lockId = Optional.of(acquireLock());
       // TODO add lock heart beating for cases where default lock timeout is too low.
@@ -275,7 +301,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
       throw new RuntimeException("Interrupted during commit", e);
 
     } finally {
-      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId);
+      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
     }
   }
 
@@ -457,7 +483,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     return lockId;
   }
 
-  private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId) {
+  private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId,
+                                        ReentrantLock tableLevelMutex) {
     try {
       if (commitStatus == CommitStatus.FAILURE) {
         // If we are sure the commit failed, clean up the uncommitted metadata file
@@ -468,6 +495,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
       throw e;
     } finally {
       unlock(lockId);
+      tableLevelMutex.unlock();
    }
  }
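
The commit pattern above keeps one fair ReentrantLock per fully qualified table name, held for the whole commit, so only one thread per JVM negotiates an HMS lock for a given table at a time. A stripped-down sketch of the same idiom (class and method names are illustrative, not from the commit):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    public class TableLevelLockExample {
      // entries expire after inactivity, so locks for long-unused tables do not pile up
      private static final Cache<String, ReentrantLock> LOCKS = Caffeine.newBuilder()
          .expireAfterAccess(10, TimeUnit.MINUTES)
          .build();

      public static void commit(String fullTableName, Runnable hmsCommit) {
        // fair lock: waiting committers get the table in arrival order
        ReentrantLock mutex = LOCKS.get(fullTableName, t -> new ReentrantLock(true));
        mutex.lock();
        try {
          hmsCommit.run(); // acquire HMS lock, write metadata, release HMS lock
        } finally {
          mutex.unlock(); // always unlock, even if the commit or cleanup throws
        }
      }
    }
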
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index d4741a1..9518540 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
@@ -45,6 +46,7 @@ import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.rules.TemporaryFolder;
 
 import static org.apache.iceberg.NullOrder.NULLS_FIRST;
@@ -119,6 +121,34 @@ public class TestHiveCatalog extends HiveMetastoreTest {
   }
 
   @Test
+  public void testInitialize() {
+    Assertions.assertDoesNotThrow(() -> {
+      HiveCatalog catalog = new HiveCatalog();
+      catalog.initialize("hive", Maps.newHashMap());
+    });
+  }
+
+  @Test
+  public void testToStringWithoutSetConf() {
+    Assertions.assertDoesNotThrow(() -> {
+      HiveCatalog catalog = new HiveCatalog();
+      catalog.toString();
+    });
+  }
+
+  @Test
+  public void testInitializeCatalogWithProperties() {
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("uri", "thrift://examplehost:9083");
+    properties.put("warehouse", "/user/hive/testwarehouse");
+    HiveCatalog catalog = new HiveCatalog();
+    catalog.initialize("hive", properties);
+
+    Assert.assertEquals(catalog.getConf().get("hive.metastore.uris"), "thrift://examplehost:9083");
+    Assert.assertEquals(catalog.getConf().get("hive.metastore.warehouse.dir"), "/user/hive/testwarehouse");
+  }
+
+  @Test
   public void testCreateTableTxnBuilder() throws Exception {
     Schema schema = new Schema(
         required(1, "id", Types.IntegerType.get(), "unique ID"),
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
index b7870af..36996e3 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveClientPool.java
@@ -23,9 +23,23 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestHiveClientPool {
 
@@ -38,6 +52,20 @@ public class TestHiveClientPool {
           "  </property>\n" +
           "</configuration>\n";
 
+  HiveClientPool clients;
+
+  @Before
+  public void before() {
+    HiveClientPool clientPool = new HiveClientPool(2, new Configuration());
+    clients = Mockito.spy(clientPool);
+  }
+
+  @After
+  public void after() {
+    clients.close();
+    clients = null;
+  }
+
   @Test
   public void testConf() {
     HiveConf conf = createHiveConf();
@@ -65,4 +93,74 @@ public class TestHiveClientPool {
     }
     return hiveConf;
   }
+
+  @Test
+  public void testNewClientFailure() {
+    Mockito.doThrow(new RuntimeException("Connection exception")).when(clients).newClient();
+    AssertHelpers.assertThrows("Should throw exception", RuntimeException.class,
+        "Connection exception", () -> clients.run(Object::toString));
+  }
+
+  @Test
+  public void testGetTablesFailsForNonReconnectableException() throws Exception {
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(hmsClient).when(clients).newClient();
+    Mockito.doThrow(new MetaException("Another meta exception"))
+        .when(hmsClient).getTables(Mockito.anyString(), Mockito.anyString());
+    AssertHelpers.assertThrows("Should throw exception", MetaException.class,
+        "Another meta exception", () -> clients.run(client -> client.getTables("default", "t")));
+  }
+
+  @Test
+  public void testConnectionFailureRestoreForMetaException() throws Exception {
+    HiveMetaStoreClient hmsClient = newClient();
+
+    // Throwing an exception may trigger the client to reconnect.
+    String metaMessage = "Got exception: org.apache.thrift.transport.TTransportException";
+    Mockito.doThrow(new MetaException(metaMessage)).when(hmsClient).getAllDatabases();
+
+    // Create a new client when the reconnect method is called.
+    HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+    List<String> databases = Lists.newArrayList("db1", "db2");
+
+    Mockito.doReturn(databases).when(newClient).getAllDatabases();
+    // The return is OK when the reconnect method is called.
+    Assert.assertEquals(databases, clients.run(client -> client.getAllDatabases(), true));
+
+    // Verify that the method is called.
+    Mockito.verify(clients).reconnect(hmsClient);
+    Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+  }
+
+  @Test
+  public void testConnectionFailureRestoreForTTransportException() throws Exception {
+    HiveMetaStoreClient hmsClient = newClient();
+    Mockito.doThrow(new TTransportException()).when(hmsClient).getAllFunctions();
+
+    // Create a new client when getAllFunctions() failed.
+    HiveMetaStoreClient newClient = reconnect(hmsClient);
+
+    GetAllFunctionsResponse response = new GetAllFunctionsResponse();
+    response.addToFunctions(
+        new Function("concat", "db1", "classname", "root", PrincipalType.USER, 100, FunctionType.JAVA, null));
+    Mockito.doReturn(response).when(newClient).getAllFunctions();
+
+    Assert.assertEquals(response, clients.run(client -> client.getAllFunctions(), true));
+
+    Mockito.verify(clients).reconnect(hmsClient);
+    Mockito.verify(clients, Mockito.never()).reconnect(newClient);
+  }
+
+  private HiveMetaStoreClient newClient() {
+    HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(hmsClient).when(clients).newClient();
+    return hmsClient;
+  }
+
+  private HiveMetaStoreClient reconnect(HiveMetaStoreClient obsoleteClient) {
+    HiveMetaStoreClient newClient = Mockito.mock(HiveMetaStoreClient.class);
+    Mockito.doReturn(newClient).when(clients).reconnect(obsoleteClient);
+    return newClient;
+  }
 }
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
index 531e511..3b4bb15 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommitLocks.java
@@ -20,9 +20,15 @@
 package org.apache.iceberg.hive;
 
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.iceberg.AssertHelpers;
@@ -42,7 +48,11 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestHiveCommitLocks extends HiveTableBaseTest {
@@ -50,8 +60,8 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
   private static HiveClientPool spyClientPool = null;
   private static CachedClientPool spyCachedClientPool = null;
   private static Configuration overriddenHiveConf = new Configuration(hiveConf);
-  private static AtomicReference<HiveMetaStoreClient> spyClientRef = new AtomicReference<>();
-  private static HiveMetaStoreClient spyClient = null;
+  private static AtomicReference<IMetaStoreClient> spyClientRef = new AtomicReference<>();
+  private static IMetaStoreClient spyClient = null;
   HiveTableOperations ops = null;
   TableMetadata metadataV1 = null;
   TableMetadata metadataV2 = null;
@@ -71,12 +81,13 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
     // The spy clients are reused between methods and closed at the end of all tests in this class.
     spyClientPool = spy(new HiveClientPool(1, overriddenHiveConf));
     when(spyClientPool.newClient()).thenAnswer(invocation -> {
-      HiveMetaStoreClient client = (HiveMetaStoreClient) invocation.callRealMethod();
-      spyClientRef.set(spy(client));
+      // cannot spy on RetryingHiveMetastoreClient as it is a proxy
+      IMetaStoreClient client = spy(new HiveMetaStoreClient(hiveConf));
+      spyClientRef.set(client);
       return spyClientRef.get();
     });
 
-    spyClientPool.run(HiveMetaStoreClient::isLocalMetaStore); // To ensure new client is created.
+    spyClientPool.run(IMetaStoreClient::isLocalMetaStore); // To ensure new client is created.
 
     spyCachedClientPool = spy(new CachedClientPool(hiveConf, Collections.emptyMap()));
     when(spyCachedClientPool.clientPool()).thenAnswer(invocation -> spyClientPool);
@@ -210,4 +221,31 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
         "Could not acquire the lock on", () -> spyOps.doCommit(metadataV2, metadataV1));
   }
+
+  @Test
+  public void testTableLevelProcessLockBlocksConcurrentHMSRequestsForSameTable() throws Exception {
+    int numConcurrentCommits = 10;
+    // resetting the spy client to forget about prior call history
+    reset(spyClient);
+
+    // simulate several concurrent commit operations on the same table
+    ExecutorService executor = Executors.newFixedThreadPool(numConcurrentCommits);
+    IntStream.range(0, numConcurrentCommits).forEach(i ->
+        executor.submit(() -> {
+          try {
+            spyOps.doCommit(metadataV2, metadataV1);
+          } catch (CommitFailedException e) {
+            // failures are expected here when checking the base version
+            // it's no problem, we're not testing the actual commit success here, only the HMS lock acquisition attempts
+          }
+        }));
+    executor.shutdown();
+    executor.awaitTermination(30, TimeUnit.SECONDS);
+
+    // intra-process commits to the same table should be serialized now
+    // i.e. no thread should receive WAITING state from HMS and have to call checkLock periodically
+    verify(spyClient, never()).checkLock(any(Long.class));
+    // all threads eventually got their turn
+    verify(spyClient, times(numConcurrentCommits)).lock(any(LockRequest.class));
+  }
 }
diff --git a/iceberg/iceberg-handler/pom.xml b/iceberg/iceberg-handler/pom.xml
index c60e0ef..87c21fa 100644
--- a/iceberg/iceberg-handler/pom.xml
+++ b/iceberg/iceberg-handler/pom.xml
@@ -97,10 +97,15 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-hdfs</artifactId>
-        <scope>test</scope>
-    </dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index 710150a..ccd3c6e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -19,7 +19,6 @@
 
 package org.apache.iceberg.mr;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -35,35 +34,46 @@ import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 
 /**
  * Class for catalog resolution and accessing the common functions for {@link Catalog} API.
  * <p>
- * Catalog resolution happens in this order:
- * <ol>
- * <li>Custom catalog if specified by {@link InputFormatConfig#CATALOG_LOADER_CLASS}
- * <li>Hadoop or Hive catalog if specified by {@link InputFormatConfig#CATALOG}
- * <li>Hadoop Tables
- * </ol>
+ * If the catalog name is provided, get the catalog type from iceberg.catalog.<code>catalogName</code>.type config.
+ * <p>
+ * In case the catalog name is {@link #ICEBERG_HADOOP_TABLE_NAME location_based_table},
+ * type is ignored and tables will be loaded using {@link HadoopTables}.
+ * <p>
+ * In case the value of catalog type is null, iceberg.catalog.<code>catalogName</code>.catalog-impl config
+ * is used to determine the catalog implementation class.
+ * <p>
+ * If catalog name is null, get the catalog type from {@link InputFormatConfig#CATALOG iceberg.mr.catalog} config:
+ * <ul>
+ * <li>hive: HiveCatalog</li>
+ * <li>location: HadoopTables</li>
+ * <li>hadoop: HadoopCatalog</li>
+ * </ul>
+ * <p>
+ * In case the value of catalog type is null,
+ * {@link InputFormatConfig#CATALOG_LOADER_CLASS iceberg.mr.catalog.loader.class} is used to determine
+ * the catalog implementation class.
+ * <p>
+ * Note: null catalog name mode is only supported for backwards compatibility. Using this mode is NOT RECOMMENDED.
  */
 public final class Catalogs {
 
   public static final String ICEBERG_DEFAULT_CATALOG_NAME = "default_iceberg";
   public static final String ICEBERG_HADOOP_TABLE_NAME = "location_based_table";
 
-  private static final String HIVE_CATALOG_TYPE = "hive";
-  private static final String HADOOP_CATALOG_TYPE = "hadoop";
-  private static final String NO_CATALOG_TYPE = "no catalog";
-
   public static final String NAME = "name";
   public static final String LOCATION = "location";
 
+  private static final String NO_CATALOG_TYPE = "no catalog";
+
   private static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA,
       InputFormatConfig.PARTITION_SPEC, LOCATION, NAME, InputFormatConfig.CATALOG_NAME);
@@ -141,7 +151,7 @@ public final class Catalogs {
     String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
 
     // Create a table property map without the controlling properties
-    Map<String, String> map = new HashMap<>(props.size());
+    Map<String, String> map = Maps.newHashMapWithExpectedSize(props.size());
     for (Object key : props.keySet()) {
       if (!PROPERTIES_TO_REMOVE.contains(key)) {
         map.put(key.toString(), props.get(key).toString());
@@ -193,16 +203,20 @@ public final class Catalogs {
    */
   public static boolean hiveCatalog(Configuration conf, Properties props) {
     String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
-    return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(getCatalogType(conf, catalogName));
+    String catalogType = getCatalogType(conf, catalogName);
+    if (catalogType != null) {
+      return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType);
+    }
+    catalogType = getCatalogType(conf, ICEBERG_DEFAULT_CATALOG_NAME);
+    if (catalogType != null) {
+      return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType);
+    }
+    return getCatalogProperties(conf, catalogName, catalogType).get(CatalogProperties.CATALOG_IMPL) == null;
   }
 
   @VisibleForTesting
   static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) {
     String catalogType = getCatalogType(conf, catalogName);
-    if (catalogType == null) {
-      throw new NoSuchNamespaceException("Catalog definition for %s is not found.", catalogName);
-    }
-
     if (NO_CATALOG_TYPE.equalsIgnoreCase(catalogType)) {
       return Optional.empty();
     } else {
@@ -238,10 +252,18 @@ public final class Catalogs {
    */
   private static Map<String, String> addCatalogPropertiesIfMissing(Configuration conf, String catalogType,
                                                                    Map<String, String> catalogProperties) {
-    catalogProperties.putIfAbsent(CatalogUtil.ICEBERG_CATALOG_TYPE, catalogType);
-    if (catalogType.equalsIgnoreCase(HADOOP_CATALOG_TYPE)) {
-      catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION,
-          conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION));
+    if (catalogType != null) {
+      catalogProperties.putIfAbsent(CatalogUtil.ICEBERG_CATALOG_TYPE, catalogType);
+    }
+
+    String legacyCatalogImpl = conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
+    if (legacyCatalogImpl != null) {
+      catalogProperties.putIfAbsent(CatalogProperties.CATALOG_IMPL, legacyCatalogImpl);
+    }
+
+    String legacyWarehouseLocation = conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
+    if (legacyWarehouseLocation != null) {
+      catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION, legacyWarehouseLocation);
     }
     return catalogProperties;
   }
@@ -249,35 +271,24 @@ public final class Catalogs {
   /**
    * Return the catalog type based on the catalog name.
    * <p>
-   * If the catalog name is provided get the catalog type from 'iceberg.catalog.<code>catalogName</code>.type' config.
-   * In case the value of this property is null, return with no catalog definition (Hadoop Table)
-   * </p>
-   * <p>
-   * If catalog name is null, check the global conf for 'iceberg.mr.catalog' property. If the value of the property is:
-   * <ul>
-   * <li>null/hive -> Hive Catalog</li>
-   * <li>location -> Hadoop Table</li>
-   * <li>hadoop -> Hadoop Catalog</li>
-   * <li>any other value -> Custom Catalog</li>
-   * </ul>
-   * </p>
+   * See {@link Catalogs} documentation for catalog type resolution strategy.
+   *
    * @param conf global hive configuration
    * @param catalogName name of the catalog
    * @return type of the catalog, can be null
    */
   private static String getCatalogType(Configuration conf, String catalogName) {
     if (catalogName != null) {
-      String catalogType = conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName));
-      if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME) || catalogType == null) {
+      String catalogType = conf.get(InputFormatConfig.catalogPropertyConfigKey(
+          catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE));
+      if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME)) {
         return NO_CATALOG_TYPE;
       } else {
         return catalogType;
       }
     } else {
       String catalogType = conf.get(InputFormatConfig.CATALOG);
-      if (catalogType == null) {
-        return HIVE_CATALOG_TYPE;
-      } else if (catalogType.equals(LOCATION)) {
+      if (catalogType != null && catalogType.equals(LOCATION)) {
        return NO_CATALOG_TYPE;
      } else {
        return catalogType;
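
The rewritten javadoc above describes name-based catalog resolution. In Hadoop/Hive configuration terms, the resolution reads keys like the following; the catalog name "prod" and its values are made up for illustration:

    import org.apache.hadoop.conf.Configuration;

    public class CatalogConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // named catalog: resolved via iceberg.catalog.<name>.type
        conf.set("iceberg.catalog.prod.type", "hive");
        // alternatively, when type is unset, a custom implementation can be named:
        // conf.set("iceberg.catalog.prod.catalog-impl", "com.example.MyCatalog");

        // the reserved name "location_based_table" bypasses catalogs and uses HadoopTables
      }
    }
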
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index e8e2163..32b777b 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -51,9 +51,30 @@ public class InputFormatConfig {
   public static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
   public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
   public static final String LOCALITY = "iceberg.mr.locality";
+
+  /**
+   * @deprecated please use {@link #catalogPropertyConfigKey(String, String)}
+   * with config key {@link org.apache.iceberg.CatalogUtil#ICEBERG_CATALOG_TYPE} to specify the type of a catalog.
+   */
+  @Deprecated
   public static final String CATALOG = "iceberg.mr.catalog";
+
+  /**
+   * @deprecated please use {@link #catalogPropertyConfigKey(String, String)}
+   * with config key {@link org.apache.iceberg.CatalogProperties#WAREHOUSE_LOCATION}
+   * to specify the warehouse location of a catalog.
+   */
+  @Deprecated
   public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "iceberg.mr.catalog.hadoop.warehouse.location";
+
+  /**
+   * @deprecated please use {@link #catalogPropertyConfigKey(String, String)}
+   * with config key {@link org.apache.iceberg.CatalogProperties#CATALOG_IMPL}
+   * to specify the implementation of a catalog.
+   */
+  @Deprecated
   public static final String CATALOG_LOADER_CLASS = "iceberg.mr.catalog.loader.class";
+
   public static final String CTAS_TABLE_NAME = "iceberg.mr.ctas.table.name";
   public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
   public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";
@@ -211,6 +232,18 @@ public class InputFormatConfig {
     return readColumns.split(conf.get(serdeConstants.COLUMN_NAME_DELIMITER, String.valueOf(SerDeUtils.COMMA)));
   }
 
+  /**
+   * Get Hadoop config key of a catalog property based on catalog name
+   * @param catalogName catalog name
+   * @param catalogProperty catalog property, can be any custom property,
+   *                        a commonly used list of properties can be found
+   *                        at {@link org.apache.iceberg.CatalogProperties}
+   * @return Hadoop config key of a catalog property for the catalog name
+   */
+  public static String catalogPropertyConfigKey(String catalogName, String catalogProperty) {
+    return String.format("%s%s.%s", CATALOG_CONFIG_PREFIX, catalogName, catalogProperty);
+  }
+
   private static Schema schema(Configuration conf, String key) {
     String json = conf.get(key);
     return json == null ? null : SchemaParser.fromJson(json);
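
The new helper builds those per-catalog keys programmatically. A brief sketch of replacing the deprecated global properties with named-catalog ones (catalog name and locations are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.CatalogUtil;
    import org.apache.iceberg.mr.InputFormatConfig;

    public class CatalogKeyExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // instead of the deprecated iceberg.mr.catalog / ...warehouse.location / ...loader.class:
        conf.set(InputFormatConfig.catalogPropertyConfigKey("prod", CatalogUtil.ICEBERG_CATALOG_TYPE),
            "hadoop");
        conf.set(InputFormatConfig.catalogPropertyConfigKey("prod", CatalogProperties.WAREHOUSE_LOCATION),
            "hdfs://namenode:8020/warehouse");
        // yields keys of the form iceberg.catalog.prod.type and iceberg.catalog.prod.warehouse
      }
    }
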
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
index 458affd..47e9f3e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/Deserializer.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.mr.hive.serde.objectinspector.WriteObjectInspector;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
 import org.apache.iceberg.types.Type.PrimitiveType;
 import org.apache.iceberg.types.Types.ListType;
@@ -232,7 +233,7 @@ class Deserializer {
     FixNameMappingObjectInspectorPair(Schema schema, ObjectInspectorPair pair) {
       super(pair.writerInspector(), pair.sourceInspector());
 
-      this.sourceNameMap = new HashMap<>(schema.columns().size());
+      this.sourceNameMap = Maps.newHashMapWithExpectedSize(schema.columns().size());
 
       List<? extends StructField> fields = ((StructObjectInspector) sourceInspector()).getAllStructFieldRefs();
       for (int i = 0; i < schema.columns().size(); ++i) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 6fc8cc6..fc66890 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -116,7 +116,7 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
       IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
       // bogus cast for favouring code reuse over syntax
       return (RecordReader) HIVE_VECTORIZED_RECORDREADER_CTOR.newInstance(
-          new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>(),
+          new IcebergInputFormat<>(),
           icebergSplit,
           job,
           reporter);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 9a3c54a..c7951ff 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -39,9 +39,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.mapred.Container;
@@ -78,15 +76,15 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
     long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
         TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
     FileIO io = table.io();
-    LocationProvider location = table.locationProvider();
-    EncryptionManager encryption = table.encryption();
-    OutputFileFactory outputFileFactory =
-        new OutputFileFactory(spec, fileFormat, location, io, encryption, taskAttemptID.getTaskID().getId(),
-            taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
+    int partitionId = taskAttemptID.getTaskID().getId();
+    int taskId = taskAttemptID.getId();
+    String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID();
+    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+        .format(fileFormat)
+        .operationId(operationId)
+        .build();
     String tableName = jc.get(Catalogs.NAME);
-    HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, fileFormat,
+    return new HiveIcebergRecordWriter(schema, spec, fileFormat,
         new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID, tableName);
-
-    return writer;
   }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index 6bd4214..c215e48 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.mr.hive;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -56,6 +55,7 @@ import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +70,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {
   private ObjectInspector inspector;
   private Schema tableSchema;
   private Collection<String> partitionColumns;
-  private Map<ObjectInspector, Deserializer> deserializers = new HashMap<>(1);
+  private Map<ObjectInspector, Deserializer> deserializers = Maps.newHashMapWithExpectedSize(1);
   private Container<Record> row = new Container<>();
 
   @Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index c6525aa..fdba3df 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -66,6 +67,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
@@ -136,13 +138,14 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
     InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
         InputFormatConfig.InMemoryDataModel.GENERIC);
     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
+      Table serializableTable = SerializableTable.copyOf(table);
       tasksIterable.forEach(task -> {
         if (applyResidual && (model == InputFormatConfig.InMemoryDataModel.HIVE ||
             model == InputFormatConfig.InMemoryDataModel.PIG)) {
           // TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
           checkResiduals(task);
         }
-        splits.add(new IcebergSplit(conf, task, table.io(), table.encryption()));
+        splits.add(new IcebergSplit(serializableTable, conf, task));
       });
     } catch (IOException e) {
       throw new UncheckedIOException(String.format("Failed to close table scan: %s", scan), e);
@@ -190,6 +193,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
     private TaskAttemptContext context;
     private Schema tableSchema;
     private Schema expectedSchema;
+    private String nameMapping;
     private boolean reuseContainers;
     private boolean caseSensitive;
     private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
@@ -205,10 +209,12 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
       CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
-      this.io = ((IcebergSplit) split).io();
-      this.encryptionManager = ((IcebergSplit) split).encryptionManager();
+      Table table = ((IcebergSplit) split).table();
+      this.io = table.io();
+      this.encryptionManager = table.encryption();
       this.tasks = task.files().iterator();
       this.tableSchema = InputFormatConfig.tableSchema(conf);
+      this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
       this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
       this.expectedSchema = readSchema(conf, tableSchema, caseSensitive);
       this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
@@ -333,6 +339,9 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       if (reuseContainers) {
         avroReadBuilder.reuseContainers();
       }
+      if (nameMapping != null) {
+        avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      }
 
       switch (inMemoryDataModel) {
         case PIG:
@@ -357,6 +366,9 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       if (reuseContainers) {
         parquetReadBuilder.reuseContainers();
       }
+      if (nameMapping != null) {
+        parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      }
 
       switch (inMemoryDataModel) {
         case PIG:
@@ -380,8 +392,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       // ORC does not support reuse containers yet
       switch (inMemoryDataModel) {
         case PIG:
-          // TODO: implement value readers for Pig and Hive
-          throw new UnsupportedOperationException("ORC support not yet supported for Pig and Hive");
+          // TODO: implement value readers for Pig
+          throw new UnsupportedOperationException("ORC support not yet supported for Pig");
         case HIVE:
           if (MetastoreUtil.hive3PresentOnClasspath()) {
             orcIterator = HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, task, idToConstant, context);
@@ -398,6 +410,10 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
           orcReadBuilder.createReaderFunc(
               fileSchema -> GenericOrcReader.buildReader(
                   readSchema, fileSchema, idToConstant));
+
+          if (nameMapping != null) {
+            orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+          }
          orcIterator = orcReadBuilder.build();
      }
final String[] ANYWHERE = new String[]{"*"}; + private Table table; private CombinedScanTask task; - private FileIO io; - private EncryptionManager encryptionManager; private transient String[] locations; private transient Configuration conf; @@ -51,11 +47,10 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred public IcebergSplit() { } - IcebergSplit(Configuration conf, CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) { + IcebergSplit(Table table, Configuration conf, CombinedScanTask task) { + this.table = table; this.task = task; this.conf = conf; - this.io = io; - this.encryptionManager = encryptionManager; } public CombinedScanTask task() { @@ -88,45 +83,27 @@ public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred @Override public void write(DataOutput out) throws IOException { + byte[] tableData = SerializationUtil.serializeToBytes(table); + out.writeInt(tableData.length); + out.write(tableData); + byte[] data = SerializationUtil.serializeToBytes(this.task); out.writeInt(data.length); out.write(data); - - byte[] ioData; - if (io instanceof HadoopFileIO) { - SerializableConfiguration serializableConf = new SerializableConfiguration(((HadoopFileIO) io).conf()); - ioData = SerializationUtil.serializeToBytes(new HadoopFileIO(serializableConf::get)); - } else { - ioData = SerializationUtil.serializeToBytes(io); - } - out.writeInt(ioData.length); - out.write(ioData); - - byte[] encryptionManagerData = SerializationUtil.serializeToBytes(encryptionManager); - out.writeInt(encryptionManagerData.length); - out.write(encryptionManagerData); } @Override public void readFields(DataInput in) throws IOException { + byte[] tableData = new byte[in.readInt()]; + in.readFully(tableData); + this.table = SerializationUtil.deserializeFromBytes(tableData); + byte[] data = new byte[in.readInt()]; in.readFully(data); this.task = SerializationUtil.deserializeFromBytes(data); - - byte[] ioData = new byte[in.readInt()]; - in.readFully(ioData); - this.io = SerializationUtil.deserializeFromBytes(ioData); - - byte[] encryptionManagerData = new byte[in.readInt()]; - in.readFully(encryptionManagerData); - this.encryptionManager = SerializationUtil.deserializeFromBytes(encryptionManagerData); - } - - public FileIO io() { - return io; } - public EncryptionManager encryptionManager() { - return encryptionManager; + public Table table() { + return table; } } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java index 04168eb..9b3ee40 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; @@ -38,6 +39,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hive.HiveCatalog; import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -191,91 +193,149 @@ public class TestCatalogs { } @Test - public void testLoadCatalog() 
throws IOException { - conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION); - Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent()); - - String nonExistentCatalogType = "fooType"; + public void testLegacyLoadCatalogDefault() { + Optional<Catalog> defaultCatalog = Catalogs.loadCatalog(conf, null); + Assert.assertTrue(defaultCatalog.isPresent()); + Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class); + Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties())); + } - conf.set(InputFormatConfig.CATALOG, nonExistentCatalogType); - AssertHelpers.assertThrows( - "should complain about catalog not supported", UnsupportedOperationException.class, - "Unknown catalog type", () -> Catalogs.loadCatalog(conf, null)); + @Test + public void testLegacyLoadCatalogHive() { + conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE); + Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, null); + Assert.assertTrue(hiveCatalog.isPresent()); + Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class); + Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties())); + } + @Test + public void testLegacyLoadCatalogHadoop() { conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation"); Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, null); - Assert.assertTrue(hadoopCatalog.isPresent()); - Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog); - - conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE); - Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, null); + Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class); + Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties())); + } - Assert.assertTrue(hiveCatalog.isPresent()); - Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog); + @Test + public void testLegacyLoadCatalogCustom() { + conf.set(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalog.class.getName()); + conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation"); + Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, null); + Assert.assertTrue(customHadoopCatalog.isPresent()); + Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class); + Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties())); + } + @Test + public void testLegacyLoadCatalogLocation() { conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION); Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent()); + } - // arbitrary catalog name with non existent catalog type - String catalogName = "barCatalog"; - conf.unset(InputFormatConfig.CATALOG); - conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), nonExistentCatalogType); + @Test + public void testLegacyLoadCatalogUnknown() { + conf.set(InputFormatConfig.CATALOG, "fooType"); AssertHelpers.assertThrows( - "should complain about catalog not supported", UnsupportedOperationException.class, - "Unknown catalog type:", () -> Catalogs.loadCatalog(conf, catalogName)); + "should complain about catalog not supported", UnsupportedOperationException.class, + "Unknown catalog type", () -> Catalogs.loadCatalog(conf, null)); + } - // arbitrary catalog name with hadoop catalog type and default warehouse location - conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), - 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); - hadoopCatalog = Catalogs.loadCatalog(conf, catalogName); + @Test + public void testLoadCatalogDefault() { + String catalogName = "barCatalog"; + Optional<Catalog> defaultCatalog = Catalogs.loadCatalog(conf, catalogName); + Assert.assertTrue(defaultCatalog.isPresent()); + Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class); + Properties properties = new Properties(); + properties.put(InputFormatConfig.CATALOG_NAME, catalogName); + Assert.assertTrue(Catalogs.hiveCatalog(conf, properties)); + } - Assert.assertTrue(hadoopCatalog.isPresent()); - Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog); + @Test + public void testLoadCatalogHive() { + String catalogName = "barCatalog"; + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE), + CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE); + Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, catalogName); + Assert.assertTrue(hiveCatalog.isPresent()); + Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class); + Properties properties = new Properties(); + properties.put(InputFormatConfig.CATALOG_NAME, catalogName); + Assert.assertTrue(Catalogs.hiveCatalog(conf, properties)); + } - // arbitrary catalog name with hadoop catalog type and provided warehouse location - conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), - CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); - conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalogName), "/tmp/mylocation"); - hadoopCatalog = Catalogs.loadCatalog(conf, catalogName); + @Test + public void testLegacyLoadCustomCatalogWithHiveCatalogTypeSet() { + String catalogName = "barCatalog"; + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE), + CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE); + conf.set(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalog.class.getName()); + conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation"); + AssertHelpers.assertThrows("Should complain about both configs being set", IllegalArgumentException.class, + "both type and catalog-impl are set", () -> Catalogs.loadCatalog(conf, catalogName)); + } + @Test + public void testLoadCatalogHadoop() { + String catalogName = "barCatalog"; + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE), + CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION), + "/tmp/mylocation"); + Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, catalogName); Assert.assertTrue(hadoopCatalog.isPresent()); - Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog); + Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class); Assert.assertEquals("HadoopCatalog{name=barCatalog, location=/tmp/mylocation}", hadoopCatalog.get().toString()); + Properties properties = new Properties(); + properties.put(InputFormatConfig.CATALOG_NAME, catalogName); + Assert.assertFalse(Catalogs.hiveCatalog(conf, properties)); + } - // arbitrary catalog name with hive catalog type - conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), - CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE); - hiveCatalog = Catalogs.loadCatalog(conf, catalogName); - - Assert.assertTrue(hiveCatalog.isPresent()); - Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog); - - // arbitrary 
catalog name with custom catalog type without specific classloader - conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), "custom"); - AssertHelpers.assertThrows( - "should complain about catalog not supported", UnsupportedOperationException.class, - "Unknown catalog type:", () -> Catalogs.loadCatalog(conf, catalogName)); + @Test + public void testLoadCatalogHadoopWithLegacyWarehouseLocation() { + String catalogName = "barCatalog"; + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE), + CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation"); + Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, catalogName); + Assert.assertTrue(hadoopCatalog.isPresent()); + Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class); + Assert.assertEquals("HadoopCatalog{name=barCatalog, location=/tmp/mylocation}", hadoopCatalog.get().toString()); + Properties properties = new Properties(); + properties.put(InputFormatConfig.CATALOG_NAME, catalogName); + Assert.assertFalse(Catalogs.hiveCatalog(conf, properties)); + } - // arbitrary catalog name with custom catalog type and provided classloader - conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), "custom"); - conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalogName), CustomHadoopCatalog.class.getName()); + @Test + public void testLoadCatalogCustom() { + String catalogName = "barCatalog"; + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.CATALOG_IMPL), + CustomHadoopCatalog.class.getName()); + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION), + "/tmp/mylocation"); Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, catalogName); - Assert.assertTrue(customHadoopCatalog.isPresent()); - Assert.assertTrue(customHadoopCatalog.get() instanceof CustomHadoopCatalog); - - // arbitrary catalog name with location catalog type - conf.unset(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName)); - Assert.assertFalse(Catalogs.loadCatalog(conf, catalogName).isPresent()); + Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class); + Properties properties = new Properties(); + properties.put(InputFormatConfig.CATALOG_NAME, catalogName); + Assert.assertFalse(Catalogs.hiveCatalog(conf, properties)); + } - // default catalog configuration - conf.unset(InputFormatConfig.CATALOG); - hiveCatalog = Catalogs.loadCatalog(conf, null); + @Test + public void testLoadCatalogLocation() { + Assert.assertFalse(Catalogs.loadCatalog(conf, Catalogs.ICEBERG_HADOOP_TABLE_NAME).isPresent()); + } - Assert.assertTrue(hiveCatalog.isPresent()); - Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog); + @Test + public void testLoadCatalogUnknown() { + String catalogName = "barCatalog"; + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE), "fooType"); + AssertHelpers.assertThrows( + "should complain about catalog not supported", UnsupportedOperationException.class, + "Unknown catalog type:", () -> Catalogs.loadCatalog(conf, catalogName)); } public static class CustomHadoopCatalog extends HadoopCatalog { @@ -291,9 +351,10 @@ public class TestCatalogs { } private void setCustomCatalogProperties(String catalogName, String warehouseLocation) { - 
conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalogName), warehouseLocation); - conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalogName), CustomHadoopCatalog.class.getName()); - conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), "custom"); + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION), + warehouseLocation); + conf.set(InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.CATALOG_IMPL), + CustomHadoopCatalog.class.getName()); conf.set(InputFormatConfig.CATALOG_NAME, catalogName); } } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java index 6f3cbee..bdc1014 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; @@ -351,10 +352,10 @@ public class TestIcebergInputFormats { String warehouseLocation = temp.newFolder("hadoop_catalog").getAbsolutePath(); conf.set("warehouse.location", warehouseLocation); conf.set(InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME); - conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME), - CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); - conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME), - warehouseLocation); + conf.set(InputFormatConfig.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME, + CatalogUtil.ICEBERG_CATALOG_TYPE), CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + conf.set(InputFormatConfig.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME, + CatalogProperties.WAREHOUSE_LOCATION), warehouseLocation); Catalog catalog = new HadoopCatalog(conf, conf.get("warehouse.location")); TableIdentifier identifier = TableIdentifier.of("db", "t"); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java index ef964f8..2ba4e50 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java @@ -32,6 +32,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.data.DeleteReadTests; +import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.util.StructLikeSet; import org.junit.Assert; @@ -104,6 +105,7 @@ public class TestInputFormatReaderDeletes extends DeleteReadTests { .filter(recordFactory -> recordFactory.name().equals(inputFormat)) .map(recordFactory -> recordFactory.create(builder.project(projected).conf()).getRecords()) .flatMap(List::stream) + .map(record -> new 
InternalRecordWrapper(projected.asStruct()).wrap(record)) .collect(Collectors.toList()) ); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java index 72a04a4..14760ee 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergCTAS.java @@ -26,6 +26,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Assume; @@ -45,9 +46,10 @@ public class TestHiveIcebergCTAS extends HiveIcebergStorageHandlerWithEngineBase HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, TableIdentifier.of("default", "source"), false)); shell.executeStatement(String.format( - "CREATE TABLE target STORED BY ICEBERG %s TBLPROPERTIES ('%s'='%s') AS SELECT * FROM source", + "CREATE TABLE target STORED BY ICEBERG %s %s AS SELECT * FROM source", testTables.locationForCreateTableSQL(TableIdentifier.of("default", "target")), - TableProperties.DEFAULT_FILE_FORMAT, fileFormat)); + testTables.propertiesForCreateTableSQL( + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.toString())))); List<Object[]> objects = shell.executeStatement("SELECT * FROM target ORDER BY id"); HiveIcebergTestUtils.validateData(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, @@ -64,8 +66,9 @@ public class TestHiveIcebergCTAS extends HiveIcebergStorageHandlerWithEngineBase shell.executeStatement(String.format( "CREATE TABLE target PARTITIONED BY (dept, name) " + - "STORED BY ICEBERG TBLPROPERTIES ('%s'='%s') AS SELECT * FROM source", - TableProperties.DEFAULT_FILE_FORMAT, fileFormat)); + "STORED BY ICEBERG %s AS SELECT * FROM source", + testTables.propertiesForCreateTableSQL( + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.toString())))); // check table can be read back correctly List<Object[]> objects = shell.executeStatement("SELECT id, name, dept FROM target ORDER BY id"); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java index 3143d32..5222361 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java @@ -30,6 +30,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; import org.apache.iceberg.mr.TestHelper; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -305,10 +306,11 @@ public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineB // create Iceberg table without specifying a write format in the tbl properties // it should fall back to using the default file format - shell.executeStatement(String.format("CREATE EXTERNAL TABLE %s (id bigint, name string) 
STORED BY '%s' %s", + shell.executeStatement(String.format("CREATE EXTERNAL TABLE %s (id bigint, name string) STORED BY '%s' %s %s", identifier, HiveIcebergStorageHandler.class.getName(), - testTables.locationForCreateTableSQL(identifier))); + testTables.locationForCreateTableSQL(identifier), + testTables.propertiesForCreateTableSQL(ImmutableMap.of()))); shell.executeStatement(String.format("INSERT INTO %s VALUES (10, 'Linda')", identifier)); List<Object[]> results = shell.executeStatement(String.format("SELECT * FROM %s", identifier)); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java index 205f9c0..c8de0d1 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergMigration.java @@ -31,6 +31,7 @@ import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.FileFormat; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Assume; @@ -51,7 +52,8 @@ public class TestHiveIcebergMigration extends HiveIcebergStorageHandlerWithEngin Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET); String tableName = "tbl"; String createQuery = "CREATE EXTERNAL TABLE " + tableName + " (a int) STORED AS " + fileFormat.name() + " " + - testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)); + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) + + testTables.propertiesForCreateTableSQL(ImmutableMap.of()); shell.executeStatement(createQuery); shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2), (3)"); validateMigration(tableName); @@ -62,7 +64,8 @@ public class TestHiveIcebergMigration extends HiveIcebergStorageHandlerWithEngin Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET); String tableName = "tbl_part"; shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) PARTITIONED BY (b string) STORED AS " + - fileFormat.name() + " " + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName))); + fileFormat.name() + " " + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) + + testTables.propertiesForCreateTableSQL(ImmutableMap.of())); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') VALUES (1), (2), (3)"); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') VALUES (4), (5)"); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') VALUES (6)"); @@ -76,7 +79,8 @@ public class TestHiveIcebergMigration extends HiveIcebergStorageHandlerWithEngin String tableName = "tbl_part_bucketed"; shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) PARTITIONED BY (b string) clustered by " + "(a) INTO 2 BUCKETS STORED AS " + fileFormat.name() + " " + - testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName))); + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) + + testTables.propertiesForCreateTableSQL(ImmutableMap.of())); shell.executeStatement("INSERT INTO " + tableName + " PARTITION 
(b='aaa') VALUES (1), (2), (3)"); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') VALUES (4), (5)"); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') VALUES (6)"); @@ -89,7 +93,8 @@ public class TestHiveIcebergMigration extends HiveIcebergStorageHandlerWithEngin Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET); String tableName = "tbl_rollback"; shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) STORED AS " + fileFormat.name() + " " + - testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName))); + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) + + testTables.propertiesForCreateTableSQL(ImmutableMap.of())); shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2), (3)"); validateMigrationRollback(tableName); } @@ -99,7 +104,8 @@ public class TestHiveIcebergMigration extends HiveIcebergStorageHandlerWithEngin Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET); String tableName = "tbl_rollback"; shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) PARTITIONED BY (b string) STORED AS " + - fileFormat.name() + " " + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName))); + fileFormat.name() + " " + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) + + testTables.propertiesForCreateTableSQL(ImmutableMap.of())); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') VALUES (1), (2), (3)"); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') VALUES (4), (5)"); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') VALUES (6)"); @@ -113,7 +119,8 @@ public class TestHiveIcebergMigration extends HiveIcebergStorageHandlerWithEngin String tableName = "tbl_rollback"; shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) PARTITIONED BY (b string, c int) " + "STORED AS " + fileFormat.name() + " " + - testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName))); + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) + + testTables.propertiesForCreateTableSQL(ImmutableMap.of())); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa', c='111') VALUES (1), (2), (3)"); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb', c='111') VALUES (4), (5)"); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa', c='222') VALUES (6)"); @@ -127,7 +134,8 @@ public class TestHiveIcebergMigration extends HiveIcebergStorageHandlerWithEngin String tableName = "tbl_part_bucketed"; shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) PARTITIONED BY (b string) clustered by " + "(a) INTO 2 BUCKETS STORED AS " + fileFormat.name() + " " + - testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName))); + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) + + testTables.propertiesForCreateTableSQL(ImmutableMap.of())); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') VALUES (1), (2), (3)"); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') VALUES (4), (5)"); shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') VALUES (6)"); @@ -144,7 +152,8 @@ public class TestHiveIcebergMigration extends HiveIcebergStorageHandlerWithEngin List<String> formats = 
ImmutableList.of("TEXTFILE", "JSONFILE", "RCFILE", "SEQUENCEFILE"); formats.forEach(format -> { shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) STORED AS " + format + " " + - testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName))); + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) + + testTables.propertiesForCreateTableSQL(ImmutableMap.of())); shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2), (3)"); AssertHelpers.assertThrows("Migrating a " + format + " table to Iceberg should have thrown an exception.", IllegalArgumentException.class, "Cannot convert hive table to iceberg with input format: ", @@ -161,7 +170,8 @@ public class TestHiveIcebergMigration extends HiveIcebergStorageHandlerWithEngin testTableType == TestTables.TestTableType.HIVE_CATALOG); String tableName = "tbl_unsupported"; shell.executeStatement("CREATE MANAGED TABLE " + tableName + " (a int) STORED AS " + fileFormat + " " + - testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName))); + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)) + + testTables.propertiesForCreateTableSQL(ImmutableMap.of())); shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2), (3)"); AssertHelpers.assertThrows("Migrating a managed table to Iceberg should have thrown an exception.", IllegalArgumentException.class, "Converting non-external, temporary or transactional hive table to iceberg", diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java index af8c383..7b4dce5 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java @@ -41,10 +41,8 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; -import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; @@ -266,18 +264,20 @@ public class TestHiveIcebergOutputCommitter { Table table = HiveIcebergStorageHandler.table(conf, name); FileIO io = table.io(); - LocationProvider location = table.locationProvider(); - EncryptionManager encryption = table.encryption(); Schema schema = HiveIcebergStorageHandler.schema(conf); PartitionSpec spec = table.spec(); for (int i = 0; i < taskNum; ++i) { List<Record> records = TestHelper.generateRandomRecords(schema, RECORD_NUM, i + attemptNum); TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum); - OutputFileFactory outputFileFactory = - new OutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskId.getTaskID().getId(), - attemptNum, QUERY_ID + "-" + JOB_ID); - HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, FileFormat.PARQUET, + int partitionId = taskId.getTaskID().getId(); + String operationId = QUERY_ID + "-" + JOB_ID; + FileFormat fileFormat = FileFormat.PARQUET; + OutputFileFactory outputFileFactory = 
OutputFileFactory.builderFor(table, partitionId, attemptNum) + .format(fileFormat) + .operationId(operationId) + .build(); + HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, fileFormat, new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE, TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME)); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java index da21af3..84e8b57 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java @@ -26,9 +26,8 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; -import org.apache.iceberg.mr.Catalogs; -import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.TestHelper; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.junit.Assert; @@ -171,9 +170,9 @@ public class TestHiveIcebergSelects extends HiveIcebergStorageHandlerWithEngineB // note: the Chinese character seems to be accepted in the column name, but not // in the table name - this is the case for both Iceberg and standard Hive tables. shell.executeStatement(String.format( - "CREATE TABLE `%s` (id bigint, `dep,! 是,t` string) STORED BY ICEBERG STORED AS %s %s TBLPROPERTIES ('%s'='%s')", + "CREATE TABLE `%s` (id bigint, `dep,! 是,t` string) STORED BY ICEBERG STORED AS %s %s %s", table.name(), fileFormat, testTables.locationForCreateTableSQL(table), - InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME)); + testTables.propertiesForCreateTableSQL(ImmutableMap.of()))); shell.executeStatement(String.format("INSERT INTO `%s` VALUES (1, 'moon'), (2, 'star')", table.name())); List<Object[]> result = shell.executeStatement(String.format( diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java index d601100..4dca587 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java @@ -30,6 +30,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hive.HiveTableOperations; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Assume; @@ -147,9 +148,10 @@ public class TestHiveIcebergStatistics extends HiveIcebergStorageHandlerWithEngi shell.setHiveSessionValue(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname, true); shell.executeStatement(String.format( - "CREATE TABLE target STORED BY ICEBERG %s TBLPROPERTIES ('%s'='%s') AS SELECT * FROM source", + "CREATE TABLE target STORED BY ICEBERG %s %s AS SELECT * FROM source", testTables.locationForCreateTableSQL(TableIdentifier.of("default", "target")), - TableProperties.DEFAULT_FILE_FORMAT, fileFormat)); + 
testTables.propertiesForCreateTableSQL( + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.toString())))); checkColStat("target", "id", true); checkColStatMinMaxValue("target", "id", 0, 2); @@ -165,9 +167,9 @@ public class TestHiveIcebergStatistics extends HiveIcebergStorageHandlerWithEngi shell.setHiveSessionValue(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname, true); shell.executeStatement(String.format( - "CREATE TABLE target PARTITIONED BY (dept, name) " + - "STORED BY ICEBERG TBLPROPERTIES ('%s'='%s') AS SELECT * FROM source s", - TableProperties.DEFAULT_FILE_FORMAT, fileFormat)); + "CREATE TABLE target PARTITIONED BY (dept, name) STORED BY ICEBERG %s AS SELECT * FROM source s", + testTables.propertiesForCreateTableSQL( + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.toString())))); checkColStat("target", "id", true); checkColStat("target", "dept", true); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java index dcadacf..218114b 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java @@ -36,7 +36,6 @@ import org.apache.iceberg.TestHelpers.Row; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -233,7 +232,7 @@ public class TestHiveIcebergStorageHandlerLocalScan { "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " + "'" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " + - "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"; + "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"; runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data); } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java index ec0cd4e..fc90722 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java @@ -175,7 +175,7 @@ public class TestHiveIcebergStorageHandlerNoScan { testTables.locationForCreateTableSQL(identifier) + " TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(schema) + "', " + - "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"); + "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); shell.executeStatement("ALTER TABLE " + identifier + " SET PARTITION SPEC (month(ts))"); @@ -219,7 +219,7 @@ public class TestHiveIcebergStorageHandlerNoScan { testTables.locationForCreateTableSQL(identifier) + "TBLPROPERTIES ('" + 
InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(schema) + "', " + - "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"); + "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); PartitionSpec spec = PartitionSpec.builderFor(schema) .year("year_field") @@ -280,7 +280,7 @@ public class TestHiveIcebergStorageHandlerNoScan { testTables.locationForCreateTableSQL(identifier) + " TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(schema) + "', " + - "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"); + "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); Table table = testTables.loadTable(identifier); Assert.assertEquals(spec, table.spec()); } @@ -297,7 +297,7 @@ public class TestHiveIcebergStorageHandlerNoScan { "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " + "'dummy'='test', " + - "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"); + "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); // Check the Iceberg table data org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); @@ -377,7 +377,7 @@ public class TestHiveIcebergStorageHandlerNoScan { "STRING) STORED BY iceBerg %s TBLPROPERTIES ('%s'='%s')", testTables.locationForCreateTableSQL(identifier), InputFormatConfig.CATALOG_NAME, - Catalogs.ICEBERG_DEFAULT_CATALOG_NAME); + testTables.catalogName()); shell.executeStatement(query); Assert.assertNotNull(testTables.loadTable(identifier)); } @@ -391,7 +391,7 @@ public class TestHiveIcebergStorageHandlerNoScan { "orc", testTables.locationForCreateTableSQL(identifier), InputFormatConfig.CATALOG_NAME, - Catalogs.ICEBERG_DEFAULT_CATALOG_NAME); + testTables.catalogName()); shell.executeStatement(query); Table table = testTables.loadTable(identifier); Assert.assertNotNull(table); @@ -407,7 +407,7 @@ public class TestHiveIcebergStorageHandlerNoScan { testTables.locationForCreateTableSQL(identifier) + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" + - InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"); + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); // Check the Iceberg table partition data org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); @@ -425,7 +425,7 @@ public class TestHiveIcebergStorageHandlerNoScan { SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " + "'" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " + - "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"); + "'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); // Check the Iceberg table partition data org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); @@ -442,7 +442,7 @@ public class TestHiveIcebergStorageHandlerNoScan { "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " + "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE', " + - "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"); + "'" + 
InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers"); Properties tableProperties = new Properties(); @@ -514,7 +514,7 @@ public class TestHiveIcebergStorageHandlerNoScan { "STORED BY ICEBERG " + testTables.locationForCreateTableSQL(identifier) + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='WrongSchema'" + - ",'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"); + ",'" + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); } ); @@ -536,7 +536,7 @@ public class TestHiveIcebergStorageHandlerNoScan { "STORED BY ICEBERG " + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" + - InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"); + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); } ); } @@ -556,7 +556,7 @@ public class TestHiveIcebergStorageHandlerNoScan { "STORED BY ICEBERG " + "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" + SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "',' " + - InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')"); + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "')"); } ); } else { @@ -579,8 +579,8 @@ public class TestHiveIcebergStorageHandlerNoScan { "PARTITIONED BY (first_name STRING) " + "STORED BY ICEBERG " + testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")) + - " TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + - PartitionSpecParser.toJson(spec) + "')"); + testTables.propertiesForCreateTableSQL( + ImmutableMap.of(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(spec)))); } ); } @@ -791,7 +791,7 @@ public class TestHiveIcebergStorageHandlerNoScan { InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA), InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC), "custom_property", "initial_val", - InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME)); + InputFormatConfig.CATALOG_NAME, testTables.catalogName())); // Check the Iceberg table parameters @@ -985,7 +985,7 @@ public class TestHiveIcebergStorageHandlerNoScan { TableIdentifier identifier = TableIdentifier.of("default", "customers"); shell.executeStatement("CREATE EXTERNAL TABLE customers (id int, name string) STORED BY ICEBERG " + testTables.locationForCreateTableSQL(identifier) + " TBLPROPERTIES ('" + - InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "', " + + InputFormatConfig.CATALOG_NAME + "'='" + testTables.catalogName() + "', " + "'" + TableProperties.FORMAT_VERSION + "'='2')"); org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java index 2a0e33a..f32fb10 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java @@ -118,9 +118,9 @@ 
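[Note on a pattern that recurs throughout the test changes above: per-catalog settings are no longer assembled from the old CATALOG_TYPE_TEMPLATE/CATALOG_WAREHOUSE_TEMPLATE/CATALOG_CLASS_TEMPLATE format strings but derived with InputFormatConfig.catalogPropertyConfigKey(catalogName, property), reusing the standard CatalogProperties and CatalogUtil constants. A minimal sketch of configuring and loading a named Hadoop catalog this way, built only from identifiers that appear in this patch; the catalog name and warehouse path are illustrative.

import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;

public class NamedCatalogConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    String catalogName = "barCatalog";  // illustrative name

    // Both per-catalog settings go through the same key-building helper.
    conf.set(
        InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogUtil.ICEBERG_CATALOG_TYPE),
        CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
    conf.set(
        InputFormatConfig.catalogPropertyConfigKey(catalogName, CatalogProperties.WAREHOUSE_LOCATION),
        "/tmp/mylocation");  // illustrative warehouse path

    Optional<Catalog> catalog = Catalogs.loadCatalog(conf, catalogName);
    catalog.ifPresent(c -> System.out.println("Loaded " + c));
  }
}

Configuring both a catalog type and CatalogProperties.CATALOG_IMPL for the same catalog now fails fast with IllegalArgumentException, as exercised by testLegacyLoadCustomCatalogWithHiveCatalogTypeSet above.]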
public class TestHiveIcebergStorageHandlerWithMultipleCatalogs { @Test public void testJoinTablesFromDifferentCatalogs() throws IOException { - createAndAddRecords(testTables1, fileFormat1, TableIdentifier.of("default", "customers1"), table1CatalogName, + createAndAddRecords(testTables1, fileFormat1, TableIdentifier.of("default", "customers1"), HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); - createAndAddRecords(testTables2, fileFormat2, TableIdentifier.of("default", "customers2"), table2CatalogName, + createAndAddRecords(testTables2, fileFormat2, TableIdentifier.of("default", "customers2"), HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); List<Object[]> rows = shell.executeStatement("SELECT c2.customer_id, c2.first_name, c2.last_name " + @@ -164,14 +164,14 @@ public class TestHiveIcebergStorageHandlerWithMultipleCatalogs { } private void createAndAddRecords(TestTables testTables, FileFormat fileFormat, TableIdentifier identifier, - String catalogName, List<Record> records) throws IOException { + List<Record> records) throws IOException { String createSql = String.format( "CREATE EXTERNAL TABLE %s (customer_id BIGINT, first_name STRING, last_name STRING)" + " STORED BY ICEBERG %s " + " TBLPROPERTIES ('%s'='%s', '%s'='%s')", identifier, testTables.locationForCreateTableSQL(identifier), - InputFormatConfig.CATALOG_NAME, catalogName, + InputFormatConfig.CATALOG_NAME, testTables.catalogName(), TableProperties.DEFAULT_FILE_FORMAT, fileFormat); shell.executeStatement(createSql); Table icebergTable = testTables.loadTable(identifier); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java index 7cc9848..5a6f38c 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; @@ -50,6 +51,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.hive.HiveCatalog; import org.apache.iceberg.hive.MetastoreUtil; +import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.TestCatalogs; import org.apache.iceberg.mr.TestHelper; @@ -98,6 +100,10 @@ abstract class TestTables { return tables; } + public String catalogName() { + return catalog; + } + /** * The location string needed to be provided for CREATE TABLE ... commands, * like "LOCATION 'file:///tmp/warehouse/default/tablename'. Empty ("") if LOCATION is not needed. @@ -108,8 +114,8 @@ abstract class TestTables { /** * The table properties string needed for the CREATE TABLE ... 
commands, - * like "TBLPROPERTIES('iceberg.catalog'='mycatalog') - * @return + * like {@code TBLPROPERTIES('iceberg.catalog'='mycatalog')} + * @return the tables properties string, such as {@code TBLPROPERTIES('iceberg.catalog'='mycatalog')} */ public String propertiesForCreateTableSQL(Map<String, String> tableProperties) { Map<String, String> properties = new HashMap<>(tableProperties); @@ -371,10 +377,9 @@ abstract class TestTables { @Override public Map<String, String> properties() { return ImmutableMap.of( - String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), "custom", - String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalog), + InputFormatConfig.catalogPropertyConfigKey(catalog, CatalogProperties.CATALOG_IMPL), TestCatalogs.CustomHadoopCatalog.class.getName(), - String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalog), + InputFormatConfig.catalogPropertyConfigKey(catalog, CatalogProperties.WAREHOUSE_LOCATION), warehouseLocation ); } @@ -403,8 +408,10 @@ abstract class TestTables { @Override public Map<String, String> properties() { return ImmutableMap.of( - String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), "hadoop", - String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalog), warehouseLocation + InputFormatConfig.catalogPropertyConfigKey(catalog, CatalogUtil.ICEBERG_CATALOG_TYPE), + CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, + InputFormatConfig.catalogPropertyConfigKey(catalog, CatalogProperties.WAREHOUSE_LOCATION), + warehouseLocation ); } @@ -414,8 +421,8 @@ abstract class TestTables { } static class HadoopTestTables extends TestTables { - HadoopTestTables(Configuration conf, TemporaryFolder temp, String catalogName) { - super(new HadoopTables(conf), temp, catalogName); + HadoopTestTables(Configuration conf, TemporaryFolder temp) { + super(new HadoopTables(conf), temp, Catalogs.ICEBERG_HADOOP_TABLE_NAME); } @Override @@ -454,7 +461,8 @@ abstract class TestTables { @Override public Map<String, String> properties() { - return ImmutableMap.of(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), "hive"); + return ImmutableMap.of(InputFormatConfig.catalogPropertyConfigKey(catalog, CatalogUtil.ICEBERG_CATALOG_TYPE), + CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE); } @Override @@ -489,7 +497,7 @@ abstract class TestTables { enum TestTableType { HADOOP_TABLE { public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName) { - return new HadoopTestTables(conf, temporaryFolder, catalogName); + return new HadoopTestTables(conf, temporaryFolder); } }, HADOOP_CATALOG { diff --git a/iceberg/patched-iceberg-core/pom.xml b/iceberg/patched-iceberg-core/pom.xml index c2d2d38..507818c 100644 --- a/iceberg/patched-iceberg-core/pom.xml +++ b/iceberg/patched-iceberg-core/pom.xml @@ -48,6 +48,11 @@ <groupId>org.apache.iceberg</groupId> <artifactId>patched-iceberg-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <optional>true</optional> + </dependency> </dependencies> <build> <plugins> @@ -71,14 +76,11 @@ <overWrite>true</overWrite> <outputDirectory>${project.build.directory}/classes</outputDirectory> <excludes> - **/SnapshotProducer.class - **/SnapshotSummary.class - **/BaseMeta.class - **/BaseMetadataTable.class - **/StaticTableOperations.class - **/CatalogProperties.class - **/BaseMetastoreCatalog.class - **/BaseMetastoreTableOperations.class + **/JdbcClientPool.class + **/JdbcUtil.class + **/CatalogUtil.class + 
**/ClientPool.class + **/ClientPoolImpl.class </excludes> </artifactItem> </artifactItems> diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/CatalogUtil.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/CatalogUtil.java new file mode 100644 index 0000000..bada6fd --- /dev/null +++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.MapMaker; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.GC_ENABLED; +import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT; + +public class CatalogUtil { + private static final Logger LOG = LoggerFactory.getLogger(CatalogUtil.class); + + /** + * Shortcut catalog property to load a catalog implementation through a short type name, + * instead of specifying a full java class through {@link CatalogProperties#CATALOG_IMPL}. + * Currently the following type to implementation mappings are supported: + * <ul> + * <li>hive: org.apache.iceberg.hive.HiveCatalog</li> + * <li>hadoop: org.apache.iceberg.hadoop.HadoopCatalog</li> + * </ul> + */ + public static final String ICEBERG_CATALOG_TYPE = "type"; + public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop"; + public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive"; + public static final String ICEBERG_CATALOG_HIVE = "org.apache.iceberg.hive.HiveCatalog"; + public static final String ICEBERG_CATALOG_HADOOP = "org.apache.iceberg.hadoop.HadoopCatalog"; + + private CatalogUtil() { + } + + /** + * Drops all data and metadata files referenced by TableMetadata. + * <p> + * This should be called by dropTable implementations to clean up table files once the table has been dropped in the + * metastore. 
+ * + * @param io a FileIO to use for deletes + * @param metadata the last valid TableMetadata instance for a dropped table. + */ + public static void dropTableData(FileIO io, TableMetadata metadata) { + // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete + // as much of the delete work as possible and avoid orphaned data or manifest files. + + Set<String> manifestListsToDelete = Sets.newHashSet(); + Set<ManifestFile> manifestsToDelete = Sets.newHashSet(); + for (Snapshot snapshot : metadata.snapshots()) { + // add all manifests to the delete set because both data and delete files should be removed + Iterables.addAll(manifestsToDelete, snapshot.allManifests()); + // add the manifest list to the delete set, if present + if (snapshot.manifestListLocation() != null) { + manifestListsToDelete.add(snapshot.manifestListLocation()); + } + } + + LOG.info("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete)); + + // run all of the deletes + + boolean gcEnabled = PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT); + + if (gcEnabled) { + // delete data files only if we are sure this won't corrupt other tables + deleteFiles(io, manifestsToDelete); + } + + Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path)) + .noRetry().suppressFailureWhenFinished() + .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: {}", manifest, exc)) + .run(io::deleteFile); + + Tasks.foreach(manifestListsToDelete) + .noRetry().suppressFailureWhenFinished() + .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list: {}", list, exc)) + .run(io::deleteFile); + + Tasks.foreach(metadata.metadataFileLocation()) + .noRetry().suppressFailureWhenFinished() + .onFailure((list, exc) -> LOG.warn("Delete failed for metadata file: {}", list, exc)) + .run(io::deleteFile); + } + + @SuppressWarnings("DangerousStringInternUsage") + private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) { + // keep track of deleted files in a map that can be cleaned up when memory runs low + Map<String, Boolean> deletedFiles = new MapMaker() + .concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE) + .weakKeys() + .makeMap(); + + Tasks.foreach(allManifests) + .noRetry().suppressFailureWhenFinished() + .executeWith(ThreadPools.getWorkerPool()) + .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc)) + .run(manifest -> { + try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) { + for (ManifestEntry<?> entry : reader.entries()) { + // intern the file path because the weak key map uses identity (==) instead of equals + String path = entry.file().path().toString().intern(); + Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true); + if (alreadyDeleted == null || !alreadyDeleted) { + try { + io.deleteFile(path); + } catch (RuntimeException e) { + // this may happen if the map of deleted files gets cleaned up by gc + LOG.warn("Delete failed for data file: {}", path, e); + } + } + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest.path()); + } + }); + } + + /** + * Load a custom catalog implementation. + * <p> + * The catalog must have a no-arg constructor. + * If the class implements {@link Configurable}, + * a Hadoop config will be passed using {@link Configurable#setConf(Configuration)}. 
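+ * <p> + * For example, a sketch of loading Iceberg's HiveCatalog by class name ({@code conf} is assumed to be + * an existing Hadoop configuration; the empty properties map is just for brevity): + * <pre>{@code + * Catalog catalog = CatalogUtil.loadCatalog( + * "org.apache.iceberg.hive.HiveCatalog", "my_catalog", Maps.newHashMap(), conf); + * }</pre> + * After construction,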
+ * {@link Catalog#initialize(String catalogName, Map options)} is called to complete the initialization. + * + * @param impl full class name of the catalog implementation + * @param catalogName catalog name + * @param properties catalog properties + * @param hadoopConf hadoop configuration if needed + * @return initialized catalog object + * @throws IllegalArgumentException if the no-arg constructor is not found or initialization fails + */ + public static Catalog loadCatalog( + String impl, + String catalogName, + Map<String, String> properties, + Configuration hadoopConf) { + Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog, impl class name is null"); + DynConstructors.Ctor<Catalog> ctor; + try { + ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize Catalog implementation %s: %s", impl, e.getMessage()), e); + } + + Catalog catalog; + try { + catalog = ctor.newInstance(); + + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize Catalog, %s does not implement Catalog.", impl), e); + } + + if (catalog instanceof Configurable) { + ((Configurable) catalog).setConf(hadoopConf); + } + + catalog.initialize(catalogName, properties); + return catalog; + } + + /** + * Build an Iceberg {@link Catalog} based on a map of catalog properties and optional Hadoop configuration. + * <p> + * This method examines both the {@link #ICEBERG_CATALOG_TYPE} and {@link CatalogProperties#CATALOG_IMPL} properties + * to determine the catalog implementation to load. + * If neither property is specified, the Hive catalog is loaded by default. + * + * @param name catalog name + * @param options catalog properties + * @param conf Hadoop configuration + * @return initialized catalog + */ + public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Configuration conf) { + String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL); + if (catalogImpl == null) { + String catalogType = PropertyUtil.propertyAsString(options, ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE); + switch (catalogType.toLowerCase(Locale.ENGLISH)) { + case ICEBERG_CATALOG_TYPE_HIVE: + catalogImpl = ICEBERG_CATALOG_HIVE; + break; + case ICEBERG_CATALOG_TYPE_HADOOP: + catalogImpl = ICEBERG_CATALOG_HADOOP; + break; + default: + throw new UnsupportedOperationException("Unknown catalog type: " + catalogType); + } + } else { + String catalogType = options.get(ICEBERG_CATALOG_TYPE); + Preconditions.checkArgument(catalogType == null, + "Cannot create catalog %s, both type and catalog-impl are set: type=%s, catalog-impl=%s", + name, catalogType, catalogImpl); + } + + return CatalogUtil.loadCatalog(catalogImpl, name, options, conf); + } + + /** + * Load a custom {@link FileIO} implementation. + * <p> + * The implementation must have a no-arg constructor. + * If the class implements {@link Configurable}, + * a Hadoop config will be passed using {@link Configurable#setConf(Configuration)}. + * {@link FileIO#initialize(Map properties)} is called to complete the initialization.
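+ * <p> + * For example, a sketch using Iceberg's Hadoop-backed implementation ({@code conf} is assumed to be an + * existing Hadoop configuration): + * <pre>{@code + * FileIO io = CatalogUtil.loadFileIO( + * "org.apache.iceberg.hadoop.HadoopFileIO", Maps.newHashMap(), conf); + * }</pre>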
+ * + * @param impl full class name of a custom FileIO implementation + * @param properties properties used to initialize the FileIO implementation + * @param hadoopConf hadoop configuration + * @return an initialized FileIO instance + * @throws IllegalArgumentException if the class cannot be found, + * the no-arg constructor is missing, or + * the loaded class cannot be cast to {@link FileIO} + */ + public static FileIO loadFileIO( + String impl, + Map<String, String> properties, + Configuration hadoopConf) { + LOG.info("Loading custom FileIO implementation: {}", impl); + DynConstructors.Ctor<FileIO> ctor; + try { + ctor = DynConstructors.builder(FileIO.class).impl(impl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException(String.format( + "Cannot initialize FileIO, missing no-arg constructor: %s", impl), e); + } + + FileIO fileIO; + try { + fileIO = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize FileIO, %s does not implement FileIO.", impl), e); + } + + if (fileIO instanceof Configurable) { + ((Configurable) fileIO).setConf(hadoopConf); + } + + fileIO.initialize(properties); + return fileIO; + } +} diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPool.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPool.java similarity index 90% rename from iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPool.java rename to iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPool.java index 7f32cf9..117939a 100644 --- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPool.java +++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPool.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iceberg.hive; +package org.apache.iceberg; public interface ClientPool<C, E extends Exception> { interface Action<R, C, E extends Exception> { @@ -25,4 +25,6 @@ public interface ClientPool<C, E extends Exception> { } <R> R run(Action<R, C, E> action) throws E, InterruptedException; + + <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException; } diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPoolImpl.java similarity index 90% rename from iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java rename to iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPoolImpl.java index d1a44d3..18f7afa 100644 --- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/ClientPoolImpl.java +++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ClientPoolImpl.java @@ -17,12 +17,11 @@ * under the License. */ -package org.apache.iceberg.hive; +package org.apache.iceberg; import java.io.Closeable; import java.util.ArrayDeque; import java.util.Deque; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,25 +33,32 @@ public abstract class ClientPoolImpl<C, E extends Exception> implements Closeabl private final Deque<C> clients; private final Class<? extends E> reconnectExc; private final Object signal = new Object(); + private final boolean retryByDefault; private volatile int currentSize; private boolean closed; - ClientPoolImpl(int poolSize, Class<?
extends E> reconnectExc) { + public ClientPoolImpl(int poolSize, Class<? extends E> reconnectExc, boolean retryByDefault) { this.poolSize = poolSize; this.reconnectExc = reconnectExc; this.clients = new ArrayDeque<>(poolSize); this.currentSize = 0; this.closed = false; + this.retryByDefault = retryByDefault; } @Override public <R> R run(Action<R, C, E> action) throws E, InterruptedException { + return run(action, retryByDefault); + } + + @Override + public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException { C client = get(); try { return action.run(client); } catch (Exception exc) { - if (isConnectionException(exc)) { + if (retry && isConnectionException(exc)) { try { client = reconnect(client); } catch (Exception ignored) { @@ -138,8 +144,7 @@ public abstract class ClientPoolImpl<C, E extends Exception> implements Closeabl } } - @VisibleForTesting - int poolSize() { + public int poolSize() { return poolSize; } } diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java new file mode 100644 index 0000000..ba5edf5 --- /dev/null +++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.SQLNonTransientConnectionException; +import java.util.Map; +import java.util.Properties; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ClientPoolImpl; + +class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> { + + private final String dbUrl; + private final Map<String, String> properties; + + JdbcClientPool(String dbUrl, Map<String, String> props) { + this(Integer.parseInt(props.getOrDefault(CatalogProperties.CLIENT_POOL_SIZE, + String.valueOf(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT))), dbUrl, props); + } + + JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) { + super(poolSize, SQLNonTransientConnectionException.class, true); + properties = props; + this.dbUrl = dbUrl; + } + + @Override + protected Connection newClient() { + try { + Properties dbProps = JdbcUtil.filterAndRemovePrefix(properties, JdbcCatalog.PROPERTY_PREFIX); + return DriverManager.getConnection(dbUrl, dbProps); + } catch (SQLException e) { + throw new UncheckedSQLException(e, "Failed to connect: %s", dbUrl); + } + } + + @Override + protected Connection reconnect(Connection client) { + close(client); + return newClient(); + } + + @Override + protected void close(Connection client) { + try { + client.close(); + } catch (SQLException e) { + throw new UncheckedSQLException(e, "Failed to close connection"); + } + } +} diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java new file mode 100644 index 0000000..0b55839 --- /dev/null +++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.jdbc; + +import java.util.Map; +import java.util.Properties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +final class JdbcUtil { + protected static final String CATALOG_TABLE_NAME = "iceberg_tables"; + protected static final String CATALOG_NAME = "catalog_name"; + protected static final String TABLE_NAMESPACE = "table_namespace"; + protected static final String TABLE_NAME = "table_name"; + protected static final String METADATA_LOCATION = "metadata_location"; + protected static final String PREVIOUS_METADATA_LOCATION = "previous_metadata_location"; + public static final String DO_COMMIT_SQL = "UPDATE " + CATALOG_TABLE_NAME + + " SET " + METADATA_LOCATION + " = ? , " + PREVIOUS_METADATA_LOCATION + " = ? " + + " WHERE " + CATALOG_NAME + " = ? AND " + + TABLE_NAMESPACE + " = ? AND " + + TABLE_NAME + " = ? AND " + + METADATA_LOCATION + " = ?"; + protected static final String CREATE_CATALOG_TABLE = + "CREATE TABLE " + CATALOG_TABLE_NAME + + "(" + + CATALOG_NAME + " VARCHAR(255) NOT NULL," + + TABLE_NAMESPACE + " VARCHAR(255) NOT NULL," + + TABLE_NAME + " VARCHAR(255) NOT NULL," + + METADATA_LOCATION + " VARCHAR(5500)," + + PREVIOUS_METADATA_LOCATION + " VARCHAR(5500)," + + "PRIMARY KEY (" + CATALOG_NAME + ", " + TABLE_NAMESPACE + ", " + TABLE_NAME + ")" + + ")"; + protected static final String GET_TABLE_SQL = "SELECT * FROM " + CATALOG_TABLE_NAME + + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + TABLE_NAME + " = ? "; + protected static final String LIST_TABLES_SQL = "SELECT * FROM " + CATALOG_TABLE_NAME + + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ?"; + protected static final String RENAME_TABLE_SQL = "UPDATE " + CATALOG_TABLE_NAME + + " SET " + TABLE_NAMESPACE + " = ? , " + TABLE_NAME + " = ? " + + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + TABLE_NAME + " = ? "; + protected static final String DROP_TABLE_SQL = "DELETE FROM " + CATALOG_TABLE_NAME + + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " = ? AND " + TABLE_NAME + " = ? "; + protected static final String GET_NAMESPACE_SQL = "SELECT " + TABLE_NAMESPACE + " FROM " + CATALOG_TABLE_NAME + + " WHERE " + CATALOG_NAME + " = ? AND " + TABLE_NAMESPACE + " LIKE ? LIMIT 1"; + protected static final String LIST_NAMESPACES_SQL = "SELECT DISTINCT " + TABLE_NAMESPACE + + " FROM " + CATALOG_TABLE_NAME + + " WHERE " + CATALOG_NAME + " = ? 
AND " + TABLE_NAMESPACE + " LIKE ?"; + protected static final String DO_COMMIT_CREATE_TABLE_SQL = "INSERT INTO " + CATALOG_TABLE_NAME + + " (" + CATALOG_NAME + ", " + TABLE_NAMESPACE + ", " + TABLE_NAME + + ", " + METADATA_LOCATION + ", " + PREVIOUS_METADATA_LOCATION + ") " + + " VALUES (?,?,?,?,null)"; + private static final Joiner JOINER_DOT = Joiner.on('.'); + private static final Splitter SPLITTER_DOT = Splitter.on('.'); + + private JdbcUtil() { + } + + public static Namespace stringToNamespace(String namespace) { + Preconditions.checkArgument(namespace != null, "Invalid namespace %s", namespace); + return Namespace.of(Iterables.toArray(SPLITTER_DOT.split(namespace), String.class)); + } + + public static String namespaceToString(Namespace namespace) { + return JOINER_DOT.join(namespace.levels()); + } + + public static TableIdentifier stringToTableIdentifier(String tableNamespace, String tableName) { + return TableIdentifier.of(JdbcUtil.stringToNamespace(tableNamespace), tableName); + } + + public static Properties filterAndRemovePrefix(Map<String, String> properties, + String prefix) { + Properties result = new Properties(); + properties.forEach((key, value) -> { + if (key.startsWith(prefix)) { + result.put(key.substring(prefix.length()), value); + } + }); + + return result; + } +} diff --git a/iceberg/pom.xml b/iceberg/pom.xml index 4a2f2f5..547baaf 100644 --- a/iceberg/pom.xml +++ b/iceberg/pom.xml @@ -38,6 +38,8 @@ <spotless.maven.plugin.version>2.5.0</spotless.maven.plugin.version> <google.errorprone.javac.version>9+181-r4173-1</google.errorprone.javac.version> <google.errorprone.version>2.5.1</google.errorprone.version> + <assertj.version>3.19.0</assertj.version> + <junit.jupiter.version>5.7.2</junit.jupiter.version> </properties> <modules> @@ -190,6 +192,17 @@ <classifier>tests</classifier> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>${assertj.version}</version> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <version>${junit.jupiter.version}</version> + </dependency> + <dependency> <groupId>com.esotericsoftware</groupId>