This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c406472 [SPARK-26870][SQL] Move to_avro/from_avro into functions object due to Java compatibility c406472 is described below commit c40647297031e0cd3e2682990c2067b59b2aeaa1 Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Fri Feb 15 10:24:35 2019 +0800 [SPARK-26870][SQL] Move to_avro/from_avro into functions object due to Java compatibility ## What changes were proposed in this pull request? Currently, it looks like the following is required to use `from_avro` and `to_avro` from the Java API side: ```java import static org.apache.spark.sql.avro.package$.MODULE$; MODULE$.to_avro MODULE$.from_avro ``` This PR aims to deprecate both functions under the `avro` package and move them into a `functions` object, following the pattern of `org.apache.spark.sql.functions`. Therefore, the Java side can import: ```java import static org.apache.spark.sql.avro.functions.*; ``` and the Scala side can import: ```scala import org.apache.spark.sql.avro.functions._ ``` ## How was this patch tested? Manually tested, and unit tests for Java APIs were added. Closes #23784 from HyukjinKwon/SPARK-26870. Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- docs/sql-data-sources-avro.md | 5 +- .../sql/avro/{package.scala => functions.scala} | 12 ++-- .../scala/org/apache/spark/sql/avro/package.scala | 32 ++-------- .../spark/sql/avro/JavaAvroFunctionsSuite.java | 74 ++++++++++++++++++++++ .../apache/spark/sql/avro/AvroFunctionsSuite.scala | 7 +- 5 files changed, 95 insertions(+), 35 deletions(-) diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md index b403a66..afb91ae 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -78,7 +78,7 @@ Both functions are currently only available in Scala and Java. 
<div class="codetabs"> <div data-lang="scala" markdown="1"> {% highlight scala %} -import org.apache.spark.sql.avro._ +import org.apache.spark.sql.avro.functions._ // `from_avro` requires Avro schema in JSON string format. val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))) @@ -109,7 +109,8 @@ val query = output </div> <div data-lang="java" markdown="1"> {% highlight java %} -import org.apache.spark.sql.avro.*; +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.avro.functions.*; // `from_avro` requires Avro schema in JSON string format. String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))); diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala similarity index 92% copy from external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala copy to external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala index dee8575..5ed7828 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/functions.scala @@ -15,13 +15,17 @@ * limitations under the License. */ -package org.apache.spark.sql +package org.apache.spark.sql.avro import scala.collection.JavaConverters._ import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.Column -package object avro { + +// scalastyle:off: object.name +object functions { +// scalastyle:on: object.name /** * Converts a binary column of avro format into its corresponding catalyst value. The specified @@ -31,7 +35,7 @@ package object avro { * @param data the binary column. * @param jsonFormatSchema the avro schema in JSON string format. * - * @since 2.4.0 + * @since 3.0.0 */ @Experimental def from_avro( @@ -64,7 +68,7 @@ package object avro { * * @param data the data column. 
* - * @since 2.4.0 + * @since 3.0.0 */ @Experimental def to_avro(data: Column): Column = { diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala index dee8575..af0752e 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql -import scala.collection.JavaConverters._ - import org.apache.spark.annotation.Experimental package object avro { @@ -34,30 +32,11 @@ package object avro { * @since 2.4.0 */ @Experimental + @deprecated("Please use 'org.apache.spark.sql.avro.functions.from_avro' instead.", "3.0.0") def from_avro( data: Column, - jsonFormatSchema: String): Column = { - new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, Map.empty)) - } - - /** - * Converts a binary column of avro format into its corresponding catalyst value. The specified - * schema must match the read data, otherwise the behavior is undefined: it may fail or return - * arbitrary result. - * - * @param data the binary column. - * @param jsonFormatSchema the avro schema in JSON string format. - * @param options options to control how the Avro record is parsed. - * - * @since 3.0.0 - */ - @Experimental - def from_avro( - data: Column, - jsonFormatSchema: String, - options: java.util.Map[String, String]): Column = { - new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, options.asScala.toMap)) - } + jsonFormatSchema: String): Column = + org.apache.spark.sql.avro.functions.from_avro(data, jsonFormatSchema) /** * Converts a column into binary of avro format. 
@@ -67,7 +46,6 @@ package object avro { * @since 2.4.0 */ @Experimental - def to_avro(data: Column): Column = { - new Column(CatalystDataToAvro(data.expr)) - } + @deprecated("Please use 'org.apache.spark.sql.avro.functions.to_avro' instead.", "3.0.0") + def to_avro(data: Column): Column = org.apache.spark.sql.avro.functions.to_avro(data) } diff --git a/external/avro/src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java b/external/avro/src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java new file mode 100644 index 0000000..a448583 --- /dev/null +++ b/external/avro/src/test/java/org/apache/spark/sql/avro/JavaAvroFunctionsSuite.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.avro; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.QueryTest$; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.test.TestSparkSession; + +import static org.apache.spark.sql.avro.functions.to_avro; +import static org.apache.spark.sql.avro.functions.from_avro; + + +public class JavaAvroFunctionsSuite { + private transient TestSparkSession spark; + + @Before + public void setUp() { + spark = new TestSparkSession(); + } + + @After + public void tearDown() { + spark.stop(); + } + + private static void checkAnswer(Dataset<Row> actual, Dataset<Row> expected) { + String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected.collectAsList()); + if (errorMessage != null) { + Assert.fail(errorMessage); + } + } + + @Test + public void testToAvroFromAvro() { + Dataset<Long> rangeDf = spark.range(10); + Dataset<Row> df = rangeDf.select( + rangeDf.col("id"), rangeDf.col("id").cast("string").as("str")); + + Dataset<Row> avroDF = + df.select( + to_avro(df.col("id")).as("a"), + to_avro(df.col("str")).as("b")); + + String avroTypeLong = "{\"type\": \"int\", \"name\": \"id\"}"; + String avroTypeStr = "{\"type\": \"string\", \"name\": \"str\"}"; + + Dataset<Row> actual = avroDF.select( + from_avro(avroDF.col("a"), avroTypeLong), + from_avro(avroDF.col("b"), avroTypeStr)); + + checkAnswer(actual, df); + } +} diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala index 46a37d8..148a66c 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala @@ -79,13 +79,16 @@ class AvroFunctionsSuite extends QueryTest with SharedSQLContext with SQLTestUti intercept[SparkException] { 
avroStructDF.select( - from_avro('avro, avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect() + org.apache.spark.sql.avro.functions.from_avro( + 'avro, avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect() } // For PERMISSIVE mode, the result should be row of null columns. val expected = (0 until count).map(_ => Row(Row(null, null))) checkAnswer( - avroStructDF.select(from_avro('avro, avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)), + avroStructDF.select( + org.apache.spark.sql.avro.functions.from_avro( + 'avro, avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)), expected) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org