nk1506 commented on code in PR #9461: URL: https://github.com/apache/iceberg/pull/9461#discussion_r1449952807
########## hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java: ########## @@ -166,153 +164,58 @@ protected void doRefresh() { refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries); } - @SuppressWarnings("checkstyle:CyclomaticComplexity") @Override protected void doCommit(TableMetadata base, TableMetadata metadata) { boolean newTable = base == null; String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); - boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf); - boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false); - - CommitStatus commitStatus = CommitStatus.FAILURE; - boolean updateHiveTable = false; - HiveLock lock = lockObject(metadata); try { - lock.lock(); - - Table tbl = loadHmsTable(); - - if (tbl != null) { - // If we try to create the table but the metadata location is already set, then we had a - // concurrent commit - if (newTable - && tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) - != null) { - throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); - } - - updateHiveTable = true; - LOG.debug("Committing existing table: {}", fullName); - } else { - tbl = - newHmsTable( - metadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser())); - LOG.debug("Committing new table: {}", fullName); - } - - tbl.setSd( - HiveOperationsBase.storageDescriptor( - metadata, hiveEngineEnabled)); // set to pickup any schema changes - - String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP); String baseMetadataLocation = base != null ? 
base.metadataFileLocation() : null; - if (!Objects.equals(baseMetadataLocation, metadataLocation)) { - throw new CommitFailedException( - "Cannot commit: Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s", - baseMetadataLocation, metadataLocation, database, tableName); - } - - // get Iceberg props that have been removed - Set<String> removedProps = Collections.emptySet(); - if (base != null) { - removedProps = - base.properties().keySet().stream() - .filter(key -> !metadata.properties().containsKey(key)) - .collect(Collectors.toSet()); - } - - Map<String, String> summary = - Optional.ofNullable(metadata.currentSnapshot()) - .map(Snapshot::summary) - .orElseGet(ImmutableMap::of); - setHmsTableParameters( - newMetadataLocation, tbl, metadata, removedProps, hiveEngineEnabled, summary); + commitWithLocking( + lock, base, metadata, baseMetadataLocation, newMetadataLocation, fullName, fileIO); + } finally { + lock.unlock(); + } - if (!keepHiveStats) { - tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE); - } + LOG.info( + "Committed to table {} with the new metadata location {}", fullName, newMetadataLocation); + } - lock.ensureActive(); - - try { - persistTable( - tbl, updateHiveTable, hiveLockEnabled(metadata, conf) ? null : baseMetadataLocation); - lock.ensureActive(); - - commitStatus = CommitStatus.SUCCESS; - } catch (LockException le) { - commitStatus = CommitStatus.UNKNOWN; - throw new CommitStateUnknownException( - "Failed to heartbeat for hive lock while " - + "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. " - + "Please check the commit history. 
If you are running into this issue, try reducing " - + "iceberg.hive.lock-heartbeat-interval-ms.", - le); - } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { - throw new AlreadyExistsException(e, "Table already exists: %s.%s", database, tableName); - - } catch (InvalidObjectException e) { - throw new ValidationException(e, "Invalid Hive object for %s.%s", database, tableName); - - } catch (CommitFailedException | CommitStateUnknownException e) { - throw e; - - } catch (Throwable e) { - if (e.getMessage() - .contains( - "The table has been modified. The parameter value for key '" - + HiveTableOperations.METADATA_LOCATION_PROP - + "' is")) { - throw new CommitFailedException( - e, "The table %s.%s has been modified concurrently", database, tableName); - } - - if (e.getMessage() != null - && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) { - throw new RuntimeException( - "Failed to acquire locks from metastore because the underlying metastore " - + "table 'HIVE_LOCKS' does not exist. This can occur when using an embedded metastore which does not " - + "support transactions. 
To fix this use an alternative metastore.", - e); - } - - LOG.error( - "Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", - database, - tableName, - e); - commitStatus = checkCommitStatus(newMetadataLocation, metadata); - switch (commitStatus) { - case SUCCESS: - break; - case FAILURE: - throw e; - case UNKNOWN: - throw new CommitStateUnknownException(e); - } - } - } catch (TException e) { - throw new RuntimeException( - String.format("Metastore operation failed for %s.%s", database, tableName), e); + @Override + public BaseMetastoreTableOperations.CommitStatus validateNewLocationAndReturnCommitStatus( + TableMetadata metadata, String newMetadataLocation) { + return MetastoreOperationsUtil.checkCommitStatus( + fullName, + newMetadataLocation, + metadata.properties(), + this::calculateCommitStatusWithUpdatedLocation); + } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted during commit", e); + @Override + public StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) { + return HiveOperationsBase.storageDescriptor( + metadata.schema(), metadata.location(), hiveEngineEnabled); + } - } catch (LockException e) { - throw new CommitFailedException(e); + @Override + public Set<String> obsoleteProps(TableMetadata base, TableMetadata metadata) { + Set<String> obsoleteProps = Sets.newHashSet(); + if (base != null) { + obsoleteProps = + base.properties().keySet().stream() + .filter(key -> !metadata.properties().containsKey(key)) + .collect(Collectors.toSet()); + } - } finally { - cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lock); + if (!conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false)) { Review Comment: All the obsolete props are being merged together now. 
Earlier, they were managed separately by [removedProps](https://github.com/apache/iceberg/blob/53a1c8671dd1b9b93f4a857230008c812d79ddbf/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java#L217) and [hiveStatCheck](https://github.com/apache/iceberg/blob/53a1c8671dd1b9b93f4a857230008c812d79ddbf/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java#L232). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org