This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 350b0ec  [HUDI-311] : Support for AWS Database Migration Service in DeltaStreamer
350b0ec is described below

commit 350b0ecb4d137411c6231a1568add585c6d7b7d5
Author: vinoth chandar <vchan...@confluent.io>
AuthorDate: Sun Dec 22 23:33:35 2019 -0800

    [HUDI-311] : Support for AWS Database Migration Service in DeltaStreamer
    
     - Add a transformer class that adds the `Op` field if not found in the input frame
     - Add a payload implementation that issues deletes when Op=D
     - Remove Parquet as a top level source type, consolidate with RowSource
     - Make delta streamer work without a property file, simply using overridden CLI options
     - Unit tests for transformer/payload classes
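
    For illustration only (not part of this commit), a DeltaStreamer run that wires the
    new transformer and payload together purely through overridden CLI options could look
    roughly like the sketch below; the jar name, S3 paths, table name, record key and
    partition fields are placeholder assumptions, and flag names should be confirmed
    against the HoodieDeltaStreamer help output for the release in use:

      spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
        hudi-utilities-bundle.jar \
        --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
        --source-ordering-field ts \
        --target-base-path s3://my-bucket/hudi/orders \
        --target-table orders \
        --transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer \
        --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
        --hoodie-conf hoodie.datasource.write.recordkey.field=id \
        --hoodie-conf hoodie.datasource.write.partitionpath.field=ts \
        --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://my-bucket/dms-output/orders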
---
 .../common/util/DFSPropertiesConfiguration.java    |  15 ++-
 .../org/apache/hudi/payload/AWSDmsAvroPayload.java |  68 +++++++++++++
 .../org/apache/hudi/utilities/UtilHelpers.java     |  18 +++-
 .../deltastreamer/SourceFormatAdapter.java         |  15 ---
 .../utilities/schema/RowBasedSchemaProvider.java   |   6 ++
 .../hudi/utilities/sources/ParquetDFSSource.java   |  20 ++--
 .../org/apache/hudi/utilities/sources/Source.java  |   2 +-
 .../AWSDmsTransformer.java}                        |  32 ++++--
 .../TestAWSDatabaseMigrationServiceSource.java     | 107 +++++++++++++++++++++
 .../apache/hudi/utilities/UtilitiesTestBase.java   |   1 -
 .../hudi/utilities/sources/TestDFSSource.java      |   2 +-
 11 files changed, 239 insertions(+), 47 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
index 838d4b8..f535cac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
@@ -60,6 +60,17 @@ public class DFSPropertiesConfiguration {
     visitFile(rootFile);
   }
 
+  public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) {
+    this(fs, rootFile, new TypedProperties());
+  }
+
+  public DFSPropertiesConfiguration() {
+    this.fs = null;
+    this.rootFile = null;
+    this.props = new TypedProperties();
+    this.visitedFiles = new HashSet<>();
+  }
+
   private String[] splitProperty(String line) {
     int ind = line.indexOf('=');
     String k = line.substring(0, ind).trim();
@@ -106,10 +117,6 @@ public class DFSPropertiesConfiguration {
     }
   }
 
-  public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) {
-    this(fs, rootFile, new TypedProperties());
-  }
-
   public TypedProperties getConfig() {
     return props;
   }
diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
new file mode 100644
index 0000000..09898ec
--- /dev/null
+++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.payload;
+
+import org.apache.hudi.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+
+/**
+ * Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3.
+ *
+ * Typically, we get the following pattern of full change records corresponding to DML against the
+ * source database
+ *
+ * - Full load records with no `Op` field
+ * - For inserts against the source table, records contain full after image with `Op=I`
+ * - For updates against the source table, records contain full after image with `Op=U`
+ * - For deletes against the source table, records contain full before image with `Op=D`
+ *
+ * This payload implementation will issue matching insert, delete, updates against the hudi dataset
+ *
+ */
+public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
+
+  public static final String OP_FIELD = "Op";
+
+  public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
+    super(record, orderingVal);
+  }
+
+  public AWSDmsAvroPayload(Option<GenericRecord> record) {
+    this(record.get(), (record1) -> 0); // natural order
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
+      throws IOException {
+    IndexedRecord insertValue = getInsertValue(schema).get();
+    boolean delete = false;
+    if (insertValue instanceof GenericRecord) {
+      GenericRecord record = (GenericRecord) insertValue;
+      delete = record.get(OP_FIELD) != null && record.get(OP_FIELD).toString().equalsIgnoreCase("D");
+    }
+
+    return delete ? Option.empty() : Option.of(insertValue);
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 218934a..4cb56e9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -27,7 +27,7 @@ import org.apache.hudi.common.util.TypedProperties;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.Source;
@@ -92,16 +92,24 @@ public class UtilHelpers {
   /**
    */
   public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
+    DFSPropertiesConfiguration conf;
+    try {
+      conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
+    } catch (Exception e) {
+      conf = new DFSPropertiesConfiguration();
+      LOG.warn("Unexpected error read props file at :" + cfgPath, e);
+    }
+
     try {
-      DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
       if (!overriddenProps.isEmpty()) {
         LOG.info("Adding overridden properties to file properties.");
         conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
       }
-      return conf;
-    } catch (Exception e) {
-      throw new HoodieException("Unable to read props file at :" + cfgPath, e);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Unexpected error adding config overrides", ioe);
     }
+
+    return conf;
   }
 
   public static TypedProperties buildProperties(List<String> props) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index 65779e0..a21d263 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.sources.AvroSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.JsonSource;
-import org.apache.hudi.utilities.sources.ParquetSource;
 import org.apache.hudi.utilities.sources.RowSource;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -60,8 +59,6 @@ public final class SourceFormatAdapter {
     switch (source.getSourceType()) {
       case AVRO:
         return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
-      case PARQUET:
-        return ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
         AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
@@ -102,18 +99,6 @@ public final class SourceFormatAdapter {
                         .orElse(null)),
             r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
-      case PARQUET: {
-        InputBatch<JavaRDD<GenericRecord>> r = ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
-        Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-        return new InputBatch<>(
-            Option
-                .ofNullable(
-                    r.getBatch()
-                        .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
-                            source.getSparkSession()))
-                        .orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
-      }
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
index 4b708fa..6c8a3d0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
@@ -19,8 +19,10 @@
 package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.util.TypedProperties;
 
 import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.types.StructType;
 
 public class RowBasedSchemaProvider extends SchemaProvider {
@@ -31,6 +33,10 @@ public class RowBasedSchemaProvider extends SchemaProvider {
 
   private StructType rowStruct;
 
+  public RowBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+    super(props, jssc);
+  }
+
   public RowBasedSchemaProvider(StructType rowStruct) {
     super(null, null);
     this.rowStruct = rowStruct;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
index 9f4eab1..3a8f722 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -24,17 +24,15 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.parquet.avro.AvroParquetInputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
 /**
  * DFS Source that reads parquet data.
  */
-public class ParquetDFSSource extends ParquetSource {
+public class ParquetDFSSource extends RowSource {
 
   private final DFSPathSelector pathSelector;
 
@@ -45,17 +43,15 @@ public class ParquetDFSSource extends ParquetSource {
   }
 
   @Override
-  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
     Pair<Option<String>, String> selectPathsWithMaxModificationTime =
         pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
     return selectPathsWithMaxModificationTime.getLeft()
-        .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
-        .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
+        .map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
+        .orElseGet(() -> Pair.of(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
   }
 
-  private JavaRDD<GenericRecord> fromFiles(String pathStr) {
-    JavaPairRDD<Void, GenericRecord> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroParquetInputFormat.class,
-        Void.class, GenericRecord.class, sparkContext.hadoopConfiguration());
-    return avroRDD.values();
+  private Dataset<Row> fromFiles(String pathStr) {
+    return sparkSession.read().parquet(pathStr.split(","));
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index 2afe8bb..0760c73 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -36,7 +36,7 @@ public abstract class Source<T> implements Serializable {
   private static final Logger LOG = LogManager.getLogger(Source.class);
 
   public enum SourceType {
-    JSON, AVRO, ROW, PARQUET
+    JSON, AVRO, ROW
   }
 
   protected transient TypedProperties props;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/AWSDmsTransformer.java
similarity index 50%
rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
rename to hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/AWSDmsTransformer.java
index 58fe5ad..bffa6e4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/AWSDmsTransformer.java
@@ -16,20 +16,36 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities.sources;
+package org.apache.hudi.utilities.transform;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.TypedProperties;
-import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.payload.AWSDmsAvroPayload;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
-public abstract class ParquetSource extends Source<JavaRDD<GenericRecord>> {
+import java.util.Arrays;
 
-  public ParquetSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-      SchemaProvider schemaProvider) {
-    super(props, sparkContext, sparkSession, schemaProvider, SourceType.PARQUET);
+import static org.apache.spark.sql.functions.lit;
+
+/**
+ * A Simple transformer that adds `Op` field with value `I`, for AWS DMS data, if the field is not
+ * present.
+ */
+public class AWSDmsTransformer implements Transformer {
+
+  @Override
+  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
+      TypedProperties properties) {
+    Option<String> opColumnOpt = Option.fromJavaOptional(
+        Arrays.stream(rowDataset.columns()).filter(c -> c.equals(AWSDmsAvroPayload.OP_FIELD)).findFirst());
+    if (opColumnOpt.isPresent()) {
+      return rowDataset;
+    } else {
+      return rowDataset.withColumn(AWSDmsAvroPayload.OP_FIELD, lit(""));
+    }
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestAWSDatabaseMigrationServiceSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestAWSDatabaseMigrationServiceSource.java
new file mode 100644
index 0000000..d015a42
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestAWSDatabaseMigrationServiceSource.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.payload.AWSDmsAvroPayload;
+import org.apache.hudi.utilities.transform.AWSDmsTransformer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestAWSDatabaseMigrationServiceSource {
+
+  private static JavaSparkContext jsc;
+  private static SparkSession spark;
+
+  @BeforeClass
+  public static void setupTest() {
+    jsc = UtilHelpers.buildSparkContext("aws-dms-test", "local[2]");
+    spark = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+  }
+
+  @AfterClass
+  public static void tearDownTest() {
+    if (jsc != null) {
+      jsc.stop();
+    }
+  }
+
+  @Test
+  public void testPayload() throws IOException {
+    final Schema schema = Schema.createRecord(Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
+        new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
+        new Schema.Field(AWSDmsAvroPayload.OP_FIELD, Schema.create(Schema.Type.STRING), "", null)
+    ));
+    final GenericRecord record = new GenericData.Record(schema);
+
+    record.put("id", "1");
+    record.put("Op", "");
+    record.put("ts", 0L);
+    AWSDmsAvroPayload payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"));
+    assertTrue(payload.combineAndGetUpdateValue(null, schema).isPresent());
+
+    record.put("Op", "I");
+    payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"));
+    assertTrue(payload.combineAndGetUpdateValue(null, schema).isPresent());
+
+    record.put("Op", "D");
+    payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"));
+    assertFalse(payload.combineAndGetUpdateValue(null, schema).isPresent());
+  }
+
+  static class Record implements Serializable {
+    String id;
+    long ts;
+
+    Record(String id, long ts) {
+      this.id = id;
+      this.ts = ts;
+    }
+  }
+
+  @Test
+  public void testTransformer() {
+    AWSDmsTransformer transformer = new AWSDmsTransformer();
+    Dataset<Row> inputFrame = spark.createDataFrame(Arrays.asList(
+        new Record("1", 3433L),
+        new Record("2", 3433L)), Record.class);
+
+    Dataset<Row> outputFrame = transformer.apply(jsc, spark, inputFrame, null);
+    assertTrue(Arrays.asList(outputFrame.schema().fields()).stream()
+        .map(f -> f.name()).anyMatch(n -> n.equals(AWSDmsAvroPayload.OP_FIELD)));
+    assertTrue(outputFrame.select(AWSDmsAvroPayload.OP_FIELD).collectAsList().stream()
+        .allMatch(r -> r.getString(0).equals("")));
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
index 753f947..ed4e037 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
@@ -61,7 +61,6 @@ import java.util.List;
 /**
  * Abstract test that provides a dfs & spark contexts.
  *
- * TODO(vc): this needs to be done across the board.
  */
 public class UtilitiesTestBase {
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
index 369e385..f7ac61f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
@@ -139,7 +139,7 @@ public class TestDFSSource extends UtilitiesTestBase {
 
     TypedProperties props = new TypedProperties();
     props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + 
"/parquetFiles");
-    ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc, 
sparkSession, schemaProvider);
+    ParquetDFSSource parquetDFSSource = new ParquetDFSSource(props, jsc, 
sparkSession, schemaProvider);
     SourceFormatAdapter parquetSource = new 
SourceFormatAdapter(parquetDFSSource);
 
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
