ZihanLi58 commented on a change in pull request #3172:
URL: https://github.com/apache/incubator-gobblin/pull/3172#discussion_r562325302
########## File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java ########## @@ -0,0 +1,828 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.iceberg.writer; + +import com.codahale.metrics.Timer; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; +import com.google.common.io.Closer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificData; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist; +import org.apache.gobblin.hive.AutoCloseableHiveLock; +import org.apache.gobblin.hive.HiveLock; +import org.apache.gobblin.hive.HiveMetastoreClientPool; +import org.apache.gobblin.hive.HivePartition; +import org.apache.gobblin.hive.HiveRegister; +import org.apache.gobblin.hive.HiveTable; +import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils; +import org.apache.gobblin.hive.spec.HiveSpec; +import org.apache.gobblin.iceberg.Utils.IcebergUtils; +import org.apache.gobblin.metadata.GobblinMetadataChangeEvent; +import org.apache.gobblin.metadata.OperationType; +import org.apache.gobblin.metrics.GobblinMetricsRegistry; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.GobblinEventBuilder; +import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry; +import org.apache.gobblin.metrics.kafka.SchemaRegistryException; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor; +import org.apache.gobblin.stream.RecordEnvelope; +import 
org.apache.gobblin.util.AutoReturnableObject; +import org.apache.gobblin.util.AvroUtils; +import org.apache.gobblin.util.ClustersNames; +import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.util.ParallelRunner; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.FindFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.HiveCatalogs; +import org.apache.thrift.TException; +import org.joda.time.DateTime; +import org.joda.time.format.PeriodFormatter; +import org.joda.time.format.PeriodFormatterBuilder; + + +/** + * This writer is used to calculate iceberg metadata from GMCE and register to iceberg + * iceberg metadata here includes: + * 1. Data files that contained by the table and the metrics of the data files + * 2. Properties of the table (origin table properties, data offset range, high watermark of the GMCE and schema created time) + * 3. 
Latest schema of the table + */ +@Slf4j +public class IcebergMetadataWriter implements MetadataWriter { + + public static final String USE_DATA_PATH_AS_TABLE_LOCATION = "use.data.path.as.table.location"; + public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata"; + public static final String GMCE_HIGH_WATERMARK_KEY = "gmce.high.watermark.%s"; + public static final String GMCE_LOW_WATERMARK_KEY = "gmce.low.watermark.%s"; + private final static String EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "gobblin.iceberg.dataset.expire.snapshots.lookBackTime"; + private final static String DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "3d"; + public static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist"; + public static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist"; + private static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier"; + private static final String CREATE_TABLE_TIME = "iceberg.create.table.time"; + private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time"; + private static final String ADDED_FILES_CACHE_EXPIRING_TIME = "added.files.cache.expiring.time"; + private static final int DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME = 1; + private static final String OFFSET_RANGE_KEY_PREFIX = "offset.range."; + private static final String OFFSET_RANGE_KEY_FORMAT = OFFSET_RANGE_KEY_PREFIX + "%s"; + private static final String ICEBERG_FILE_PATH_COLUMN = "file_path"; + private static final String DEFAULT_CREATION_TIME = "0"; + private static final String SNAPSHOT_EXPIRE_THREADS = "snapshot.expire.threads"; + private static final long DEFAULT_WATERMARK = -1L; + private final WhitelistBlacklist whiteistBlacklist; + private final Closer closer = Closer.create(); + private final Map<TableIdentifier, Long> tableCurrentWaterMarkMap; + //Used to store the relationship between table and the gmce topicPartition + private final Map<TableIdentifier, String> tableTopicpartitionMap; + private final MetricContext metricContext; + private EventSubmitter eventSubmitter; + @Getter + private final KafkaSchemaRegistry schemaRegistry; + private final Map<TableIdentifier, TableMetadata> tableMetadataMap; + @Setter + private HiveCatalog catalog; + protected final Configuration conf; + protected final ReadWriteLock readWriteLock; + private final HiveLock locks; + private final ParallelRunner parallelRunner; + private final boolean useDataLoacationAsTableLocation; + + public IcebergMetadataWriter(State state) throws IOException { + this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties()); + conf = HadoopUtils.getConfFromState(state); + initializeCatalog(); + tableTopicpartitionMap = new HashMap<>(); + tableMetadataMap = new HashMap<>(); + tableCurrentWaterMarkMap = new HashMap<>(); + List<Tag<?>> tags = Lists.newArrayList(); + String clusterIdentifier = ClustersNames.getInstance().getClusterName(); + tags.add(new Tag<>(CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier)); + metricContext = closer.register( + GobblinMetricsRegistry.getInstance().getMetricContext(state, IcebergMetadataWriter.class, tags)); + this.eventSubmitter = + new EventSubmitter.Builder(this.metricContext, IcebergMCEMetadataKeys.METRICS_NAMESPACE_ICEBERG_WRITER).build(); + this.whiteistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_WHITELIST, ""), + state.getProp(ICEBERG_REGISTRATION_BLACKLIST, "")); + // Use lock to make it safe when flush and write are called async + readWriteLock = new ReentrantReadWriteLock(); + this.locks = new 
HiveLock(state.getProperties()); + parallelRunner = closer.register(new ParallelRunner(state.getPropAsInt(SNAPSHOT_EXPIRE_THREADS, 20), + FileSystem.get(HadoopUtils.getConfFromState(state)))); + useDataLoacationAsTableLocation = state.getPropAsBoolean(USE_DATA_PATH_AS_TABLE_LOCATION, false); + } + + protected void initializeCatalog() { + catalog = HiveCatalogs.loadCatalog(conf); + } + + private org.apache.iceberg.Table getIcebergTable(TableIdentifier tid) throws NoSuchTableException { + TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()); + if (!tableMetadata.table.isPresent()) { + tableMetadata.table = Optional.of(catalog.loadTable(tid)); + } + return tableMetadata.table.get(); + } + + /** + * The method is used to get current watermark of the gmce topic partition for a table + * Make the watermark config name contains topicPartition in case we change the gmce topic name for some reason + */ + private Long getCurrentWaterMark(TableIdentifier tid, String topicPartition) { + if (tableCurrentWaterMarkMap.containsKey(tid)) { + return tableCurrentWaterMarkMap.get(tid); + } + org.apache.iceberg.Table icebergTable; + Long currentWatermark = DEFAULT_WATERMARK; + try { + icebergTable = getIcebergTable(tid); + } catch (NoSuchTableException e) { + return currentWatermark; + } + currentWatermark = + icebergTable.properties().containsKey(String.format(GMCE_HIGH_WATERMARK_KEY, topicPartition)) ? Long.parseLong( + icebergTable.properties().get(String.format(GMCE_HIGH_WATERMARK_KEY, topicPartition))) : DEFAULT_WATERMARK; + if (currentWatermark != DEFAULT_WATERMARK) { + // set the low watermark for current snapshot + tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()).lowWatermark = Optional.of(currentWatermark); + } + return currentWatermark; + } + + /** + *The write method will be responsible for process a given gmce and aggregate the metadata + * The logic of this function will be: + * 1. Check whether a table exists, if not then create the iceberg table + * 2. Compute schema from the gmce and update the cache for candidate schemas + * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or dropFile + * Note: this method only aggregate the metadata in cache without doing commitment. 
The actual commit will be done in flush method + */ + @SuppressWarnings("checkstyle:WhitespaceAround") + public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, + Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException { + System.out.println("$$here"); + TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName()); + TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()); + Table table; + try { + table = getIcebergTable(tid); + } catch (NoSuchTableException e) { + try { + if (gmce.getOperationType() == OperationType.drop_files) { + log.warn("Table {} does not exist, skip processing this drop_file event", tid.toString()); + return; + } + table = createTable(gmce, tableSpec); + tableMetadata.table = Optional.of(table); + } catch (Exception e1) { + log.error("skip processing {} for table {}.{} due to error when creating table", gmce.toString(), + tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName()); + log.debug(e1.toString()); + return; + } + } + computeCandidateSchema(gmce, tid, tableSpec); + tableMetadata.transaction = Optional.of(tableMetadata.transaction.or(table::newTransaction)); + tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime()); + switch (gmce.getOperationType()) { + case add_files: { + updateTableProperty(tableSpec, tid); + addFiles(gmce, newSpecsMap, table, tableMetadata); + if (gmce.getTopicPartitionOffsetsRange() != null) { + mergeOffsets(gmce, tid); + } + break; + } + case rewrite_files: { + updateTableProperty(tableSpec, tid); + rewriteFiles(gmce, newSpecsMap, oldSpecsMap, table, tableMetadata); + break; + } + case drop_files: { + dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid); + break; + } + default: { + log.error("unsupported operation {}", gmce.getOperationType().toString()); + return; + } + } + } + + private HashMap<String, List<Range>> getLastOffset(TableMetadata tableMetadata) { + HashMap<String, List<Range>> offsets = new HashMap<>(); + if (tableMetadata.lastProperties.isPresent()) { + for (Map.Entry<String, String> entry : tableMetadata.lastProperties.get().entrySet()) { + if (entry.getKey().startsWith(OFFSET_RANGE_KEY_PREFIX)) { + List<Range> ranges = Arrays.asList(entry.getValue().split(ConfigurationKeys.LIST_DELIMITER_KEY)) + .stream() + .map(s -> Range.openClosed( + Long.parseLong(Splitter.on(ConfigurationKeys.RANGE_DELIMITER_KEY).splitToList(s).get(0)), + Long.parseLong(Splitter.on(ConfigurationKeys.RANGE_DELIMITER_KEY).splitToList(s).get(1)))) + .collect(Collectors.toList()); + offsets.put(entry.getKey().substring(OFFSET_RANGE_KEY_PREFIX.length()), ranges); + } + } + } + return offsets; + } + + private void mergeOffsets(GobblinMetadataChangeEvent gmce, TableIdentifier tid) throws IOException { + TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()); + tableMetadata.dataOffsetRange = Optional.of(tableMetadata.dataOffsetRange.or(() -> getLastOffset(tableMetadata))); + Map<String, List<Range>> offsets = tableMetadata.dataOffsetRange.get(); + for (Map.Entry<String, String> entry : gmce.getTopicPartitionOffsetsRange().entrySet()) { + Range range = Range.openClosed( + Long.parseLong(Splitter.on(ConfigurationKeys.RANGE_DELIMITER_KEY).splitToList(entry.getValue()).get(0)), + Long.parseLong( + Splitter.on(ConfigurationKeys.RANGE_DELIMITER_KEY).splitToList(entry.getValue()).get(1))); + 
List<Range> existRanges = offsets.getOrDefault(entry.getKey(), new ArrayList<>()); + List<Range> newRanges = new ArrayList<>(); + for (Range r : existRanges) { + if (range.isConnected(r)) { + range = range.span(r); + } else { + newRanges.add(r); + } + } + newRanges.add(range); + Collections.sort(newRanges, new Comparator<Range>() { + @Override + public int compare(Range o1, Range o2) { + return o1.lowerEndpoint().compareTo(o2.lowerEndpoint()); + } + }); + offsets.put(entry.getKey(), newRanges); + } + } + + private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) { + org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(tableSpec.getTable()); + tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()).newProperties = + Optional.of(IcebergUtils.getTableProperties(table)); + } + + /** + * Compute the candidate schema from the gmce. + * If the schema source is schemaRegistry, we will use the schema creation time as the schema version to compute candidate schema and determine latest schema + * If the schema does not contain creation time, we will treat it the same as when schema source is event + * If the schema source is event, we will put it as default creation time, during flush, if we only have one candidate with default creation time, + * we'll use that to update schema. + * @param gmce + * @param tid + */ + private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdentifier tid, HiveSpec spec) + throws IOException { + Table table = getIcebergTable(tid); + TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()); + org.apache.hadoop.hive.metastore.api.Table hiveTable = HiveMetaStoreUtils.getTable(spec.getTable()); + tableMetadata.lastProperties = Optional.of(tableMetadata.lastProperties.or(() -> table.properties())); + Map<String, String> props = tableMetadata.lastProperties.get(); + tableMetadata.lastSchemaVersion = Optional.of( + tableMetadata.lastSchemaVersion.or(() -> props.getOrDefault(SCHEMA_CREATION_TIME_KEY, DEFAULT_CREATION_TIME))); + String lastSchemaVersion = tableMetadata.lastSchemaVersion.get(); + tableMetadata.candidateSchemas = Optional.of(tableMetadata.candidateSchemas.or(() -> CacheBuilder.newBuilder() + .expireAfterAccess(conf.getInt(GobblinMCEWriter.CACHE_EXPIRING_TIME, + GobblinMCEWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS) + .build())); + Cache<String, Schema> candidate = tableMetadata.candidateSchemas.get(); + try { + switch (gmce.getSchemaSource()) { + case SCHEMAREGISTRY: { + org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(gmce.getTableSchema()); + String createdOn = AvroUtils.getSchemaCreationTime(schema); + if (createdOn == null) { + candidate.put(DEFAULT_CREATION_TIME, + IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema); + } else if (!createdOn.equals(lastSchemaVersion)) { + candidate.put(createdOn, IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema); + } + break; + } + case EVENT: { + candidate.put(DEFAULT_CREATION_TIME, + IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema); + break; + } + case NONE: { + log.debug("Schema source set to be none, will ignore the schema"); + break; + } + default: { + throw new IOException(String.format("unsupported schema source %s", gmce.getSchemaSource())); + } + } + } catch (Exception e) { + log.error("Cannot get candidate schema from event due to", e); + } + } + + protected Table createTable(GobblinMetadataChangeEvent 
gmce, HiveSpec spec) { + String schema = gmce.getTableSchema(); + org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(spec.getTable()); + IcebergUtils.IcebergDataAndPartitionSchema schemas = IcebergUtils.getIcebergSchema(schema, table); + TableIdentifier tid = TableIdentifier.of(table.getDbName(), table.getTableName()); + Schema tableSchema = schemas.tableSchema; + Preconditions.checkState(tableSchema != null, "Table schema cannot be null when creating a table"); + PartitionSpec partitionSpec = IcebergUtils.getPartitionSpec(tableSchema, schemas.partitionSchema); + Table icebergTable = null; + String tableLocation = null; + if (useDataLoacationAsTableLocation) { + tableLocation = gmce.getDatasetIdentifier().getNativeName() + TABLE_LOCATION_SUFFIX; + } + try (Timer.Context context = metricContext.timer(CREATE_TABLE_TIME).time()) { + icebergTable = + catalog.createTable(tid, tableSchema, partitionSpec, tableLocation, IcebergUtils.getTableProperties(table)); + log.info("Created table {}, schema: {} partition spec: {}", tid, tableSchema, partitionSpec); + } catch (AlreadyExistsException e) { + log.warn("table {} already exist, there may be some other process try to create table concurrently", tid); + } + return icebergTable; + } + + protected void rewriteFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, + Map<String, Collection<HiveSpec>> oldSpecsMap, Table table, TableMetadata tableMetadata) throws IOException { + PartitionSpec partitionSpec = table.spec(); + Transaction transaction = tableMetadata.transaction.get(); + Set<DataFile> newDataFiles = new HashSet<>(); + getIcebergDataFilesToBeAdded(gmce.getNewFiles(), partitionSpec, newSpecsMap, + IcebergUtils.getSchemaIdMap(getSchemaWithOriginId(gmce), table.schema())).stream() + .filter(dataFile -> tableMetadata.addedFiles.getIfPresent(dataFile.path()) == null) + .forEach(dataFile -> { + newDataFiles.add(dataFile); + tableMetadata.addedFiles.put(dataFile.path(), ""); + }); + Set<DataFile> oldDataFiles = getIcebergDataFilesToBeDeleted(gmce, table, newSpecsMap, oldSpecsMap, partitionSpec); + if (oldDataFiles.isEmpty() && !newDataFiles.isEmpty()) { + //We randomly check whether one of the new data files already exists in the db to avoid duplication of re-write events + DataFile file = newDataFiles.iterator().next(); + Expression exp = Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, (String) file.path()); + if (FindFiles.in(table).withMetadataMatching(exp).collect().iterator().hasNext()) { + //This means this re-write event is duplicated with the one we already handled, so directly return + return; + } + //This is the case we register daily partition to additional database + //So we directly call addFiles interface to add file into the table. + tableMetadata.appendFiles = Optional.of(tableMetadata.appendFiles.or(transaction::newAppend)); + AppendFiles appendFiles = tableMetadata.appendFiles.get(); + newDataFiles.forEach(appendFiles::appendFile); + return; + } + transaction.newRewrite().rewriteFiles(oldDataFiles, newDataFiles).commit(); + } + + /** + * Given the GMCE, get the iceberg schema with the origin ID specified by data pipeline which + * is corresponding to the file metrics index. 
+ * @param gmce GMCE emitted by data pipeline + * @return iceberg schema with the origin ID + */ + private Schema getSchemaWithOriginId(GobblinMetadataChangeEvent gmce) { + Schema schemaWithOriginId = null; + if (gmce.getAvroSchemaWithIcebergSchemaID() != null) { + org.apache.iceberg.shaded.org.apache.avro.Schema avroSchema = + new org.apache.iceberg.shaded.org.apache.avro.Schema.Parser().parse(gmce.getAvroSchemaWithIcebergSchemaID()); + schemaWithOriginId = AvroSchemaUtil.toIceberg(avroSchema); + } + return schemaWithOriginId; + } + + protected void dropFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> oldSpecsMap, Table table, + TableMetadata tableMetadata, TableIdentifier tid) throws IOException { + PartitionSpec partitionSpec = table.spec(); + Transaction transaction = tableMetadata.transaction.get(); + Set<DataFile> oldDataFiles = + getIcebergDataFilesToBeDeleted(gmce, table, new HashMap<>(), oldSpecsMap, partitionSpec); + tableMetadata.deleteFiles = Optional.of(tableMetadata.deleteFiles.or(transaction::newDelete)); + DeleteFiles deleteFiles = tableMetadata.deleteFiles.get(); + oldDataFiles.stream().forEach(deleteFiles::deleteFile); + parallelRunner.submitCallable(new Callable<Void>() { + @Override + public Void call() throws Exception { + try { + long olderThan = getExpireSnapshotTime(); + long start = System.currentTimeMillis(); + ExpireSnapshots expireSnapshots = table.expireSnapshots(); + final Table tmpTable = table; + expireSnapshots.deleteWith(new Consumer<String>() { + @Override + public void accept(String file) { + if (file.startsWith(tmpTable.location())) { + tmpTable.io().deleteFile(file); + } + } + }).expireOlderThan(olderThan).commit(); + //TODO: emit these metrics to Ingraphs, in addition to metrics for publishing new snapshots and other Iceberg metadata operations. 
+ log.info("Spent {} ms to expire snapshots older than {} ({}) in table {}", System.currentTimeMillis() - start, + new DateTime(olderThan).toString(), olderThan, tid.toString()); + } catch (Exception e) { + log.error(String.format("Fail to expire snapshots for table %s due to exception ", tid.toString()), e); + } + return null; + } + }, tid.toString()); + } + + private long getExpireSnapshotTime() { + PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendYears() + .appendSuffix("y") + .appendMonths() + .appendSuffix("M") + .appendDays() + .appendSuffix("d") + .appendHours() + .appendSuffix("h") + .appendMinutes() + .appendSuffix("m") + .toFormatter(); + return DateTime.now() + .minus(periodFormatter.parsePeriod( + conf.get(EXPIRE_SNAPSHOTS_LOOKBACK_TIME, DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME))) + .getMillis(); + } + + protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, Table table, + TableMetadata tableMetadata) throws IOException { + PartitionSpec partitionSpec = table.spec(); + Transaction transaction = tableMetadata.transaction.get(); + tableMetadata.appendFiles = Optional.of(tableMetadata.appendFiles.or(transaction::newAppend)); + AppendFiles appendFiles = tableMetadata.appendFiles.get(); + getIcebergDataFilesToBeAdded(gmce.getNewFiles(), partitionSpec, newSpecsMap, + IcebergUtils.getSchemaIdMap(getSchemaWithOriginId(gmce), table.schema())).stream() + .filter(dataFile -> tableMetadata.addedFiles.getIfPresent(dataFile.path()) == null) + .forEach(dataFile -> { + appendFiles.appendFile(dataFile); + tableMetadata.addedFiles.put(dataFile.path(), ""); + }); + } + + /** + * Method to get dataFiles without metrics information + * This method is used to get files to be deleted from iceberg + * If oldFilePrefixes is specified in gmce, this method will use those prefixes to find old file in iceberg, + * or the method will callmethod {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path + */ + private Set<DataFile> getIcebergDataFilesToBeDeleted(GobblinMetadataChangeEvent gmce, Table table, + Map<String, Collection<HiveSpec>> newSpecsMap, Map<String, Collection<HiveSpec>> oldSpecsMap, + PartitionSpec partitionSpec) throws IOException { + Set<DataFile> oldDataFiles = new HashSet<>(); + if (gmce.getOldFilePrefixes() != null) { + Expression exp = Expressions.alwaysFalse(); + for (String prefix : gmce.getOldFilePrefixes()) { + //Use both full path and raw path to delete old files + exp = Expressions.or(exp, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, prefix)); + String rawPathPrefix = new Path(prefix).toUri().getRawPath(); + exp = Expressions.or(exp, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, rawPathPrefix)); + } + long start = System.currentTimeMillis(); + oldDataFiles.addAll(Sets.newHashSet(FindFiles.in(table).withMetadataMatching(exp).collect().iterator())); + //Use INFO level log here to get better estimate. + //This shouldn't overwhelm the log since we receive less than 3 rewrite_file gmces for one table in one day + log.info("Spent {}ms to query all old files in iceberg.", System.currentTimeMillis() - start); + } else { + for (String file : gmce.getOldFiles()) { + String specPath = new Path(file).getParent().toString(); + // For the use case of recompaction, the old path may contains /daily path, in this case, we find the spec from newSpecsMap + StructLike partition = getIcebergPartition( + oldSpecsMap.containsKey(specPath) ? 
oldSpecsMap.get(specPath) : newSpecsMap.get(specPath), file, + partitionSpec); + oldDataFiles.add(IcebergUtils.getIcebergDataFileWithoutMetric(file, partitionSpec, partition)); + } + } + return oldDataFiles; + } + + /** + * Method to get dataFiles with metrics information + * This method is used to get files to be added to iceberg + * This method will call method {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path + */ + private Set<DataFile> getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files, + PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) + throws IOException { + Set<DataFile> dataFiles = new HashSet<>(); + for (org.apache.gobblin.metadata.DataFile file : files) { + try { + StructLike partition = getIcebergPartition(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()), + file.getFilePath(), partitionSpec); + dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, partitionSpec, partition, conf, schemaIdMap)); + } catch (Exception e) { + log.warn("Cannot get DataFile for {} dur to {}", file.getFilePath(), e); + } + } + return dataFiles; + } + + private StructLike getIcebergPartition(Collection<HiveSpec> specs, String filePath, PartitionSpec partitionSpec) + throws IOException { + if (specs == null || specs.isEmpty()) { + throw new IOException("Cannot get hive spec for " + filePath); + } + HivePartition hivePartition = specs.iterator().next().getPartition().orNull(); + StructLike partition = hivePartition == null ? null + : IcebergUtils.getPartition(partitionSpec.partitionType(), hivePartition.getValues()); + return partition; + } + + /** + * For flush of each table, we do the following logic: + * 1. Commit the appendFiles if it exist + * 2. Update the new table property: high watermark of GMCE, data offset range, schema versions + * 3. Update the schema + * 4. Commit the transaction + * 5. 
Clear cache + * @param dbName + * @param tableName + */ + @Override + public void flush(String dbName, String tableName) throws IOException { + Lock writeLock = readWriteLock.writeLock(); + writeLock.lock(); + try { + TableIdentifier tid = TableIdentifier.of(dbName, tableName); + TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new TableMetadata()); + if (tableMetadata.transaction.isPresent()) { + Transaction transaction = tableMetadata.transaction.get(); + if (tableMetadata.appendFiles.isPresent()) { + tableMetadata.appendFiles.get().commit(); + } + if (tableMetadata.deleteFiles.isPresent()) { + tableMetadata.deleteFiles.get().commit(); + } + Map<String, String> props = tableMetadata.newProperties.or( + Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties()))); + //Set high waterMark + Long highWatermark = tableCurrentWaterMarkMap.get(tid); + props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicpartitionMap.get(tid)), highWatermark.toString()); + //Set low waterMark + props.put(String.format(GMCE_LOW_WATERMARK_KEY, tableTopicpartitionMap.get(tid)), + tableMetadata.lowWatermark.get().toString()); + //Set whether to delete metadata files after commit + props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, Boolean.toString( + conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT))); + props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString( + conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, + TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT))); + //Set data offset range + boolean containOffsetRange = setDatasetOffsetRange(tableMetadata, props); + String topicName = tableName; + if (containOffsetRange) { + String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next(); + //In case the topic name is not the table name or the topic name contains '-' + topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-')); + } + //Update schema + updateSchema(tableMetadata, props, topicName); + //Update properties + UpdateProperties updateProperties = transaction.updateProperties(); + props.forEach(updateProperties::set); + updateProperties.commit(); + try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName)) { + transaction.commitTransaction(); + } + Snapshot snapshot = tableMetadata.table.get().currentSnapshot(); + Map<String, String> currentProps = tableMetadata.table.get().properties(); + submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark); + //Reset the table metadata for next accumulation period + tableMetadata.reset(currentProps, highWatermark); + log.info(String.format("Finish flushing for table %s", tid.toString())); + } + } finally { + writeLock.unlock(); + } + } + + private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata tableMetadata, String dbName, + String tableName, Map<String, String> props, Long highWaterMark) { + GobblinEventBuilder gobblinTrackingEvent = + new GobblinEventBuilder(IcebergMCEMetadataKeys.ICEBERG_COMMIT_EVENT_NAME); + long currentSnapshotID = snapshot.snapshotId(); + long endToEndLag = System.currentTimeMillis() - tableMetadata.lowestGMCEEmittedTime; + TableIdentifier tid = TableIdentifier.of(dbName, tableName); + String gmceTopicPartition = tableTopicpartitionMap.get(tid); + + //Add information to automatically trigger repair jon when data loss happen + 
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_NAME, gmceTopicPartition.split("-")[0]); + gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, gmceTopicPartition.split("-")[1]); + gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK, highWaterMark.toString()); + gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK, + tableMetadata.lowWatermark.get().toString()); + + //Add information for lag monitoring + gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.LAG_KEY_NAME, Long.toString(endToEndLag)); + gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.SNAPSHOT_KEY_NAME, Long.toString(currentSnapshotID)); + gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.MANIFEST_LOCATION, snapshot.manifestListLocation()); + gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.SNAPSHOT_INFORMATION_KEY_NAME, + Joiner.on(",").withKeyValueSeparator("=").join(snapshot.summary())); + gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.TABLE_KEY_NAME, tableName); + gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATABASE_KEY_NAME, dbName); + gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DATASET_HDFS_PATH, tableMetadata.datasetName); + for (Map.Entry<String, String> entry : props.entrySet()) { + if (entry.getKey().startsWith(OFFSET_RANGE_KEY_PREFIX)) { + gobblinTrackingEvent.addMetadata(entry.getKey(), entry.getValue()); + } + } + eventSubmitter.submit(gobblinTrackingEvent); + } + + private boolean setDatasetOffsetRange(TableMetadata tableMetadata, Map<String, String> props) { + if (tableMetadata.dataOffsetRange.isPresent() && !tableMetadata.dataOffsetRange.get().isEmpty()) { + for (Map.Entry<String, List<Range>> offsets : tableMetadata.dataOffsetRange.get().entrySet()) { + List<Range> ranges = offsets.getValue(); + String rangeString = ranges.stream() + .map(r -> Joiner.on(ConfigurationKeys.RANGE_DELIMITER_KEY).join(r.lowerEndpoint(), r.upperEndpoint())) + .collect(Collectors.joining(ConfigurationKeys.LIST_DELIMITER_KEY)); + props.put(String.format(OFFSET_RANGE_KEY_FORMAT, offsets.getKey()), rangeString); + } + return true; + } + return false; + } + + private void updateSchema(TableMetadata tableMetadata, Map<String, String> props, String topicName) { + //Set default schema versions + props.put(SCHEMA_CREATION_TIME_KEY, tableMetadata.lastSchemaVersion.or(DEFAULT_CREATION_TIME)); + // Update Schema + try { + if (tableMetadata.candidateSchemas.isPresent() && tableMetadata.candidateSchemas.get().size() > 0) { + Cache candidates = tableMetadata.candidateSchemas.get(); + //Only have default schema, so either we calculate schema from event or the schema does not have creation time, directly update it + if (candidates.size() == 1 && candidates.getIfPresent(DEFAULT_CREATION_TIME) != null) { + updateSchemaHelper(DEFAULT_CREATION_TIME, (Schema) candidates.getIfPresent(DEFAULT_CREATION_TIME), props, + tableMetadata.table.get()); + } else { + //update schema if candidates contains the schema that has the same creation time with the latest schema + org.apache.avro.Schema latestSchema = + (org.apache.avro.Schema) schemaRegistry.getLatestSchemaByTopic(topicName); + String creationTime = AvroUtils.getSchemaCreationTime(latestSchema); + if (creationTime == null) { + log.warn( + "Schema from schema registry does not contain creation time, check config for schema registry class"); + } else if (candidates.getIfPresent(creationTime) != null) { + updateSchemaHelper(creationTime, (Schema) 
candidates.getIfPresent(creationTime), props, + tableMetadata.table.get()); + } + } + } + } catch (SchemaRegistryException e) { + log.error("Cannot get schema form schema registry, will not update this schema", e); + } + } + + private void updateSchemaHelper(String schemaCreationTime, Schema schema, Map<String, String> props, Table table) { + try { + props.put(SCHEMA_CREATION_TIME_KEY, schemaCreationTime); + table.updateSchema().unionByNameWith(schema).commit(); + } catch (Exception e) {

Review comment:
   This is mainly because the API definition does not specify which type of exception it will throw, and I want to make sure that no exception thrown by the schema update fails the job. Most of the time the failure happens because Iceberg does not support that particular evolution, which should not affect the rest of the job. I will update the code so that the schema creation time key is only set after the schema change succeeds; that way, even if the schema update fails, the next flush will retry it.
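   A minimal sketch of the reordering described above (not the actual patch): the schema creation time is recorded only after the Iceberg schema commit succeeds, so a failed or unsupported evolution leaves the old value in props and the same update is retried at the next flush. It reuses names already present in this file (SCHEMA_CREATION_TIME_KEY and the Lombok-generated log field), so it is not standalone outside the class.

   private void updateSchemaHelper(String schemaCreationTime, Schema schema, Map<String, String> props, Table table) {
     try {
       table.updateSchema().unionByNameWith(schema).commit();
       // Record the new schema version only after the commit above succeeded.
       props.put(SCHEMA_CREATION_TIME_KEY, schemaCreationTime);
     } catch (Exception e) {
       // Swallow the failure so an unsupported evolution does not fail the job; because
       // SCHEMA_CREATION_TIME_KEY was not advanced, the same update is retried at the next flush.
       log.error(String.format("Cannot update schema to creation time %s", schemaCreationTime), e);
     }
   }

   Catching the broad Exception here follows the reasoning in the comment: the schema update and commit path can surface unchecked exceptions (for example when an evolution is not supported), and none of them should abort the metadata registration for the rest of the tables.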
