the-other-tim-brown commented on code in PR #591: URL: https://github.com/apache/incubator-xtable/pull/591#discussion_r1898140627
########## xtable-api/src/main/java/org/apache/xtable/conversion/ExternalCatalogConfig.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.Collections; +import java.util.Map; + +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +/** + * Defines the configuration for an external catalog, user needs to populate at-least one of {@link + * ExternalCatalogConfig#catalogType} or {@link ExternalCatalogConfig#catalogSyncClientImpl} + */ +@Value +@Builder +public class ExternalCatalogConfig { + /** + * A user-defined unique identifier for the catalog, allows user to sync table to multiple + * catalogs of the same name/type eg: HMS catalog with url1, HMS catalog with url2 + */ + @NonNull String catalogId; + + /** + * The type of the catalog. If the catalogType implementation exists in XTable, the implementation + * class will be inferred. + */ + String catalogType; + + /** + * (Optional) A fully qualified class name that implements the interface for {@link + * org.apache.xtable.spi.sync.CatalogSyncClient}, it can be used if the implementation for + * catalogType doesn't exist in XTable. 
+ */ + String catalogSyncClientImpl; + + /** + * (Optional) A fully qualified class name that implements the interface for {@link + * org.apache.xtable.spi.extractor.CatalogConversionSource} it can be used if the implementation + * for catalogType doesn't exist in XTable. + */ + String catalogConversionSourceImpl; + + /** + * The properties for each catalog, used for providing any custom behaviour during catalog sync Review Comment: Changing `each` to `this` would make it clearer that the properties apply to this catalog instance. ########## xtable-utilities/src/test/java/org/apache/xtable/utilities/TestRunCatalogSync.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.utilities; + +import static org.junit.jupiter.api.Assertions.*; + +import lombok.SneakyThrows; + +import org.junit.jupiter.api.Test; + +class TestRunCatalogSync { + + @SneakyThrows + @Test + void testMain() { + String catalogConfigYamlPath = + TestRunCatalogSync.class.getClassLoader().getResource("catalogConfig.yaml").getPath(); + String[] args = {"-catalogConfig", catalogConfigYamlPath}; + // Ensure yaml gets parsed and no op-sync implemented in TestCatalogImpl is called.
+ assertDoesNotThrow(() -> RunCatalogSync.main(args)); Review Comment: Can we add some state to TestCatalogImpl that the test can check, to verify that it is called and whether the table metadata is synced? ########## xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java: ########## @@ -34,14 +35,21 @@ public class ConversionConfig { @NonNull SourceTable sourceTable; // One or more targets to sync the table metadata to List<TargetTable> targetTables; + // Each target table can be synced to multiple target catalogs, this is map from + // targetTable to target catalogs. + Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs; Review Comment: Should we use `@Builder.Default` so that this defaults to an empty map when nothing else is set? ########## xtable-core/src/main/java/org/apache/xtable/catalog/ExternalCatalogConfigFactory.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.catalog; + +import java.util.Map; + +import org.apache.xtable.conversion.ExternalCatalogConfig; + +/** A factory class which returns {@link ExternalCatalogConfig} based on catalogType.
*/ +public class ExternalCatalogConfigFactory { Review Comment: Where will this be used? ########## xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java: ########## @@ -89,57 +102,135 @@ public <COMMIT> Map<String, SyncResult> sync( try (ConversionSource<COMMIT> conversionSource = conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) { ExtractFromSource<COMMIT> source = ExtractFromSource.of(conversionSource); + return syncTableFormats(config, source, config.getSyncMode()); + } catch (IOException ioException) { + throw new ReadException("Failed to close source converter", ioException); + } + } - Map<String, ConversionTarget> conversionTargetByFormat = - config.getTargetTables().stream() - .collect( - Collectors.toMap( - TargetTable::getFormatName, - targetTable -> conversionTargetFactory.createForFormat(targetTable, conf))); - // State for each TableFormat - Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat = - conversionTargetByFormat.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, entry -> entry.getValue().getTableMetadata())); - Map<String, ConversionTarget> formatsToSyncIncrementally = - getFormatsToSyncIncrementally( - config, - conversionTargetByFormat, - lastSyncMetadataByFormat, - source.getConversionSource()); - Map<String, ConversionTarget> formatsToSyncBySnapshot = - conversionTargetByFormat.entrySet().stream() - .filter(entry -> !formatsToSyncIncrementally.containsKey(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - SyncResultForTableFormats syncResultForSnapshotSync = - formatsToSyncBySnapshot.isEmpty() - ? SyncResultForTableFormats.builder().build() - : syncSnapshot(formatsToSyncBySnapshot, source); - SyncResultForTableFormats syncResultForIncrementalSync = - formatsToSyncIncrementally.isEmpty() - ? 
SyncResultForTableFormats.builder().build() - : syncIncrementalChanges( - formatsToSyncIncrementally, lastSyncMetadataByFormat, source); - Map<String, SyncResult> syncResultsMerged = - new HashMap<>(syncResultForIncrementalSync.getLastSyncResult()); - syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult()); - String successfulSyncs = - getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.SUCCESS); - if (!successfulSyncs.isEmpty()) { - log.info("Sync is successful for the following formats {}", successfulSyncs); - } - String failedSyncs = - getFormatsWithStatusCode(syncResultsMerged, SyncResult.SyncStatusCode.ERROR); - if (!failedSyncs.isEmpty()) { - log.error("Sync failed for the following formats {}", failedSyncs); + /** + * Synchronizes the source table in conversion config to multiple target catalogs. If the + * configuration for the target table uses a different table format, synchronizes the table format + * first before syncing it to target catalog + * + * @param config A per table level config containing source table, target tables, target catalogs + * and syncMode. + * @param conversionSourceProvider A provider for the {@link ConversionSource} instance for each + * tableFormat, {@link ConversionSourceProvider#init(Configuration)} must be called before + * calling this method. + * @return Returns a map containing the table format, and it's sync result. Run sync for a table * + * with the provided per table level configuration. 
+ */ + public Map<String, SyncResult> syncTableAcrossCatalogs( + ConversionConfig config, Map<String, ConversionSourceProvider> conversionSourceProvider) { + if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) { + throw new IllegalArgumentException("Please provide at-least one format to sync"); + } + try (ConversionSource conversionSource = + conversionSourceProvider + .get(config.getSourceTable().getFormatName()) + .getConversionSourceInstance(config.getSourceTable())) { + ExtractFromSource source = ExtractFromSource.of(conversionSource); + Map<String, SyncResult> tableFormatSyncResults = + syncTableFormats(config, source, config.getSyncMode()); + Map<String, SyncResult> catalogSyncResults = new HashMap<>(); + for (TargetTable targetTable : config.getTargetTables()) { + Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients = + config.getTargetCatalogs().get(targetTable).stream() + .collect( + Collectors.toMap( + TargetCatalogConfig::getCatalogTableIdentifier, + targetCatalog -> + catalogConversionFactory.createCatalogSyncClient( + targetCatalog.getCatalogConfig(), + targetTable.getFormatName(), + conf))); + catalogSyncResults.put( + targetTable.getFormatName(), + syncCatalogsForTargetTable( + targetTable, + catalogSyncClients, + conversionSourceProvider.get(targetTable.getFormatName()))); } - return syncResultsMerged; + mergeSyncResults(tableFormatSyncResults, catalogSyncResults); + return tableFormatSyncResults; } catch (IOException ioException) { throw new ReadException("Failed to close source converter", ioException); } } + private <COMMIT> Map<String, SyncResult> syncTableFormats( Review Comment: Can you add a quick java doc here as well? ########## xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.utilities; + +import static org.apache.xtable.utilities.RunSync.getCustomConfigurations; +import static org.apache.xtable.utilities.RunSync.loadHadoopConf; +import static org.apache.xtable.utilities.RunSync.loadTableFormatConversionConfigs; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import lombok.Data; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +import org.apache.xtable.catalog.CatalogConversionFactory; +import org.apache.xtable.conversion.ConversionConfig; +import org.apache.xtable.conversion.ConversionController; 
+import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetCatalogConfig; +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; +import org.apache.xtable.model.sync.SyncMode; +import org.apache.xtable.reflection.ReflectionUtils; +import org.apache.xtable.spi.extractor.CatalogConversionSource; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.StorageIdentifier; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TableIdentifier; +import org.apache.xtable.utilities.RunCatalogSync.DatasetConfig.TargetTableIdentifier; + +/** + * Provides standalone process for reading tables from a source catalog and synchronizing their Review Comment: Since this will also do table format syncing, do we want to just combine this functionality with the existing tool? ########## xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java: ########## @@ -134,15 +134,7 @@ public void upsertRows(List<Row> upsertRows) { } @SneakyThrows - @Override public void deleteRows(List<Row> deleteRows) { - String idsToDelete = - deleteRows.stream().map(row -> row.get(0).toString()).collect(Collectors.joining(", ")); - deltaTable.delete("id in (" + idsToDelete + ")"); - } - - @SneakyThrows - public void mergeDeleteRows(List<Row> deleteRows) { Review Comment: This looks unintentional -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
