[
https://issues.apache.org/jira/browse/GOBBLIN-1335?focusedWorklogId=539425&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-539425
]
ASF GitHub Bot logged work on GOBBLIN-1335:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Jan/21 01:53
Start Date: 22/Jan/21 01:53
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on a change in pull request #3172:
URL: https://github.com/apache/incubator-gobblin/pull/3172#discussion_r562317196
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -0,0 +1,828 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
+import org.apache.gobblin.hive.AutoCloseableHiveLock;
+import org.apache.gobblin.hive.HiveLock;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
+import org.apache.gobblin.hive.HivePartition;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.iceberg.Utils.IcebergUtils;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metrics.GobblinMetricsRegistry;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import
org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.util.AutoReturnableObject;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.FindFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.thrift.TException;
+import org.joda.time.DateTime;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+
+
+/**
+ * This writer calculates iceberg metadata from GMCE events and registers it to iceberg.
+ * The iceberg metadata here includes:
+ * 1. Data files contained by the table and the metrics of those data files
+ * 2. Properties of the table (original table properties, data offset range,
+ *    high watermark of the GMCE, and schema creation time)
+ * 3. Latest schema of the table
+ */
+@Slf4j
+public class IcebergMetadataWriter implements MetadataWriter {
+
  // When true, the iceberg table location is derived from the data path
  // (native dataset name + TABLE_LOCATION_SUFFIX) instead of the catalog default.
  public static final String USE_DATA_PATH_AS_TABLE_LOCATION = "use.data.path.as.table.location";
  public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata";
  // Per-topic-partition watermark keys persisted in the iceberg table properties; %s is the gmce topic partition.
  public static final String GMCE_HIGH_WATERMARK_KEY = "gmce.high.watermark.%s";
  public static final String GMCE_LOW_WATERMARK_KEY = "gmce.low.watermark.%s";
  private final static String EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "gobblin.iceberg.dataset.expire.snapshots.lookBackTime";
  private final static String DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "3d";
  // Whitelist/blacklist configs controlling which tables get iceberg registration.
  public static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
  public static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
  private static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
  // Metric name used to time table creation (see createTable).
  private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
  private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time";
  private static final String ADDED_FILES_CACHE_EXPIRING_TIME = "added.files.cache.expiring.time";
  private static final int DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME = 1;
  // Table-property prefix under which per-partition data offset ranges are stored.
  private static final String OFFSET_RANGE_KEY_PREFIX = "offset.range.";
  private static final String OFFSET_RANGE_KEY_FORMAT = OFFSET_RANGE_KEY_PREFIX + "%s";
  private static final String ICEBERG_FILE_PATH_COLUMN = "file_path";
  private static final String DEFAULT_CREATION_TIME = "0";
  private static final String SNAPSHOT_EXPIRE_THREADS = "snapshot.expire.threads";
  // Sentinel meaning "no watermark recorded yet".
  private static final long DEFAULT_WATERMARK = -1L;
  // NOTE(review): field name contains a typo ("whiteist"); kept as-is for compatibility within this class.
  private final WhitelistBlacklist whiteistBlacklist;
  private final Closer closer = Closer.create();
  // Current GMCE high watermark per table, lazily loaded from table properties (see getCurrentWaterMark).
  private final Map<TableIdentifier, Long> tableCurrentWaterMarkMap;
  //Used to store the relationship between table and the gmce topicPartition
  private final Map<TableIdentifier, String> tableTopicpartitionMap;
  private final MetricContext metricContext;
  private EventSubmitter eventSubmitter;
  @Getter
  private final KafkaSchemaRegistry schemaRegistry;
  // In-memory per-table aggregation state; accumulated by write() and committed on flush.
  private final Map<TableIdentifier, TableMetadata> tableMetadataMap;
  @Setter
  private HiveCatalog catalog;
  protected final Configuration conf;
  // Guards concurrency between async write and flush calls (see constructor comment).
  protected final ReadWriteLock readWriteLock;
  private final HiveLock locks;
  private final ParallelRunner parallelRunner;
  // NOTE(review): name contains a typo ("Loacation"); kept as-is for compatibility within this class.
  private final boolean useDataLoacationAsTableLocation;
+
+ public IcebergMetadataWriter(State state) throws IOException {
+ this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
+ conf = HadoopUtils.getConfFromState(state);
+ initializeCatalog();
+ tableTopicpartitionMap = new HashMap<>();
+ tableMetadataMap = new HashMap<>();
+ tableCurrentWaterMarkMap = new HashMap<>();
+ List<Tag<?>> tags = Lists.newArrayList();
+ String clusterIdentifier = ClustersNames.getInstance().getClusterName();
+ tags.add(new Tag<>(CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
+ metricContext = closer.register(
+ GobblinMetricsRegistry.getInstance().getMetricContext(state,
IcebergMetadataWriter.class, tags));
+ this.eventSubmitter =
+ new EventSubmitter.Builder(this.metricContext,
IcebergMCEMetadataKeys.METRICS_NAMESPACE_ICEBERG_WRITER).build();
+ this.whiteistBlacklist = new
WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_WHITELIST, ""),
+ state.getProp(ICEBERG_REGISTRATION_BLACKLIST, ""));
+ // Use lock to make it safe when flush and write are called async
+ readWriteLock = new ReentrantReadWriteLock();
+ this.locks = new HiveLock(state.getProperties());
+ parallelRunner = closer.register(new
ParallelRunner(state.getPropAsInt(SNAPSHOT_EXPIRE_THREADS, 20),
+ FileSystem.get(HadoopUtils.getConfFromState(state))));
+ useDataLoacationAsTableLocation =
state.getPropAsBoolean(USE_DATA_PATH_AS_TABLE_LOCATION, false);
+ }
+
+ protected void initializeCatalog() {
+ catalog = HiveCatalogs.loadCatalog(conf);
+ }
+
+ private org.apache.iceberg.Table getIcebergTable(TableIdentifier tid) throws
NoSuchTableException {
+ TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
+ if (!tableMetadata.table.isPresent()) {
+ tableMetadata.table = Optional.of(catalog.loadTable(tid));
+ }
+ return tableMetadata.table.get();
+ }
+
+ /**
+ * The method is used to get current watermark of the gmce topic partition
for a table
+ * Make the watermark config name contains topicPartition in case we change
the gmce topic name for some reason
+ */
+ private Long getCurrentWaterMark(TableIdentifier tid, String topicPartition)
{
+ if (tableCurrentWaterMarkMap.containsKey(tid)) {
+ return tableCurrentWaterMarkMap.get(tid);
+ }
+ org.apache.iceberg.Table icebergTable;
+ Long currentWatermark = DEFAULT_WATERMARK;
+ try {
+ icebergTable = getIcebergTable(tid);
+ } catch (NoSuchTableException e) {
+ return currentWatermark;
+ }
+ currentWatermark =
+
icebergTable.properties().containsKey(String.format(GMCE_HIGH_WATERMARK_KEY,
topicPartition)) ? Long.parseLong(
+
icebergTable.properties().get(String.format(GMCE_HIGH_WATERMARK_KEY,
topicPartition))) : DEFAULT_WATERMARK;
+ if (currentWatermark != DEFAULT_WATERMARK) {
+ // set the low watermark for current snapshot
+ tableMetadataMap.computeIfAbsent(tid, t -> new
TableMetadata()).lowWatermark = Optional.of(currentWatermark);
+ }
+ return currentWatermark;
+ }
+
+ /**
+ *The write method will be responsible for process a given gmce and
aggregate the metadata
+ * The logic of this function will be:
+ * 1. Check whether a table exists, if not then create the iceberg table
+ * 2. Compute schema from the gmce and update the cache for candidate schemas
+ * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or
dropFile
+ * Note: this method only aggregate the metadata in cache without doing
commitment. The actual commit will be done in flush method
+ */
+ @SuppressWarnings("checkstyle:WhitespaceAround")
+ public void write(GobblinMetadataChangeEvent gmce, Map<String,
Collection<HiveSpec>> newSpecsMap,
+ Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec)
throws IOException {
+ System.out.println("$$here");
+ TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName());
+ TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
+ Table table;
+ try {
+ table = getIcebergTable(tid);
+ } catch (NoSuchTableException e) {
+ try {
+ if (gmce.getOperationType() == OperationType.drop_files) {
+ log.warn("Table {} does not exist, skip processing this drop_file
event", tid.toString());
+ return;
+ }
+ table = createTable(gmce, tableSpec);
+ tableMetadata.table = Optional.of(table);
+ } catch (Exception e1) {
+ log.error("skip processing {} for table {}.{} due to error when
creating table", gmce.toString(),
+ tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName());
+ log.debug(e1.toString());
+ return;
+ }
+ }
+ computeCandidateSchema(gmce, tid, tableSpec);
+ tableMetadata.transaction =
Optional.of(tableMetadata.transaction.or(table::newTransaction));
+ tableMetadata.lowestGMCEEmittedTime =
Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
+ switch (gmce.getOperationType()) {
+ case add_files: {
+ updateTableProperty(tableSpec, tid);
+ addFiles(gmce, newSpecsMap, table, tableMetadata);
+ if (gmce.getTopicPartitionOffsetsRange() != null) {
+ mergeOffsets(gmce, tid);
+ }
+ break;
+ }
+ case rewrite_files: {
+ updateTableProperty(tableSpec, tid);
+ rewriteFiles(gmce, newSpecsMap, oldSpecsMap, table, tableMetadata);
+ break;
+ }
+ case drop_files: {
+ dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
+ break;
+ }
+ default: {
+ log.error("unsupported operation {}",
gmce.getOperationType().toString());
+ return;
+ }
+ }
+ }
+
+ private HashMap<String, List<Range>> getLastOffset(TableMetadata
tableMetadata) {
+ HashMap<String, List<Range>> offsets = new HashMap<>();
+ if (tableMetadata.lastProperties.isPresent()) {
+ for (Map.Entry<String, String> entry :
tableMetadata.lastProperties.get().entrySet()) {
+ if (entry.getKey().startsWith(OFFSET_RANGE_KEY_PREFIX)) {
+ List<Range> ranges =
Arrays.asList(entry.getValue().split(ConfigurationKeys.LIST_DELIMITER_KEY))
+ .stream()
+ .map(s -> Range.openClosed(
+
Long.parseLong(Splitter.on(ConfigurationKeys.RANGE_DELIMITER_KEY).splitToList(s).get(0)),
+
Long.parseLong(Splitter.on(ConfigurationKeys.RANGE_DELIMITER_KEY).splitToList(s).get(1))))
+ .collect(Collectors.toList());
+
offsets.put(entry.getKey().substring(OFFSET_RANGE_KEY_PREFIX.length()), ranges);
+ }
+ }
+ }
+ return offsets;
+ }
+
+ private void mergeOffsets(GobblinMetadataChangeEvent gmce, TableIdentifier
tid) throws IOException {
+ TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
+ tableMetadata.dataOffsetRange =
Optional.of(tableMetadata.dataOffsetRange.or(() ->
getLastOffset(tableMetadata)));
+ Map<String, List<Range>> offsets = tableMetadata.dataOffsetRange.get();
+ for (Map.Entry<String, String> entry :
gmce.getTopicPartitionOffsetsRange().entrySet()) {
+ Range range = Range.openClosed(
+
Long.parseLong(Splitter.on(ConfigurationKeys.RANGE_DELIMITER_KEY).splitToList(entry.getValue()).get(0)),
+ Long.parseLong(
+
Splitter.on(ConfigurationKeys.RANGE_DELIMITER_KEY).splitToList(entry.getValue()).get(1)));
+ List<Range> existRanges = offsets.getOrDefault(entry.getKey(), new
ArrayList<>());
+ List<Range> newRanges = new ArrayList<>();
+ for (Range r : existRanges) {
+ if (range.isConnected(r)) {
+ range = range.span(r);
+ } else {
+ newRanges.add(r);
+ }
+ }
+ newRanges.add(range);
+ Collections.sort(newRanges, new Comparator<Range>() {
+ @Override
+ public int compare(Range o1, Range o2) {
+ return o1.lowerEndpoint().compareTo(o2.lowerEndpoint());
+ }
+ });
+ offsets.put(entry.getKey(), newRanges);
+ }
+ }
+
+ private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) {
+ org.apache.hadoop.hive.metastore.api.Table table =
HiveMetaStoreUtils.getTable(tableSpec.getTable());
+ tableMetadataMap.computeIfAbsent(tid, t -> new
TableMetadata()).newProperties =
+ Optional.of(IcebergUtils.getTableProperties(table));
+ }
+
+ /**
+ * Compute the candidate schema from the gmce.
+ * If the schema source is schemaRegistry, we will use the schema creation
time as the schema version to compute candidate schema and determine latest
schema
+ * If the schema does not contain creation time, we will treat it the same
as when schema source is event
+ * If the schema source is event, we will put it as default creation time,
during flush, if we only have one candidate with default creation time,
+ * we'll use that to update schema.
+ * @param gmce
+ * @param tid
+ */
+ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce,
TableIdentifier tid, HiveSpec spec)
+ throws IOException {
+ Table table = getIcebergTable(tid);
+ TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
+ org.apache.hadoop.hive.metastore.api.Table hiveTable =
HiveMetaStoreUtils.getTable(spec.getTable());
+ tableMetadata.lastProperties =
Optional.of(tableMetadata.lastProperties.or(() -> table.properties()));
+ Map<String, String> props = tableMetadata.lastProperties.get();
+ tableMetadata.lastSchemaVersion = Optional.of(
+ tableMetadata.lastSchemaVersion.or(() ->
props.getOrDefault(SCHEMA_CREATION_TIME_KEY, DEFAULT_CREATION_TIME)));
+ String lastSchemaVersion = tableMetadata.lastSchemaVersion.get();
+ tableMetadata.candidateSchemas =
Optional.of(tableMetadata.candidateSchemas.or(() -> CacheBuilder.newBuilder()
+ .expireAfterAccess(conf.getInt(GobblinMCEWriter.CACHE_EXPIRING_TIME,
+ GobblinMCEWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
+ .build()));
+ Cache<String, Schema> candidate = tableMetadata.candidateSchemas.get();
+ try {
+ switch (gmce.getSchemaSource()) {
+ case SCHEMAREGISTRY: {
+ org.apache.avro.Schema schema = new
org.apache.avro.Schema.Parser().parse(gmce.getTableSchema());
+ String createdOn = AvroUtils.getSchemaCreationTime(schema);
+ if (createdOn == null) {
+ candidate.put(DEFAULT_CREATION_TIME,
+ IcebergUtils.getIcebergSchema(gmce.getTableSchema(),
hiveTable).tableSchema);
+ } else if (!createdOn.equals(lastSchemaVersion)) {
+ candidate.put(createdOn,
IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
+ }
+ break;
+ }
+ case EVENT: {
+ candidate.put(DEFAULT_CREATION_TIME,
+ IcebergUtils.getIcebergSchema(gmce.getTableSchema(),
hiveTable).tableSchema);
+ break;
+ }
+ case NONE: {
+ log.debug("Schema source set to be none, will ignore the schema");
+ break;
+ }
+ default: {
+ throw new IOException(String.format("unsupported schema source %s",
gmce.getSchemaSource()));
+ }
+ }
+ } catch (Exception e) {
+ log.error("Cannot get candidate schema from event due to", e);
+ }
+ }
+
+ protected Table createTable(GobblinMetadataChangeEvent gmce, HiveSpec spec) {
+ String schema = gmce.getTableSchema();
+ org.apache.hadoop.hive.metastore.api.Table table =
HiveMetaStoreUtils.getTable(spec.getTable());
+ IcebergUtils.IcebergDataAndPartitionSchema schemas =
IcebergUtils.getIcebergSchema(schema, table);
+ TableIdentifier tid = TableIdentifier.of(table.getDbName(),
table.getTableName());
+ Schema tableSchema = schemas.tableSchema;
+ Preconditions.checkState(tableSchema != null, "Table schema cannot be null
when creating a table");
+ PartitionSpec partitionSpec = IcebergUtils.getPartitionSpec(tableSchema,
schemas.partitionSchema);
+ Table icebergTable = null;
+ String tableLocation = null;
+ if (useDataLoacationAsTableLocation) {
+ tableLocation = gmce.getDatasetIdentifier().getNativeName() +
TABLE_LOCATION_SUFFIX;
+ }
+ try (Timer.Context context =
metricContext.timer(CREATE_TABLE_TIME).time()) {
+ icebergTable =
+ catalog.createTable(tid, tableSchema, partitionSpec, tableLocation,
IcebergUtils.getTableProperties(table));
+ log.info("Created table {}, schema: {} partition spec: {}", tid,
tableSchema, partitionSpec);
+ } catch (AlreadyExistsException e) {
+ log.warn("table {} already exist, there may be some other process try to
create table concurrently", tid);
+ }
+ return icebergTable;
+ }
+
+ protected void rewriteFiles(GobblinMetadataChangeEvent gmce, Map<String,
Collection<HiveSpec>> newSpecsMap,
+ Map<String, Collection<HiveSpec>> oldSpecsMap, Table table,
TableMetadata tableMetadata) throws IOException {
+ PartitionSpec partitionSpec = table.spec();
+ Transaction transaction = tableMetadata.transaction.get();
Review comment:
I add one line to force the transaction to be set
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 539425)
Time Spent: 3h 10m (was: 3h)
> Publish GMCE(GobblinMetadataChangeEvent) publisher and iceberg retention job
> to Gobblin OSS
> -------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1335
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1335
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Publish GMCE() publisher and iceberg retention job to Gobblin OSS, which
> contains the file information for added/deleted/rewritten files, and will
> then be used for iceberg registration.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)