This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 109c0242 Move metadata to single json field
109c0242 is described below
commit 109c0242cb4da249940541024eef4a41e0edd348
Author: Timothy Brown <[email protected]>
AuthorDate: Thu Apr 25 11:19:53 2024 -0500
Move metadata to single json field
---
api/pom.xml | 14 ++-
.../org/apache/xtable/model/TableSyncMetadata.java | 104 ---------------------
.../xtable/model/metadata/TableSyncMetadata.java | 93 ++++++++++++++++++
.../apache/xtable/spi/sync/ConversionTarget.java | 2 +-
.../apache/xtable/spi/sync/TableFormatSync.java | 2 +-
.../model/metadata/TestTableSyncMetadata.java | 79 ++++++++++++++++
.../xtable/spi/sync/TestTableFormatSync.java | 2 +-
.../xtable/conversion/ConversionController.java | 2 +-
.../apache/xtable/delta/DeltaConversionTarget.java | 14 ++-
.../apache/xtable/hudi/HudiConversionTarget.java | 11 ++-
.../xtable/iceberg/IcebergConversionTarget.java | 18 +---
.../conversion/TestConversionController.java | 2 +-
.../xtable/hudi/ITHudiConversionSourceTarget.java | 2 +-
.../hudi/TestHudiConversionSourceTarget.java | 2 +-
.../org/apache/xtable/iceberg/TestIcebergSync.java | 2 +-
15 files changed, 210 insertions(+), 139 deletions(-)
diff --git a/api/pom.xml b/api/pom.xml
index 21d1af61..2d591465 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -50,6 +50,16 @@
<artifactId>log4j-1.2-api</artifactId>
</dependency>
+ <!--Jackson -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ </dependency>
+
<!-- Junit -->
<dependency>
<groupId>org.junit.jupiter</groupId>
@@ -70,9 +80,5 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
</dependencies>
</project>
diff --git a/api/src/main/java/org/apache/xtable/model/TableSyncMetadata.java b/api/src/main/java/org/apache/xtable/model/TableSyncMetadata.java
deleted file mode 100644
index 7c0b8c1a..00000000
--- a/api/src/main/java/org/apache/xtable/model/TableSyncMetadata.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.model;
-
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import lombok.AllArgsConstructor;
-import lombok.Value;
-
-/**
- * Metadata representing the state of a table sync process. This metadata is stored in the target
- * table's properties and is used to track the status of previous sync operation.
- */
-@AllArgsConstructor(staticName = "of")
-@Value
-public class TableSyncMetadata {
- /**
- * Property name for the lastInstantSynced field from SyncResult, used for persisting
- * lastInstantSynced in the table metadata/properties
- */
- public static final String XTABLE_LAST_INSTANT_SYNCED_PROP = "XTABLE_LAST_INSTANT_SYNCED";
- /**
- * Property name for the list of instants to consider during the next sync. This list may include
- * out-of-order instants that could be missed without explicit tracking.
- */
- public static final String INFLIGHT_COMMITS_TO_CONSIDER_FOR_NEXT_SYNC_PROP =
- "INFLIGHT_COMMITS_TO_CONSIDER_FOR_NEXT_SYNC";
-
- Instant lastInstantSynced;
- List<Instant> instantsToConsiderForNextSync;
-
- public Map<String, String> asMap() {
- Map<String, String> map = new HashMap<>();
- map.put(XTABLE_LAST_INSTANT_SYNCED_PROP, lastInstantSynced.toString());
- map.put(
- INFLIGHT_COMMITS_TO_CONSIDER_FOR_NEXT_SYNC_PROP,
- convertInstantsToConsiderForNextSyncToString());
- return map;
- }
-
- public static Optional<TableSyncMetadata> fromMap(Map<String, String> properties) {
- if (properties != null) {
- Instant lastInstantSynced = null;
- List<Instant> instantsToConsiderForNextSync = null;
- if (properties.containsKey(XTABLE_LAST_INSTANT_SYNCED_PROP)) {
- lastInstantSynced = Instant.parse(properties.get(XTABLE_LAST_INSTANT_SYNCED_PROP));
- }
- if (properties.containsKey(INFLIGHT_COMMITS_TO_CONSIDER_FOR_NEXT_SYNC_PROP)) {
- instantsToConsiderForNextSync =
- convertStringToInstantsToConsiderForNextSync(
- properties.get(INFLIGHT_COMMITS_TO_CONSIDER_FOR_NEXT_SYNC_PROP));
- }
- return Optional.ofNullable(
- TableSyncMetadata.of(lastInstantSynced, instantsToConsiderForNextSync));
- }
- return Optional.empty();
- }
-
- private String convertInstantsToConsiderForNextSyncToString() {
- if (instantsToConsiderForNextSync == null || instantsToConsiderForNextSync.isEmpty()) {
- return "";
- }
- Collections.sort(instantsToConsiderForNextSync);
- return instantsToConsiderForNextSync.stream()
- .map(Instant::toString)
- .collect(Collectors.joining(","));
- }
-
- private static List<Instant> convertStringToInstantsToConsiderForNextSync(
- String instantsToConsiderForNextSync) {
- if (instantsToConsiderForNextSync == null || instantsToConsiderForNextSync.isEmpty()) {
- return Collections.emptyList();
- }
- List<Instant> instantsList =
- Arrays.stream(instantsToConsiderForNextSync.split(","))
- .map(String::trim)
- .map(Instant::parse)
- .collect(Collectors.toList());
- return instantsList;
- }
-}
diff --git a/api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java b/api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java
new file mode 100644
index 00000000..fa5eeaff
--- /dev/null
+++ b/api/src/main/java/org/apache/xtable/model/metadata/TableSyncMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.metadata;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Value;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+import org.apache.xtable.model.exception.ParseException;
+
+/**
+ * Metadata representing the state of a table sync process. This metadata is stored in the target
+ * table's properties and is used to track the status of previous sync operation.
+ */
+@Value
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+public class TableSyncMetadata {
+ private static final int CURRENT_VERSION = 0;
+ private static final ObjectMapper MAPPER =
+ new ObjectMapper()
+ .registerModule(new JavaTimeModule())
+ .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ /** Property name for the XTABLE metadata in the table metadata/properties */
+ public static final String XTABLE_METADATA = "XTABLE_METADATA";
+
+ Instant lastInstantSynced;
+ List<Instant> instantsToConsiderForNextSync;
+ int version;
+
+ public static TableSyncMetadata of(
+ Instant lastInstantSynced, List<Instant> instantsToConsiderForNextSync) {
+ return new TableSyncMetadata(lastInstantSynced, instantsToConsiderForNextSync, CURRENT_VERSION);
+ }
+
+ public String toJson() {
+ try {
+ return MAPPER.writeValueAsString(this);
+ } catch (IOException e) {
+ throw new ParseException("Failed to serialize TableSyncMetadata", e);
+ }
+ }
+
+ public static Optional<TableSyncMetadata> fromJson(String metadata) {
+ if (metadata == null || metadata.isEmpty()) {
+ return Optional.empty();
+ } else {
+ try {
+ TableSyncMetadata parsedMetadata = MAPPER.readValue(metadata, TableSyncMetadata.class);
+ if (parsedMetadata.getLastInstantSynced() == null) {
+ throw new ParseException("LastInstantSynced is required in TableSyncMetadata");
+ }
+ if (parsedMetadata.getVersion() > CURRENT_VERSION) {
+ throw new ParseException(
+ "Unable handle metadata version: "
+ + parsedMetadata.getVersion()
+ + " max supported version: "
+ + CURRENT_VERSION);
+ }
+ return Optional.of(parsedMetadata);
+ } catch (IOException e) {
+ throw new ParseException("Failed to deserialize TableSyncMetadata", e);
+ }
+ }
+ }
+}
diff --git a/api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java b/api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
index 2ff6fb27..a0df756a 100644
--- a/api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
+++ b/api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.xtable.conversion.PerTableConfig;
import org.apache.xtable.model.InternalTable;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataFilesDiff;
diff --git a/api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java b/api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
index 41e91525..7cd0b384 100644
--- a/api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
+++ b/api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
@@ -37,7 +37,7 @@ import org.apache.xtable.model.IncrementalTableChanges;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
diff --git a/api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java b/api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java
new file mode 100644
index 00000000..3dcc6fbe
--- /dev/null
+++ b/api/src/test/java/org/apache/xtable/model/metadata/TestTableSyncMetadata.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.metadata;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.xtable.model.exception.ParseException;
+
+class TestTableSyncMetadata {
+
+ @ParameterizedTest
+ @MethodSource("provideMetadataAndJson")
+ void jsonRoundTrip(TableSyncMetadata metadata, String expectedJson) {
+ assertEquals(expectedJson, metadata.toJson());
+ assertEquals(metadata, TableSyncMetadata.fromJson(expectedJson).get());
+ }
+
+ private static Stream<Arguments> provideMetadataAndJson() {
+ return Stream.of(
+ Arguments.of(
+ TableSyncMetadata.of(
+ Instant.parse("2020-07-04T10:15:30.00Z"),
+ Arrays.asList(
+ Instant.parse("2020-08-21T11:15:30.00Z"),
+ Instant.parse("2024-01-21T12:15:30.00Z"))),
+ "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0}"),
+ Arguments.of(
+ TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), Collections.emptyList()),
+ "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0}"),
+ Arguments.of(
+ TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null),
+ "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"));
+ }
+
+ @Test
+ void failToParseJsonFromNewerVersion() {
+ assertThrows(
+ ParseException.class,
+ () ->
+ TableSyncMetadata.fromJson(
+ "{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":1}"));
+ }
+
+ @Test
+ void failToParseJsonWithMissingLastSyncedInstant() {
+ assertThrows(
+ ParseException.class,
+ () ->
+ TableSyncMetadata.fromJson(
+ "{\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0}"));
+ }
+}
diff --git a/api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java b/api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
index d401b08e..2a9e0588 100644
--- a/api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
+++ b/api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
@@ -43,7 +43,7 @@ import org.apache.xtable.model.IncrementalTableChanges;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
diff --git a/core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index f12c8fa4..34968733 100644
--- a/core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ b/core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -42,7 +42,7 @@ import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.IncrementalTableChanges;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
import org.apache.xtable.spi.extractor.ConversionSource;
diff --git a/core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index 910fe1d7..120ee0e0 100644
--- a/core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ b/core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -57,7 +57,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.xtable.conversion.PerTableConfig;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.InternalTable;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataFilesDiff;
@@ -202,9 +202,12 @@ public class DeltaConversionTarget implements ConversionTarget {
@Override
public Optional<TableSyncMetadata> getTableMetadata() {
- return TableSyncMetadata.fromMap(
- JavaConverters.mapAsJavaMapConverter(deltaLog.snapshot().metadata().configuration())
- .asJava());
+ return TableSyncMetadata.fromJson(
+ deltaLog
+ .snapshot()
+ .metadata()
+ .configuration()
+ .getOrElse(TableSyncMetadata.XTABLE_METADATA, () -> null));
}
@Override
@@ -265,9 +268,10 @@ public class DeltaConversionTarget implements ConversionTarget {
}
private Map<String, String> getConfigurationsForDeltaSync() {
- Map<String, String> configMap = new HashMap<>(metadata.asMap());
+ Map<String, String> configMap = new HashMap<>();
configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), MIN_READER_VERSION);
configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), MIN_WRITER_VERSION);
+ configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
// Sets retention for the Delta Log, does not impact underlying files in the table
configMap.put(
DeltaConfigs.LOG_RETENTION().key(), String.format("interval %d hours", retentionInHours));
diff --git a/core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java b/core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
index a778462d..b1a0bc91 100644
--- a/core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
+++ b/core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -84,7 +83,7 @@ import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.exception.UpdateException;
import org.apache.xtable.model.InternalTable;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
@@ -308,7 +307,10 @@ public class HudiConversionTarget implements ConversionTarget {
throw new ReadException("Unable to read Hudi commit metadata", ex);
}
})
- .flatMap(TableSyncMetadata::fromMap));
+ .flatMap(
+ metadata ->
+ TableSyncMetadata.fromJson(
+ metadata.get(TableSyncMetadata.XTABLE_METADATA))));
}
@Override
@@ -549,7 +551,8 @@ public class HudiConversionTarget implements ConversionTarget {
}
private Option<Map<String, String>> getExtraMetadata() {
- Map<String, String> extraMetadata = new HashMap<>(tableSyncMetadata.asMap());
+ Map<String, String> extraMetadata =
+ Collections.singletonMap(TableSyncMetadata.XTABLE_METADATA, tableSyncMetadata.toJson());
return Option.of(extraMetadata);
}
diff --git a/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index cb9cfeae..72125cb0 100644
--- a/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ b/core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -18,13 +18,9 @@
package org.apache.xtable.iceberg;
-import static org.apache.xtable.model.TableSyncMetadata.INFLIGHT_COMMITS_TO_CONSIDER_FOR_NEXT_SYNC_PROP;
-import static org.apache.xtable.model.TableSyncMetadata.XTABLE_LAST_INSTANT_SYNCED_PROP;
-
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import lombok.extern.log4j.Log4j2;
@@ -45,7 +41,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.xtable.conversion.PerTableConfig;
import org.apache.xtable.model.InternalTable;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataFilesDiff;
@@ -179,9 +175,7 @@ public class IcebergConversionTarget implements ConversionTarget {
@Override
public void syncMetadata(TableSyncMetadata metadata) {
UpdateProperties updateProperties = transaction.updateProperties();
- for (Map.Entry<String, String> stateProperty : metadata.asMap().entrySet()) {
- updateProperties.set(stateProperty.getKey(), stateProperty.getValue());
- }
+ updateProperties.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
if (!table.properties().containsKey(TableProperties.WRITE_DATA_LOCATION)) {
// Required for a consistent write location when writing back to the table as Iceberg
updateProperties.set(TableProperties.WRITE_DATA_LOCATION, basePath);
@@ -240,7 +234,7 @@ public class IcebergConversionTarget implements ConversionTarget {
if (table == null) {
return Optional.empty();
}
- return TableSyncMetadata.fromMap(table.properties());
+ return TableSyncMetadata.fromJson(table.properties().get(TableSyncMetadata.XTABLE_METADATA));
}
@Override
@@ -273,11 +267,7 @@ public class IcebergConversionTarget implements ConversionTarget {
// in the subsequent sync round.
table.manageSnapshots().rollbackTo(currentSnapshot.parentId()).commit();
Transaction transaction = table.newTransaction();
- transaction
- .updateProperties()
- .remove(XTABLE_LAST_INSTANT_SYNCED_PROP)
- .remove(INFLIGHT_COMMITS_TO_CONSIDER_FOR_NEXT_SYNC_PROP)
- .commit();
+ transaction.updateProperties().remove(TableSyncMetadata.XTABLE_METADATA).commit();
transaction.commitTransaction();
}
}
diff --git a/core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index 4fe7c365..558b8ee5 100644
--- a/core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++ b/core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -49,7 +49,7 @@ import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
diff --git a/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java b/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
index 26dba2b4..7990bfbf 100644
--- a/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
+++ b/core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
@@ -71,7 +71,7 @@ import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.xtable.conversion.PerTableConfigImpl;
import org.apache.xtable.model.InternalTable;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
diff --git a/core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSourceTarget.java b/core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSourceTarget.java
index 8f2ccd94..0e79f74f 100644
--- a/core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSourceTarget.java
+++ b/core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSourceTarget.java
@@ -43,7 +43,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.xtable.avro.AvroSchemaConverter;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.InternalTable;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
diff --git a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index fa8b162a..45140b44 100644
--- a/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -84,7 +84,7 @@ import org.apache.xtable.ITConversionController;
import org.apache.xtable.conversion.PerTableConfigImpl;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
-import org.apache.xtable.model.TableSyncMetadata;
+import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;