This is an automated email from the ASF dual-hosted git repository. xumingming pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 880caef3fc17295f817648ccdffe369c2dc63fa1 Author: mingmxu <ming...@ebay.com> AuthorDate: Mon Feb 26 15:42:55 2018 -0800 @Parameter annotation does not work for UDFs in Beam SQL --- .../apache/beam/sdk/extensions/sql/BeamSqlUdf.java | 2 +- .../sql/impl/interpreter/BeamSqlFnExecutor.java | 4 +++ .../operator/BeamSqlDefaultExpression.java} | 38 ++++++++++------------ .../sdk/extensions/sql/BeamSqlDslUdfUdafTest.java | 26 ++++++++++++++- 4 files changed, 47 insertions(+), 23 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java index 91bad20..5df046a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.annotations.Experimental; * }</pre></blockquote> * * <p>The first parameter is named "s" and is mandatory, - * and the second parameter is named "n" and is optional. + * and the second parameter is named "n" and is optional(always NULL if not specified). */ @Experimental public interface BeamSqlUdf extends Serializable { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java index ae65c2b..dbdd03d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java @@ -23,6 +23,7 @@ import java.util.Calendar; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDefaultExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; @@ -383,6 +384,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { case "DATETIME_PLUS": return new BeamSqlDatetimePlusExpression(subExps); + //DEFAULT keyword for UDF with optional parameter + case "DEFAULT": + return new BeamSqlDefaultExpression(); case "CASE": ret = new BeamSqlCaseExpression(subExps); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDefaultExpression.java similarity index 52% copy from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java copy to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDefaultExpression.java index 91bad20..0557600 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDefaultExpression.java @@ -15,29 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql; +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; -import java.io.Serializable; -import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.Row; +import org.apache.calcite.sql.type.SqlTypeName; /** - * Interface to create a UDF in Beam SQL. - * - * <p>A static method {@code eval} is required. Here is an example: - * - * <blockquote><pre> - * public static class MyLeftFunction { - * public String eval( - * @Parameter(name = "s") String s, - * @Parameter(name = "n", optional = true) Integer n) { - * return s.substring(0, n == null ? 1 : n); - * } - * }</pre></blockquote> - * - * <p>The first parameter is named "s" and is mandatory, - * and the second parameter is named "n" and is optional. + * DEFAULT keyword for UDF with optional parameter. */ -@Experimental -public interface BeamSqlUdf extends Serializable { - String UDF_METHOD = "eval"; +public class BeamSqlDefaultExpression extends BeamSqlExpression { + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) { + return BeamSqlPrimitive.of(SqlTypeName.ANY, null); + } + } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java index 023ac20..01e45c1 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.RowType; import org.apache.beam.sdk.values.TupleTag; +import org.apache.calcite.linq4j.function.Parameter; import org.junit.Test; /** @@ -74,7 +75,6 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { .withIntegerField("f_int") .withIntegerField("cubicvalue") .build(); - Row row = Row.withRowType(resultType).addValues(2, 8).build(); String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; @@ -90,6 +90,19 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { BeamSql.query(sql2).registerUdf("cubic2", new CubicIntegerFn())); PAssert.that(result2).containsInAnyOrder(row); + String sql3 = "SELECT f_int, substr(f_string) as sub_string FROM PCOLLECTION WHERE f_int = 2"; + PCollection<Row> result3 = + PCollectionTuple.of(new TupleTag<>("PCOLLECTION"), boundedInput1) + .apply("testUdf3", + BeamSql.query(sql3).registerUdf("substr", UdfFnWithDefault.class)); + + RowType subStrRowType = RowSqlType.builder() + .withIntegerField("f_int") + .withIntegerField("sub_string") + .build(); + Row subStrRow = Row.withRowType(subStrRowType).addValues(2, "s").build(); + PAssert.that(result3).containsInAnyOrder(subStrRow); + pipeline.run().waitUntilFinish(); } @@ -141,4 +154,15 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { return input * input * input; } } + + /** + * A UDF with default parameters. + * + */ + public static final class UdfFnWithDefault implements BeamSqlUdf { + public static String eval(@Parameter(name = "s") String s, + @Parameter(name = "n", optional = true) Integer n) { + return s.substring(0, n == null ? 1 : n); + } + } } -- To stop receiving notification emails like this one, please contact xumingm...@apache.org.