This is an automated email from the ASF dual-hosted git repository. leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 974b476 [HUDI-1940] Add SqlQueryBasedTransformer unit test (#3004) 974b476 is described below commit 974b476180e61fac58cd87e78699428d4108a482 Author: wangxianghu <wangxian...@apache.org> AuthorDate: Fri May 28 22:30:30 2021 +0800 [HUDI-1940] Add SqlQueryBasedTransformer unit test (#3004) --- .../transform/TestSqlQueryBasedTransformer.java | 94 ++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java new file mode 100644 index 0000000..b6fdc25 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.transform; + +import org.apache.hudi.common.config.TypedProperties; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class TestSqlQueryBasedTransformer { + + @Test + public void testSqlQuery() { + + SparkSession spark = SparkSession + .builder() + .master("local[2]") + .appName(TestSqlQueryBasedTransformer.class.getName()) + .getOrCreate(); + + JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + // prepare test data + String testData = "{\n" + + " \"ts\": 1622126968000,\n" + + " \"uuid\": \"c978e157-72ee-4819-8f04-8e46e1bb357a\",\n" + + " \"rider\": \"rider-213\",\n" + + " \"driver\": \"driver-213\",\n" + + " \"begin_lat\": 0.4726905879569653,\n" + + " \"begin_lon\": 0.46157858450465483,\n" + + " \"end_lat\": 0.754803407008858,\n" + + " \"end_lon\": 0.9671159942018241,\n" + + " \"fare\": 34.158284716382845,\n" + + " \"partitionpath\": \"americas/brazil/sao_paulo\"\n" + + "}"; + + JavaRDD<String> testRdd = jsc.parallelize(Collections.singletonList(testData), 2); + Dataset<Row> ds = spark.read().json(testRdd); + + // create a new column dt, whose value is transformed from ts, format is yyyyMMdd + String transSql = "select\n" + + "\tuuid,\n" + + "\tbegin_lat,\n" + + "\tbegin_lon,\n" + + "\tdriver,\n" + + "\tend_lat,\n" + + "\tend_lon,\n" + + "\tfare,\n" + + "\tpartitionpath,\n" + + "\trider,\n" + + "\tts,\n" + + "\tFROM_UNIXTIME(ts / 1000, 'yyyyMMdd') as dt\n" + + "from\n" + + "\t<SRC>"; + TypedProperties props = new TypedProperties(); + props.put("hoodie.deltastreamer.transformer.sql", transSql); + + // transform + SqlQueryBasedTransformer transformer = new SqlQueryBasedTransformer(); + Dataset<Row> result = transformer.apply(jsc, spark, ds, props); + + // check result + assertEquals(11, result.columns().length); + assertNotNull(result.col("dt")); + assertEquals("20210527", result.first().get(10).toString()); + + spark.close(); + } +}