This is an automated email from the ASF dual-hosted git repository.
vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 19cbda42 Add initial IT for XTable REST Service (#716)
19cbda42 is described below
commit 19cbda42cca1504fcfbaf130dabce8b1f7b24399
Author: Rahil C <[email protected]>
AuthorDate: Tue Jun 24 20:55:27 2025 -0700
Add initial IT for XTable REST Service (#716)
* Initial integration tests for xtable rest service
* add hoodie read client
* it fix
* spotless
---
xtable-service/pom.xml | 74 ++++
.../apache/xtable/service/ITConversionService.java | 384 +++++++++++++++++++++
.../src/test/resources/application.properties | 17 +
3 files changed, 475 insertions(+)
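
For readers skimming the patch: the new IT drives the ConversionService layer end to end, from building a ConvertTableRequest to asserting on the ConvertTableResponse. Below is a condensed sketch of that flow, with names taken directly from the test and request/response models in the diff that follows (the "partition-spec" key and builder fields appear as they do in the test, not as a separately documented API):

    // Condensed from ITConversionService below; relies on the test fixtures in this module.
    ConvertTableRequest request =
        ConvertTableRequest.builder()
            .sourceFormat(HUDI)                                   // or DELTA / ICEBERG
            .sourceTableName(tableName)
            .sourceTablePath(table.getBasePath())
            .sourceDataPath(table.getDataPath())
            .targetFormats(getOtherFormats(HUDI))                 // sync to the remaining formats
            .configurations(Collections.singletonMap("partition-spec", "level:VALUE"))
            .build();

    ConvertTableResponse response = conversionService.convertTable(request);
    for (ConvertedTable converted : response.getConvertedTables()) {
      // each converted target reports its format, schema, and metadata location
      assertNotNull(converted.getTargetSchema());
      assertNotNull(converted.getTargetMetadataPath());
    }
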
diff --git a/xtable-service/pom.xml b/xtable-service/pom.xml
index 12dff680..381aa3d0 100644
--- a/xtable-service/pom.xml
+++ b/xtable-service/pom.xml
@@ -66,6 +66,12 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -142,6 +148,25 @@
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
+
+ <!-- Quarkus Test -->
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-junit5</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-test-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.11.0</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
@@ -149,6 +174,55 @@
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-spark-runtime-${spark.version.prefix}_${scala.binary.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-spark${spark.version.prefix}-bundle_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-java-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.xtable</groupId>
+ <artifactId>xtable-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Delta test dependencies -->
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-core_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-standalone_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
<plugins>
diff --git a/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java b/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
new file mode 100644
index 00000000..0e7a7e26
--- /dev/null
+++ b/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.service;
+
+import static org.apache.xtable.GenericTable.getTableName;
+import static org.apache.xtable.model.storage.TableFormat.DELTA;
+import static org.apache.xtable.model.storage.TableFormat.HUDI;
+import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.delta.DeltaConversionSourceProvider;
+import org.apache.xtable.hudi.HudiConversionSourceProvider;
+import org.apache.xtable.hudi.HudiTestUtil;
+import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.service.models.ConvertTableRequest;
+import org.apache.xtable.service.models.ConvertTableResponse;
+import org.apache.xtable.service.models.ConvertedTable;
+
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+public class ITConversionService {
+
+ private ConversionService conversionService;
+ private static Path tempDir;
+ protected static JavaSparkContext jsc;
+ protected static SparkSession sparkSession;
+
+ @BeforeAll
+ public static void setupOnce() {
+ try {
+ // local fs setup
+ tempDir = Files.createTempDirectory("xtable-it");
+ String tableName = "xtable-service-test-" + UUID.randomUUID();
+ Path basePath = tempDir.resolve(tableName);
+ Files.createDirectories(basePath);
+
+ SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+ sparkSession =
+ SparkSession.builder().config(HoodieReadClient.addHoodieSupport(sparkConf)).getOrCreate();
+ sparkSession
+ .sparkContext()
+ .hadoopConfiguration()
+ .set("parquet.avro.write-old-list-structure", "false");
+ jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @BeforeEach
+ public void setUp() {
+ // Create ConversionService with test's configuration
+ ConversionServiceConfig serviceConfig = new ConversionServiceConfig();
+ ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration());
+
+ Map<String, ConversionSourceProvider<?>> sourceProviders = new HashMap<>();
+ ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider =
+ new HudiConversionSourceProvider();
+ ConversionSourceProvider<Long> deltaConversionSourceProvider =
+ new DeltaConversionSourceProvider();
+ ConversionSourceProvider<org.apache.iceberg.Snapshot> icebergConversionSourceProvider =
+ new IcebergConversionSourceProvider();
+
+ hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
+ deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
+ icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
+
+ sourceProviders.put(HUDI, hudiConversionSourceProvider);
+ sourceProviders.put(DELTA, deltaConversionSourceProvider);
+ sourceProviders.put(ICEBERG, icebergConversionSourceProvider);
+
+ this.conversionService =
+ new ConversionService(
+ serviceConfig, conversionController, jsc.hadoopConfiguration(), sourceProviders);
+ }
+
+ @AfterAll
+ public static void teardown() {
+ if (jsc != null) {
+ jsc.close();
+ }
+ if (sparkSession != null) {
+ sparkSession.close();
+ }
+ try {
+ Files.walk(tempDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("generateTestParametersFormatsAndPartitioning")
+ public void testVariousOperations(String sourceTableFormat, boolean isPartitioned) {
+ String tableName = getTableName();
+ List<String> targetTableFormats = getOtherFormats(sourceTableFormat);
+ String partitionConfig = isPartitioned ? "level:VALUE" : null;
+
+ try (GenericTable table =
+ GenericTable.getInstance(
+ tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) {
+ List<?> insertRecords = table.insertRows(100);
+
+ // Create and execute conversion request
+ ConvertTableRequest request =
+ createConvertTableRequest(
+ sourceTableFormat,
+ tableName,
+ table.getBasePath(),
+ table.getDataPath(),
+ targetTableFormats,
+ partitionConfig);
+ ConvertTableResponse response = conversionService.convertTable(request);
+ assertConversionResponse(response, targetTableFormats);
+ checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100);
+
+ // Make multiple commits and then sync with service
+ table.insertRows(100);
+ table.upsertRows(insertRecords.subList(0, 20));
+ response = conversionService.convertTable(request);
+ assertConversionResponse(response, targetTableFormats);
+ checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 200);
+
+ table.deleteRows(insertRecords.subList(30, 50));
+ response = conversionService.convertTable(request);
+ assertConversionResponse(response, targetTableFormats);
+ checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 180);
+ checkDatasetEquivalenceWithFilter(
+ sourceTableFormat, table, targetTableFormats, table.getFilterQuery());
+ }
+
+ try (GenericTable tableWithUpdatedSchema =
+ GenericTable.getInstanceWithAdditionalColumns(
+ tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) {
+ ConvertTableRequest request =
+ createConvertTableRequest(
+ sourceTableFormat,
+ tableName,
+ tableWithUpdatedSchema.getBasePath(),
+ tableWithUpdatedSchema.getDataPath(),
+ targetTableFormats,
+ partitionConfig);
+
+ List<Row> insertsAfterSchemaUpdate = tableWithUpdatedSchema.insertRows(100);
+ tableWithUpdatedSchema.reload();
+ ConvertTableResponse response = conversionService.convertTable(request);
+ assertConversionResponse(response, targetTableFormats);
+ checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 280);
+
+ tableWithUpdatedSchema.deleteRows(insertsAfterSchemaUpdate.subList(60, 90));
+ response = conversionService.convertTable(request);
+ assertConversionResponse(response, targetTableFormats);
+ checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 250);
+
+ if (isPartitioned) {
+ // Adds new partition.
+ tableWithUpdatedSchema.insertRecordsForSpecialPartition(50);
+ response = conversionService.convertTable(request);
+ assertConversionResponse(response, targetTableFormats);
+ checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 300);
+
+ // Drops partition.
+ tableWithUpdatedSchema.deleteSpecialPartition();
+ response = conversionService.convertTable(request);
+ assertConversionResponse(response, targetTableFormats);
+ checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 250);
+
+ // Insert records to the dropped partition again.
+ tableWithUpdatedSchema.insertRecordsForSpecialPartition(50);
+ response = conversionService.convertTable(request);
+ assertConversionResponse(response, targetTableFormats);
+ checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 300);
+ }
+ }
+ }
+
+ private static Stream<Arguments> generateTestParametersFormatsAndPartitioning() {
+ List<Arguments> arguments = new ArrayList<>();
+ for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) {
+ for (boolean isPartitioned : new boolean[] {true, false}) {
+ arguments.add(Arguments.of(sourceTableFormat, isPartitioned));
+ }
+ }
+ return arguments.stream();
+ }
+
+ protected static List<String> getOtherFormats(String sourceTableFormat) {
+ return Arrays.stream(TableFormat.values())
+ .filter(format -> !format.equals(sourceTableFormat))
+ .collect(Collectors.toList());
+ }
+
+ protected void checkDatasetEquivalenceWithFilter(
+ String sourceFormat,
+ GenericTable<?, ?> sourceTable,
+ List<String> targetFormats,
+ String filter) {
+ checkDatasetEquivalence(
+ sourceFormat,
+ sourceTable,
+ Collections.emptyMap(),
+ targetFormats,
+ Collections.emptyMap(),
+ null,
+ filter);
+ }
+
+ protected void checkDatasetEquivalence(
+ String sourceFormat,
+ GenericTable<?, ?> sourceTable,
+ List<String> targetFormats,
+ Integer expectedCount) {
+ checkDatasetEquivalence(
+ sourceFormat,
+ sourceTable,
+ Collections.emptyMap(),
+ targetFormats,
+ Collections.emptyMap(),
+ expectedCount,
+ "1 = 1");
+ }
+
+ private void checkDatasetEquivalence(
+ String sourceFormat,
+ GenericTable<?, ?> sourceTable,
+ Map<String, String> sourceOptions,
+ List<String> targetFormats,
+ Map<String, Map<String, String>> targetOptions,
+ Integer expectedCount,
+ String filterCondition) {
+ Dataset<Row> sourceRows =
+ sparkSession
+ .read()
+ .options(sourceOptions)
+ .format(sourceFormat.toLowerCase())
+ .load(sourceTable.getBasePath())
+ .orderBy(sourceTable.getOrderByColumn())
+ .filter(filterCondition);
+ Map<String, Dataset<Row>> targetRowsByFormat =
+ targetFormats.stream()
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+ targetFormat -> {
+ Map<String, String> finalTargetOptions =
+ targetOptions.getOrDefault(targetFormat, Collections.emptyMap());
+ if (targetFormat.equals(HUDI)) {
+ finalTargetOptions = new HashMap<>(finalTargetOptions);
+ finalTargetOptions.put(HoodieMetadataConfig.ENABLE.key(), "true");
+ finalTargetOptions.put(
+ "hoodie.datasource.read.extract.partition.values.from.path", "true");
+ }
+ return sparkSession
+ .read()
+ .options(finalTargetOptions)
+ .format(targetFormat.toLowerCase())
+ .load(sourceTable.getDataPath())
+ .orderBy(sourceTable.getOrderByColumn())
+ .filter(filterCondition);
+ }));
+
+ String[] selectColumnsArr = sourceTable.getColumnsToSelect().toArray(new String[] {});
+ List<String> dataset1Rows = sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
+ targetRowsByFormat.forEach(
+ (format, targetRows) -> {
+ List<String> dataset2Rows =
+ targetRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
+ assertEquals(
+ dataset1Rows.size(),
+ dataset2Rows.size(),
+ String.format(
+ "Datasets have different row counts when reading from Spark.
Source: %s, Target: %s",
+ sourceFormat, format));
+ // sanity check the count to ensure test is set up properly
+ if (expectedCount != null) {
+ assertEquals(expectedCount, dataset1Rows.size());
+ } else {
+ // if count is not known ahead of time, ensure datasets are non-empty
+ assertFalse(dataset1Rows.isEmpty());
+ }
+ assertEquals(
+ dataset1Rows,
+ dataset2Rows,
+ String.format(
+ "Datasets are not equivalent when reading from Spark.
Source: %s, Target: %s",
+ sourceFormat, format));
+ });
+ }
+
+ private ConvertTableRequest createConvertTableRequest(
+ String sourceFormat,
+ String tableName,
+ String tablePath,
+ String dataPath,
+ List<String> targetFormats,
+ String partitionConfig) {
+ Map<String, String> configs = new HashMap<>();
+ if (partitionConfig != null) {
+ configs.put("partition-spec", partitionConfig);
+ }
+ return ConvertTableRequest.builder()
+ .sourceFormat(sourceFormat)
+ .sourceTableName(tableName)
+ .sourceTablePath(tablePath)
+ .sourceDataPath(dataPath)
+ .targetFormats(targetFormats)
+ .configurations(configs)
+ .build();
+ }
+
+ private void assertConversionResponse(
+ ConvertTableResponse response, List<String> expectedFormats) {
+ assertNotNull(response, "Response should not be null");
+ assertNotNull(response.getConvertedTables(), "Converted tables should not be null");
+ assertEquals(
+ expectedFormats.size(),
+ response.getConvertedTables().size(),
+ "Should have converted tables for all target formats");
+
+ for (ConvertedTable convertedTable : response.getConvertedTables()) {
+ assertTrue(
+ expectedFormats.contains(convertedTable.getTargetFormat()),
+ "Unexpected target format: " + convertedTable.getTargetFormat());
+ assertNotNull(convertedTable.getTargetSchema(), "Schema should not be null");
+ assertNotNull(convertedTable.getTargetMetadataPath(), "Metadata path should not be null");
+ }
+ }
+}
diff --git a/xtable-service/src/test/resources/application.properties b/xtable-service/src/test/resources/application.properties
new file mode 100644
index 00000000..f00ee7c8
--- /dev/null
+++ b/xtable-service/src/test/resources/application.properties
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+quarkus.log.level=INFO
\ No newline at end of file
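
For context on what the equivalence checks above actually verify: after each conversion, every target format is read back through Spark from the table's data path and compared row for row (as JSON) against a read of the source format. A condensed sketch, assuming the same SparkSession and GenericTable fixture used in the test:

    // Condensed from checkDatasetEquivalence above; the real test additionally enables
    // the Hudi metadata table and partition-value extraction when the target is Hudi.
    Dataset<Row> source =
        sparkSession.read().format(sourceFormat.toLowerCase())
            .load(table.getBasePath())
            .orderBy(table.getOrderByColumn())
            .filter("1 = 1");
    Dataset<Row> target =
        sparkSession.read().format(targetFormat.toLowerCase())
            .load(table.getDataPath())
            .orderBy(table.getOrderByColumn())
            .filter("1 = 1");
    String[] cols = table.getColumnsToSelect().toArray(new String[0]);
    assertEquals(
        source.selectExpr(cols).toJSON().collectAsList(),
        target.selectExpr(cols).toJSON().collectAsList());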