This is an automated email from the ASF dual-hosted git repository. vinish pushed a commit to branch IcebergSourceFix in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
commit 1317e241cfa6485f3c90299a129033a0a83fec17 Author: Vinish Reddy <[email protected]> AuthorDate: Fri Dec 13 18:50:13 2024 -0800 Normalize basePath in targetTables in ConversionController --- config.yaml | 25 +++++ .../xtable/conversion/ConversionController.java | 2 +- .../apache/xtable/conversion/ConversionUtils.java | 57 +++++++++++ .../org/apache/xtable/ITConversionController.java | 2 +- .../xtable/conversion/TestConversionUtils.java | 111 +++++++++++++++++++++ 5 files changed, 195 insertions(+), 2 deletions(-) diff --git a/config.yaml b/config.yaml new file mode 100644 index 00000000..15d4a5ca --- /dev/null +++ b/config.yaml @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +sourceFormat: ICEBERG +targetFormats: + - DELTA + - HUDI +datasets: + - + tableBasePath: /var/folders/xk/qt16ny8538z_6tbsc7shvwfr0000gn/T/junit4577570639454493869/test_table_475e7d66_ea22_4c89_81d7_690974d0123d + tableDataPath: /var/folders/xk/qt16ny8538z_6tbsc7shvwfr0000gn/T/junit4577570639454493869/test_table_475e7d66_ea22_4c89_81d7_690974d0123d/data + tableName: test_table_475e7d66_ea22_4c89_81d7_690974d0123d diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index dc665969..bc5f5e02 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -85,7 +85,7 @@ public class ConversionController { if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) { throw new IllegalArgumentException("Please provide at-least one format to sync"); } - + config = ConversionUtils.normalizeTargetPaths(config); try (ConversionSource<COMMIT> conversionSource = conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) { ExtractFromSource<COMMIT> source = ExtractFromSource.of(conversionSource); diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java new file mode 100644 index 00000000..bf1cb6da --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.xtable.model.storage.TableFormat; + +public class ConversionUtils { + + /** + * Adjusts the metadata location for table formats whose metadata needs to be at the root level of + * the data files. E.g., an Iceberg table generated through Spark has two directories, + * basePath/data and basePath/metadata. To synchronize the Iceberg metadata to Hudi and Delta, the + * target metadata must be present in basePath/data/.hoodie and basePath/data/_delta_log. + * + * @param config conversion config for synchronizing source and target tables + * @return the conversion config with updated target tables, or the given config if unchanged.
+ */ + public static ConversionConfig normalizeTargetPaths(ConversionConfig config) { + if (!config.getSourceTable().getDataPath().equals(config.getSourceTable().getBasePath()) + && config.getSourceTable().getFormatName().equals(TableFormat.ICEBERG)) { + List<TargetTable> updatedTargetTables = + config.getTargetTables().stream() + .filter( + targetTable -> + targetTable.getFormatName().equals(TableFormat.HUDI) + || targetTable.getFormatName().equals(TableFormat.DELTA)) + .map( + targetTable -> + targetTable.toBuilder() + .basePath(config.getSourceTable().getDataPath()) + .build()) + .collect(Collectors.toList()); + return new ConversionConfig( + config.getSourceTable(), updatedTargetTables, config.getSyncMode()); + } + return config; + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 3d539766..9af29a78 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -1014,7 +1014,7 @@ public class ITConversionController { .name(tableName) .formatName(formatName) // set the metadata path to the data path as the default (required by Hudi) - .basePath(table.getDataPath()) + .basePath(table.getBasePath()) .metadataRetention(metadataRetention) .build()) .collect(Collectors.toList()); diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java new file mode 100644 index 00000000..b1044039 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Arrays; + +import org.junit.jupiter.api.Test; + +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.model.sync.SyncMode; + +class TestConversionUtils { + + @Test + void testNormalizeTargetPaths() { + ConversionConfig config = + ConversionConfig.builder() + .sourceTable( + SourceTable.builder() + .name("table_name") + .formatName(TableFormat.ICEBERG) + .basePath("/tmp/basePath") + .dataPath("/tmp/basePath/data") + .build()) + .syncMode(SyncMode.FULL) + .targetTables( + Arrays.asList( + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath") + .formatName(TableFormat.DELTA) + .build(), + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath") + .formatName(TableFormat.HUDI) + .build())) + .build(); + ConversionConfig expectedNormalizedConfig = + ConversionConfig.builder() + .sourceTable( + SourceTable.builder() + .name("table_name") + .formatName(TableFormat.ICEBERG) + .basePath("/tmp/basePath") + .dataPath("/tmp/basePath/data") + .build()) + .syncMode(SyncMode.FULL) + .targetTables( + Arrays.asList( + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath/data") + .formatName(TableFormat.DELTA) + .build(), + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath/data") + 
.formatName(TableFormat.HUDI) + .build())) + .build(); + ConversionConfig actualConfig = ConversionUtils.normalizeTargetPaths(config); + assertEquals(expectedNormalizedConfig, actualConfig); + } + + @Test + void testNormalizeTargetPathsNoOp() { + ConversionConfig config = + ConversionConfig.builder() + .sourceTable( + SourceTable.builder() + .name("table_name") + .formatName(TableFormat.HUDI) + .basePath("/tmp/basePath") + .build()) + .syncMode(SyncMode.FULL) + .targetTables( + Arrays.asList( + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath") + .formatName(TableFormat.ICEBERG) + .build(), + TargetTable.builder() + .name("table_name") + .basePath("/tmp/basePath") + .formatName(TableFormat.DELTA) + .build())) + .build(); + ConversionConfig actualConfig = ConversionUtils.normalizeTargetPaths(config); + assertEquals(config, actualConfig); + } +}
