This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new 350b0ec [HUDI-311] : Support for AWS Database Migration Service in DeltaStreamer 350b0ec is described below commit 350b0ecb4d137411c6231a1568add585c6d7b7d5 Author: vinoth chandar <vchan...@confluent.io> AuthorDate: Sun Dec 22 23:33:35 2019 -0800 [HUDI-311] : Support for AWS Database Migration Service in DeltaStreamer - Add a transformer class, that adds `Op` field if not found in input frame - Add a payload implementation, that issues deletes when Op=D - Remove Parquet as a top level source type, consolidate with RowSource - Made delta streamer work without a property file, simply using overridden cli options - Unit tests for transformer/payload classes --- .../common/util/DFSPropertiesConfiguration.java | 15 ++- .../org/apache/hudi/payload/AWSDmsAvroPayload.java | 68 +++++++++++++ .../org/apache/hudi/utilities/UtilHelpers.java | 18 +++- .../deltastreamer/SourceFormatAdapter.java | 15 --- .../utilities/schema/RowBasedSchemaProvider.java | 6 ++ .../hudi/utilities/sources/ParquetDFSSource.java | 20 ++-- .../org/apache/hudi/utilities/sources/Source.java | 2 +- .../AWSDmsTransformer.java} | 32 ++++-- .../TestAWSDatabaseMigrationServiceSource.java | 107 +++++++++++++++++++++ .../apache/hudi/utilities/UtilitiesTestBase.java | 1 - .../hudi/utilities/sources/TestDFSSource.java | 2 +- 11 files changed, 239 insertions(+), 47 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java index 838d4b8..f535cac 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java @@ -60,6 +60,17 @@ public class DFSPropertiesConfiguration { visitFile(rootFile); } + public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) { + this(fs, rootFile, 
new TypedProperties()); + } + + public DFSPropertiesConfiguration() { + this.fs = null; + this.rootFile = null; + this.props = new TypedProperties(); + this.visitedFiles = new HashSet<>(); + } + private String[] splitProperty(String line) { int ind = line.indexOf('='); String k = line.substring(0, ind).trim(); @@ -106,10 +117,6 @@ public class DFSPropertiesConfiguration { } } - public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) { - this(fs, rootFile, new TypedProperties()); - } - public TypedProperties getConfig() { return props; } diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java new file mode 100644 index 0000000..09898ec --- /dev/null +++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.payload; + +import org.apache.hudi.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.io.IOException; + +/** + * Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3. + * + * Typically, we get the following pattern of full change records corresponding to DML against the + * source database + * + * - Full load records with no `Op` field + * - For inserts against the source table, records contain full after image with `Op=I` + * - For updates against the source table, records contain full after image with `Op=U` + * - For deletes against the source table, records contain full before image with `Op=D` + * + * This payload implementation will issue matching insert, delete, updates against the hudi dataset + * + */ +public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload { + + public static final String OP_FIELD = "Op"; + + public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + public AWSDmsAvroPayload(Option<GenericRecord> record) { + this(record.get(), (record1) -> 0); // natural order + } + + @Override + public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) + throws IOException { + IndexedRecord insertValue = getInsertValue(schema).get(); + boolean delete = false; + if (insertValue instanceof GenericRecord) { + GenericRecord record = (GenericRecord) insertValue; + delete = record.get(OP_FIELD) != null && record.get(OP_FIELD).toString().equalsIgnoreCase("D"); + } + + return delete ? 
Option.empty() : Option.of(insertValue); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 218934a..4cb56e9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -27,7 +27,7 @@ import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.Source; @@ -92,16 +92,24 @@ public class UtilHelpers { /** */ public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) { + DFSPropertiesConfiguration conf; + try { + conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath); + } catch (Exception e) { + conf = new DFSPropertiesConfiguration(); + LOG.warn("Unexpected error read props file at :" + cfgPath, e); + } + try { - DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath); if (!overriddenProps.isEmpty()) { LOG.info("Adding overridden properties to file properties."); conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps)))); } - return conf; - } catch (Exception e) { - throw new HoodieException("Unable to read props file at :" + cfgPath, e); + } catch (IOException ioe) { + throw new HoodieIOException("Unexpected error adding config overrides", ioe); } + + return conf; } public static TypedProperties buildProperties(List<String> props) { diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java index 65779e0..a21d263 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.utilities.sources.AvroSource; import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.sources.JsonSource; -import org.apache.hudi.utilities.sources.ParquetSource; import org.apache.hudi.utilities.sources.RowSource; import org.apache.hudi.utilities.sources.Source; import org.apache.hudi.utilities.sources.helpers.AvroConvertor; @@ -60,8 +59,6 @@ public final class SourceFormatAdapter { switch (source.getSourceType()) { case AVRO: return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit); - case PARQUET: - return ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit); case JSON: { InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit); AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema()); @@ -102,18 +99,6 @@ public final class SourceFormatAdapter { .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider()); } - case PARQUET: { - InputBatch<JavaRDD<GenericRecord>> r = ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit); - Schema sourceSchema = r.getSchemaProvider().getSourceSchema(); - return new InputBatch<>( - Option - .ofNullable( - r.getBatch() - .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(), - source.getSparkSession())) - .orElse(null)), - r.getCheckpointForNextBatch(), r.getSchemaProvider()); - } case JSON: { InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, 
sourceLimit); Schema sourceSchema = r.getSchemaProvider().getSourceSchema(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java index 4b708fa..6c8a3d0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java @@ -19,8 +19,10 @@ package org.apache.hudi.utilities.schema; import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.common.util.TypedProperties; import org.apache.avro.Schema; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.types.StructType; public class RowBasedSchemaProvider extends SchemaProvider { @@ -31,6 +33,10 @@ public class RowBasedSchemaProvider extends SchemaProvider { private StructType rowStruct; + public RowBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + } + public RowBasedSchemaProvider(StructType rowStruct) { super(null, null); this.rowStruct = rowStruct; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java index 9f4eab1..3a8f722 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java @@ -24,17 +24,15 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; -import org.apache.avro.generic.GenericRecord; -import org.apache.parquet.avro.AvroParquetInputFormat; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; 
+import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; /** * DFS Source that reads parquet data. */ -public class ParquetDFSSource extends ParquetSource { +public class ParquetDFSSource extends RowSource { private final DFSPathSelector pathSelector; @@ -45,17 +43,15 @@ public class ParquetDFSSource extends ParquetSource { } @Override - protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) { + public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) { Pair<Option<String>, String> selectPathsWithMaxModificationTime = pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit); return selectPathsWithMaxModificationTime.getLeft() - .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight())) - .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight())); + .map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight())) + .orElseGet(() -> Pair.of(Option.empty(), selectPathsWithMaxModificationTime.getRight())); } - private JavaRDD<GenericRecord> fromFiles(String pathStr) { - JavaPairRDD<Void, GenericRecord> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroParquetInputFormat.class, - Void.class, GenericRecord.class, sparkContext.hadoopConfiguration()); - return avroRDD.values(); + private Dataset<Row> fromFiles(String pathStr) { + return sparkSession.read().parquet(pathStr.split(",")); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java index 2afe8bb..0760c73 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java @@ -36,7 +36,7 @@ public abstract 
class Source<T> implements Serializable { private static final Logger LOG = LogManager.getLogger(Source.class); public enum SourceType { - JSON, AVRO, ROW, PARQUET + JSON, AVRO, ROW } protected transient TypedProperties props; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/AWSDmsTransformer.java similarity index 50% rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java rename to hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/AWSDmsTransformer.java index 58fe5ad..bffa6e4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/AWSDmsTransformer.java @@ -16,20 +16,36 @@ * limitations under the License. */ -package org.apache.hudi.utilities.sources; +package org.apache.hudi.utilities.transform; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TypedProperties; -import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.payload.AWSDmsAvroPayload; -import org.apache.avro.generic.GenericRecord; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -public abstract class ParquetSource extends Source<JavaRDD<GenericRecord>> { +import java.util.Arrays; - public ParquetSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, - SchemaProvider schemaProvider) { - super(props, sparkContext, sparkSession, schemaProvider, SourceType.PARQUET); +import static org.apache.spark.sql.functions.lit; + +/** + * A Simple transformer that adds `Op` field with value `I`, for AWS DMS data, if the field is not + * present. 
+ */ +public class AWSDmsTransformer implements Transformer { + + @Override + public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, + TypedProperties properties) { + Option<String> opColumnOpt = Option.fromJavaOptional( + Arrays.stream(rowDataset.columns()).filter(c -> c.equals(AWSDmsAvroPayload.OP_FIELD)).findFirst()); + if (opColumnOpt.isPresent()) { + return rowDataset; + } else { + return rowDataset.withColumn(AWSDmsAvroPayload.OP_FIELD, lit("")); + } } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestAWSDatabaseMigrationServiceSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestAWSDatabaseMigrationServiceSource.java new file mode 100644 index 0000000..d015a42 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestAWSDatabaseMigrationServiceSource.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.payload.AWSDmsAvroPayload; +import org.apache.hudi.utilities.transform.AWSDmsTransformer; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestAWSDatabaseMigrationServiceSource { + + private static JavaSparkContext jsc; + private static SparkSession spark; + + @BeforeClass + public static void setupTest() { + jsc = UtilHelpers.buildSparkContext("aws-dms-test", "local[2]"); + spark = SparkSession.builder().config(jsc.getConf()).getOrCreate(); + } + + @AfterClass + public static void tearDownTest() { + if (jsc != null) { + jsc.stop(); + } + } + + @Test + public void testPayload() throws IOException { + final Schema schema = Schema.createRecord(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null), + new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null), + new Schema.Field(AWSDmsAvroPayload.OP_FIELD, Schema.create(Schema.Type.STRING), "", null) + )); + final GenericRecord record = new GenericData.Record(schema); + + record.put("id", "1"); + record.put("Op", ""); + record.put("ts", 0L); + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts")); + assertTrue(payload.combineAndGetUpdateValue(null, schema).isPresent()); + + record.put("Op", "I"); + payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts")); + assertTrue(payload.combineAndGetUpdateValue(null, schema).isPresent()); + + record.put("Op", 
"D"); + payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts")); + assertFalse(payload.combineAndGetUpdateValue(null, schema).isPresent()); + } + + static class Record implements Serializable { + String id; + long ts; + + Record(String id, long ts) { + this.id = id; + this.ts = ts; + } + } + + @Test + public void testTransformer() { + AWSDmsTransformer transformer = new AWSDmsTransformer(); + Dataset<Row> inputFrame = spark.createDataFrame(Arrays.asList( + new Record("1", 3433L), + new Record("2", 3433L)), Record.class); + + Dataset<Row> outputFrame = transformer.apply(jsc, spark, inputFrame, null); + assertTrue(Arrays.asList(outputFrame.schema().fields()).stream() + .map(f -> f.name()).anyMatch(n -> n.equals(AWSDmsAvroPayload.OP_FIELD))); + assertTrue(outputFrame.select(AWSDmsAvroPayload.OP_FIELD).collectAsList().stream() + .allMatch(r -> r.getString(0).equals(""))); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java index 753f947..ed4e037 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java @@ -61,7 +61,6 @@ import java.util.List; /** * Abstract test that provides a dfs & spark contexts. * - * TODO(vc): this needs to be done across the board. 
*/ public class UtilitiesTestBase { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java index 369e385..f7ac61f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java @@ -139,7 +139,7 @@ public class TestDFSSource extends UtilitiesTestBase { TypedProperties props = new TypedProperties(); props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/parquetFiles"); - ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider); + ParquetDFSSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider); SourceFormatAdapter parquetSource = new SourceFormatAdapter(parquetDFSSource); // 1. Extract without any checkpoint => get all the data, respecting sourceLimit