This is an automated email from the ASF dual-hosted git repository.

xumingming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 880caef3fc17295f817648ccdffe369c2dc63fa1
Author: mingmxu <ming...@ebay.com>
AuthorDate: Mon Feb 26 15:42:55 2018 -0800

    @Parameter annotation does not work for UDFs in Beam SQL
---
 .../apache/beam/sdk/extensions/sql/BeamSqlUdf.java |  2 +-
 .../sql/impl/interpreter/BeamSqlFnExecutor.java    |  4 +++
 .../operator/BeamSqlDefaultExpression.java}        | 38 ++++++++++------------
 .../sdk/extensions/sql/BeamSqlDslUdfUdafTest.java  | 26 ++++++++++++++-
 4 files changed, 47 insertions(+), 23 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
index 91bad20..5df046a 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.annotations.Experimental;
  * }</pre></blockquote>
  *
  * <p>The first parameter is named "s" and is mandatory,
- * and the second parameter is named "n" and is optional.
+ * and the second parameter is named "n" and is optional(always NULL if not 
specified).
  */
 @Experimental
 public interface BeamSqlUdf extends Serializable {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index ae65c2b..dbdd03d 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -23,6 +23,7 @@ import java.util.Calendar;
 import java.util.List;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDefaultExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
@@ -383,6 +384,9 @@ public class BeamSqlFnExecutor implements 
BeamSqlExpressionExecutor {
         case "DATETIME_PLUS":
           return new BeamSqlDatetimePlusExpression(subExps);
 
+        //DEFAULT keyword for UDF with optional parameter
+        case "DEFAULT":
+          return new BeamSqlDefaultExpression();
 
         case "CASE":
           ret = new BeamSqlCaseExpression(subExps);
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDefaultExpression.java
similarity index 52%
copy from 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
copy to 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDefaultExpression.java
index 91bad20..0557600 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlUdf.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlDefaultExpression.java
@@ -15,29 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql;
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
 
-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
- * Interface to create a UDF in Beam SQL.
- *
- * <p>A static method {@code eval} is required. Here is an example:
- *
- * <blockquote><pre>
- * public static class MyLeftFunction {
- *   public String eval(
- *       &#64;Parameter(name = "s") String s,
- *       &#64;Parameter(name = "n", optional = true) Integer n) {
- *     return s.substring(0, n == null ? 1 : n);
- *   }
- * }</pre></blockquote>
- *
- * <p>The first parameter is named "s" and is mandatory,
- * and the second parameter is named "n" and is optional.
+ * DEFAULT keyword for UDF with optional parameter.
  */
-@Experimental
-public interface BeamSqlUdf extends Serializable {
-  String UDF_METHOD = "eval";
+public class BeamSqlDefaultExpression extends BeamSqlExpression {
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
+    return BeamSqlPrimitive.of(SqlTypeName.ANY, null);
+  }
+
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 023ac20..01e45c1 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.RowType;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.linq4j.function.Parameter;
 import org.junit.Test;
 
 /**
@@ -74,7 +75,6 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
         .withIntegerField("f_int")
         .withIntegerField("cubicvalue")
         .build();
-
     Row row = Row.withRowType(resultType).addValues(2, 8).build();
 
     String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION 
WHERE f_int = 2";
@@ -90,6 +90,19 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
                    BeamSql.query(sql2).registerUdf("cubic2", new 
CubicIntegerFn()));
     PAssert.that(result2).containsInAnyOrder(row);
 
+    String sql3 = "SELECT f_int, substr(f_string) as sub_string FROM 
PCOLLECTION WHERE f_int = 2";
+    PCollection<Row> result3 =
+        PCollectionTuple.of(new TupleTag<>("PCOLLECTION"), boundedInput1)
+            .apply("testUdf3",
+                   BeamSql.query(sql3).registerUdf("substr", 
UdfFnWithDefault.class));
+
+    RowType subStrRowType = RowSqlType.builder()
+        .withIntegerField("f_int")
+        .withIntegerField("sub_string")
+        .build();
+    Row subStrRow = Row.withRowType(subStrRowType).addValues(2, "s").build();
+    PAssert.that(result3).containsInAnyOrder(subStrRow);
+
     pipeline.run().waitUntilFinish();
   }
 
@@ -141,4 +154,15 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
       return input * input * input;
     }
   }
+
+  /**
+   * A UDF with default parameters.
+   *
+   */
+  public static final class UdfFnWithDefault implements BeamSqlUdf {
+    public static String eval(@Parameter(name = "s") String s,
+        @Parameter(name = "n", optional = true) Integer n) {
+      return s.substring(0, n == null ? 1 : n);
+    }
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
xumingm...@apache.org.

Reply via email to