vinishjail97 commented on code in PR #591:
URL: https://github.com/apache/incubator-xtable/pull/591#discussion_r1889215252
##########
xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java:
##########
@@ -89,71 +103,126 @@ public <COMMIT> Map<String, SyncResult> sync(
try (ConversionSource<COMMIT> conversionSource =
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
ExtractFromSource<COMMIT> source =
ExtractFromSource.of(conversionSource);
+ return syncTableFormats(config, source, config.getSyncMode());
+ } catch (IOException ioException) {
+ throw new ReadException("Failed to close source converter", ioException);
+ }
+ }
- Map<String, ConversionTarget> conversionTargetByFormat =
- config.getTargetTables().stream()
- .collect(
- Collectors.toMap(
- TargetTable::getFormatName,
- targetTable ->
conversionTargetFactory.createForFormat(targetTable, conf)));
- // State for each TableFormat
- Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
- conversionTargetByFormat.entrySet().stream()
- .collect(
- Collectors.toMap(
- Map.Entry::getKey, entry ->
entry.getValue().getTableMetadata()));
- Map<String, ConversionTarget> formatsToSyncIncrementally =
- getFormatsToSyncIncrementally(
- config,
- conversionTargetByFormat,
- lastSyncMetadataByFormat,
- source.getConversionSource());
- Map<String, ConversionTarget> formatsToSyncBySnapshot =
- conversionTargetByFormat.entrySet().stream()
- .filter(entry ->
!formatsToSyncIncrementally.containsKey(entry.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
- SyncResultForTableFormats syncResultForSnapshotSync =
- formatsToSyncBySnapshot.isEmpty()
- ? SyncResultForTableFormats.builder().build()
- : syncSnapshot(formatsToSyncBySnapshot, source);
- SyncResultForTableFormats syncResultForIncrementalSync =
- formatsToSyncIncrementally.isEmpty()
- ? SyncResultForTableFormats.builder().build()
- : syncIncrementalChanges(
- formatsToSyncIncrementally, lastSyncMetadataByFormat,
source);
- Map<String, SyncResult> syncResultsMerged =
- new HashMap<>(syncResultForIncrementalSync.getLastSyncResult());
- syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult());
- String successfulSyncs =
- getFormatsWithStatusCode(syncResultsMerged,
SyncResult.SyncStatusCode.SUCCESS);
- if (!successfulSyncs.isEmpty()) {
- log.info("Sync is successful for the following formats {}",
successfulSyncs);
- }
- String failedSyncs =
- getFormatsWithStatusCode(syncResultsMerged,
SyncResult.SyncStatusCode.ERROR);
- if (!failedSyncs.isEmpty()) {
- log.error("Sync failed for the following formats {}", failedSyncs);
+ public Map<String, SyncResult> syncCatalogs(
+ ConversionConfig config, Map<String, ConversionSourceProvider>
conversionSourceProvider) {
+ if (config.getTargetTables() == null ||
config.getTargetTables().isEmpty()) {
+ throw new IllegalArgumentException("Please provide at-least one format
to sync");
+ }
+ try (ConversionSource conversionSource =
+ conversionSourceProvider
+ .get(config.getSourceTable().getFormatName())
+ .getConversionSourceInstance(config.getSourceTable())) {
+ ExtractFromSource source = ExtractFromSource.of(conversionSource);
+ Map<String, SyncResult> tableFormatSyncResults =
+ syncTableFormats(config, source, config.getSyncMode());
Review Comment:
@the-other-tim-brown We sync to the catalogs after `syncTableFormats` is
called. I have created a separate method for this, called `syncCatalogs`.
The responsibility of `syncCatalogs` is to bridge the gap between a table in
the source catalog and the target. If a table format sync is needed,
`syncTableFormats` performs it; otherwise the sync is skipped.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]