This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 029d578e098e2b02281d5c320da55c06e239b2b9 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Tue Feb 11 08:25:12 2020 +0100 [FLINK-16033][table-api] Introduced Java Table API Expression DSL --- .../org/apache/flink/table/api/ApiExpression.java | 66 + .../org/apache/flink/table/api/Expressions.java | 549 +++++++++ .../org/apache/flink/table/api/GroupWindow.java | 5 +- .../org/apache/flink/table/api/SessionWithGap.java | 3 +- .../flink/table/api/SessionWithGapOnTime.java | 5 +- .../table/api/SessionWithGapOnTimeWithAlias.java | 3 +- .../org/apache/flink/table/api/SlideWithSize.java | 3 +- .../flink/table/api/SlideWithSizeAndSlide.java | 5 +- .../table/api/SlideWithSizeAndSlideOnTime.java | 13 +- .../api/SlideWithSizeAndSlideOnTimeWithAlias.java | 13 +- .../org/apache/flink/table/api/TumbleWithSize.java | 3 +- .../flink/table/api/TumbleWithSizeOnTime.java | 5 +- .../table/api/TumbleWithSizeOnTimeWithAlias.java | 3 +- .../flink/table/api/internal/BaseExpressions.java | 1284 ++++++++++++++++++++ .../apache/flink/table/api/internal/TableImpl.java | 37 +- .../table/expressions/ApiExpressionUtils.java | 39 +- .../table/expressions/LookupCallExpression.java | 3 +- .../expressions/UnresolvedCallExpression.java | 7 +- .../expressions/resolver/ExpressionResolver.java | 2 + .../expressions/resolver/LookupCallResolver.java | 12 + .../resolver/rules/OverWindowResolverRule.java | 2 +- .../expressions/resolver/rules/ResolverRules.java | 6 + .../resolver/rules/UnwrapApiExpressionRule.java} | 30 +- .../flink/table/typeutils/FieldInfoUtils.java | 14 +- .../org/apache/flink/table/api/expressionDsl.scala | 1045 +++------------- .../expressions/PlannerExpressionConverter.scala | 10 +- .../expressions/PlannerExpressionConverter.scala | 8 +- 27 files changed, 2205 insertions(+), 970 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java new file mode 100644 index 0000000..0e2027d --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.table.api.internal.BaseExpressions; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionVisitor; + +import java.util.List; + +/** + * Java API class that gives access to expression operations. + * + * @see BaseExpressions + */ +public final class ApiExpression extends BaseExpressions<Object, ApiExpression> implements Expression { + private final Expression wrappedExpression; + + @Override + public String asSummaryString() { + return wrappedExpression.asSummaryString(); + } + + ApiExpression(Expression wrappedExpression) { + if (wrappedExpression instanceof ApiExpression) { + throw new UnsupportedOperationException("This is a bug. 
Please file an issue."); + } + this.wrappedExpression = wrappedExpression; + } + + @Override + public Expression toExpr() { + return wrappedExpression; + } + + @Override + protected ApiExpression toApiSpecificExpression(Expression expression) { + return new ApiExpression(expression); + } + + @Override + public List<Expression> getChildren() { + return wrappedExpression.getChildren(); + } + + @Override + public <R> R accept(ExpressionVisitor<R> visitor) { + return wrappedExpression.accept(visitor); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java new file mode 100644 index 0000000..9e0c10e --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.TimePointUnit; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.table.types.utils.ValueDataTypeConverter; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression; +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall; +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral; + +/** + * Entry point of the Table API Expression DSL such as: {@code $("myField").plus(10).abs()} + * + * <p>This class contains static methods for referencing table columns, creating literals, + * and building more complex {@link Expression} chains. {@link ApiExpression ApiExpressions} are + * pure API entities that are further translated into {@link ResolvedExpression ResolvedExpressions} + * under the hood. + * + * <p>For fluent definition of expressions and easier readability, we recommend to add a + * star import to the methods of this class: + * + * <pre> + * import static org.apache.flink.table.api.Expressions.*; + * </pre> + * + * <p>Check the documentation for more programming language specific APIs, for example, by using + * Scala implicits. 
+ */ +@PublicEvolving +public final class Expressions { + /** + * Creates an unresolved reference to a table's field. + * + * <p>Example: + * <pre>{@code + * tab.select($("key"), $("value")) + * } + * </pre> + */ + //CHECKSTYLE.OFF: MethodName + public static ApiExpression $(String name) { + return new ApiExpression(unresolvedRef(name)); + } + //CHECKSTYLE.ON: MethodName + + /** + * Creates a SQL literal. + * + * <p>The data type is derived from the object's class and its value. + * + * <p>For example: + * <ul> + * <li>{@code lit(12)} leads to {@code INT}</li> + * <li>{@code lit("abc")} leads to {@code CHAR(3)}</li> + * <li>{@code lit(new BigDecimal("123.45"))} leads to {@code DECIMAL(5, 2)}</li> + * </ul> + * + * <p>See {@link ValueDataTypeConverter} for a list of supported literal values. + */ + public static ApiExpression lit(Object v) { + return new ApiExpression(valueLiteral(v)); + } + + /** + * Creates a SQL literal of a given {@link DataType}. + * + * <p>The method {@link #lit(Object)} is preferred as it extracts the {@link DataType} automatically. + * Use this method only when necessary. The class of {@code v} must be supported according to the + * {@link org.apache.flink.table.types.logical.LogicalType#supportsInputConversion(Class)}. + */ + public static ApiExpression lit(Object v, DataType dataType) { + return new ApiExpression(valueLiteral(v, dataType)); + } + + /** + * Indicates a range from 'start' to 'end', which can be used in columns + * selection. + * + * <p>Example: + * <pre>{@code + * Table table = ... + * table.select(withColumns(range("b", "c"))) + * }</pre> + * + * @see #withColumns(Object, Object...) + * @see #withoutColumns(Object, Object...) + */ + public static ApiExpression range(String start, String end) { + return apiCall(BuiltInFunctionDefinitions.RANGE_TO, unresolvedRef(start), unresolvedRef(end)); + } + + /** + * Indicates an index based range, which can be used in columns selection. 
+ * + * <p>Example: + * <pre>{@code + * Table table = ... + * table.select(withColumns(range(3, 4))) + * }</pre> + * + * @see #withColumns(Object, Object...) + * @see #withoutColumns(Object, Object...) + */ + public static ApiExpression range(int start, int end) { + return apiCall(BuiltInFunctionDefinitions.RANGE_TO, valueLiteral(start), valueLiteral(end)); + } + + /** + * Boolean AND in three-valued logic. + */ + public static ApiExpression and(Object predicate0, Object predicate1, Object... predicates) { + return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.AND, predicate0, predicate1, predicates); + } + + /** + * Boolean OR in three-valued logic. + */ + public static ApiExpression or(Object predicate0, Object predicate1, Object... predicates) { + return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.OR, predicate0, predicate1, predicates); + } + + /** + * Offset constant to be used in the {@code preceding} clause of unbounded {@code Over} windows. Use this + * constant for a row-count interval. Unbounded over windows start with the first row of a partition. + */ + public static final ApiExpression UNBOUNDED_ROW = apiCall(BuiltInFunctionDefinitions.UNBOUNDED_ROW); + + /** + * Offset constant to be used in the {@code preceding} clause of unbounded {@link Over} windows. Use this + * constant for a time interval. Unbounded over windows start with the first row of a + * partition. + */ + public static final ApiExpression UNBOUNDED_RANGE = apiCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE); + + /** + * Offset constant to be used in the {@code following} clause of {@link Over} windows. Use this for setting + * the upper bound of the window to the current row. + */ + public static final ApiExpression CURRENT_ROW = apiCall(BuiltInFunctionDefinitions.CURRENT_ROW); + + /** + * Offset constant to be used in the {@code following} clause of {@link Over} windows. 
Use this for setting + * the upper bound of the window to the sort key of the current row, i.e., all rows with the same + * sort key as the current row are included in the window. + */ + public static final ApiExpression CURRENT_RANGE = apiCall(BuiltInFunctionDefinitions.CURRENT_RANGE); + + /** + * Returns the current SQL date in UTC time zone. + */ + public static ApiExpression currentDate() { + return apiCall(BuiltInFunctionDefinitions.CURRENT_DATE); + } + + /** + * Returns the current SQL time in UTC time zone. + */ + public static ApiExpression currentTime() { + return apiCall(BuiltInFunctionDefinitions.CURRENT_TIME); + } + + /** + * Returns the current SQL timestamp in UTC time zone. + */ + public static ApiExpression currentTimestamp() { + return apiCall(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP); + } + + /** + * Returns the current SQL time in local time zone. + */ + public static ApiExpression localTime() { + return apiCall(BuiltInFunctionDefinitions.LOCAL_TIME); + } + + /** + * Returns the current SQL timestamp in local time zone. + */ + public static ApiExpression localTimestamp() { + return apiCall(BuiltInFunctionDefinitions.LOCAL_TIMESTAMP); + } + + /** + * Determines whether two anchored time intervals overlap. Time point and temporal are + * transformed into a range defined by two time points (start, end). The function + * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>. + * + * <p>It evaluates: leftEnd >= rightStart && rightEnd >= leftStart + * + * <p>e.g. 
+ * <pre>{@code + * temporalOverlaps( + * lit("2:55:00").toTime(), + * interval(Duration.ofHours(1)), + * lit("3:30:00").toTime(), + * interval(Duration.ofHours(2))) + * }</pre> + * leads to true + */ + public static ApiExpression temporalOverlaps( + Object leftTimePoint, + Object leftTemporal, + Object rightTimePoint, + Object rightTemporal) { + return apiCall( + BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS, + leftTimePoint, + leftTemporal, + rightTimePoint, + rightTemporal); + } + + /** + * Formats a timestamp as a string using a specified format. + * The format must be compatible with MySQL's date formatting syntax as used by the + * date_parse function. + * + * <p>For example {@code dateFormat($("time"), "%Y, %d %M")} results in strings formatted as "2017, 05 May". + * + * @param timestamp The timestamp to format as string. + * @param format The format of the string. + * @return The formatted timestamp as string. + */ + public static ApiExpression dateFormat( + Object timestamp, + Object format) { + return apiCall(BuiltInFunctionDefinitions.DATE_FORMAT, timestamp, format); + } + + /** + * Returns the (signed) number of {@link TimePointUnit} between timePoint1 and timePoint2. + * + * <p>For example, {@code timestampDiff(TimePointUnit.DAY, lit("2016-06-15").toDate(), lit("2016-06-18").toDate())} + * leads to 3. + * + * @param timePointUnit The unit to compute diff. + * @param timePoint1 The first point in time. + * @param timePoint2 The second point in time. + * @return The number of intervals as integer value. + */ + public static ApiExpression timestampDiff( + TimePointUnit timePointUnit, + Object timePoint1, + Object timePoint2) { + return apiCall(BuiltInFunctionDefinitions.TIMESTAMP_DIFF, valueLiteral(timePointUnit), timePoint1, timePoint2); + } + + /** + * Creates an array of literals. + */ + public static ApiExpression array(Object head, Object... 
tail) { + return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ARRAY, head, tail); + } + + /** + * Creates a row of expressions. + */ + public static ApiExpression row(Object head, Object... tail) { + return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ROW, head, tail); + } + + /** + * Creates a map of expressions. + * + * <pre>{@code + * table.select( + * map( + * "key1", 1, + * "key2", 2, + * "key3", 3 + * )) + * }</pre> + * + * <p>Note keys and values should have the same types for all entries. + */ + public static ApiExpression map(Object key, Object value, Object... tail) { + return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.MAP, key, value, tail); + } + + /** + * Creates an interval of rows. + * + * @see Table#window(GroupWindow) + * @see Table#window(OverWindow...) + */ + public static ApiExpression rowInterval(Long rows) { + return new ApiExpression(valueLiteral(rows)); + } + + /** + * Returns a value that is closer than any other value to pi. + */ + public static ApiExpression pi() { + return apiCall(BuiltInFunctionDefinitions.PI); + } + + /** + * Returns a value that is closer than any other value to e. + */ + public static ApiExpression e() { + return apiCall(BuiltInFunctionDefinitions.E); + } + + /** + * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive). + */ + public static ApiExpression rand() { + return apiCall(BuiltInFunctionDefinitions.RAND); + } + + /** + * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with a + * initial seed. Two rand() functions will return identical sequences of numbers if they + * have same initial seed. + */ + public static ApiExpression rand(Object seed) { + return apiCall(BuiltInFunctionDefinitions.RAND, objectToExpression(seed)); + } + + /** + * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified + * value (exclusive). 
+ */ + public static ApiExpression randInteger(Object bound) { + return apiCall(BuiltInFunctionDefinitions.RAND_INTEGER, objectToExpression(bound)); + } + + /** + * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified value + * (exclusive) with a initial seed. Two randInteger() functions will return identical sequences + * of numbers if they have same initial seed and same bound. + */ + public static ApiExpression randInteger(Object seed, Object bound) { + return apiCall(BuiltInFunctionDefinitions.RAND_INTEGER, objectToExpression(seed), objectToExpression(bound)); + } + + /** + * Returns the string that results from concatenating the arguments. + * Returns NULL if any argument is NULL. + */ + public static ApiExpression concat(Object string, Object... strings) { + return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.CONCAT, string, strings); + } + + /** + * Calculates the arc tangent of a given coordinate. + */ + public static ApiExpression atan2(Object y, Object x) { + return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ATAN2, y, x); + } + + /** + * Returns negative numeric. + */ + public static ApiExpression negative(Object v) { + return apiCall(BuiltInFunctionDefinitions.MINUS_PREFIX, v); + } + + /** + * Returns the string that results from concatenating the arguments and separator. + * Returns NULL If the separator is NULL. + * + * <p>Note: this function does not skip empty strings. However, it does skip any NULL + * values after the separator argument. + */ + public static ApiExpression concatWs(Object separator, Object string, Object... strings) { + return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.CONCAT_WS, separator, string, strings); + } + + /** + * Returns an UUID (Universally Unique Identifier) string (e.g., + * "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly + * generated) UUID. 
The UUID is generated using a cryptographically strong pseudo random number + * generator. + */ + public static ApiExpression uuid() { + return apiCall(BuiltInFunctionDefinitions.UUID); + } + + /** + * Returns a null literal value of a given data type. + * + * <p>e.g. {@code nullOf(DataTypes.INT())} + */ + public static ApiExpression nullOf(DataType dataType) { + return new ApiExpression(valueLiteral(null, dataType)); + } + + /** + * @deprecated This method will be removed in future versions as it uses the old type system. + * It is recommended to use {@link #nullOf(DataType)} instead which uses the new type + * system based on {@link DataTypes}. Please make sure to use either the old or the new + * type system consistently to avoid unintended behavior. See the website + * documentation for more information. + */ + public static ApiExpression nullOf(TypeInformation<?> typeInfo) { + return nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo)); + } + + /** + * Calculates the logarithm of the given value. + */ + public static ApiExpression log(Object value) { + return apiCall(BuiltInFunctionDefinitions.LOG, value); + } + + /** + * Calculates the logarithm of the given value to the given base. + */ + public static ApiExpression log(Object base, Object value) { + return apiCall(BuiltInFunctionDefinitions.LOG, base, value); + } + + /** + * Ternary conditional operator that decides which of two other expressions should be evaluated + * based on a evaluated boolean condition. + * + * <p>e.g. ifThenElse($("f0") > 5, "A", "B") leads to "A" + * + * @param condition boolean condition + * @param ifTrue expression to be evaluated if condition holds + * @param ifFalse expression to be evaluated if condition does not hold + */ + public static ApiExpression ifThenElse(Object condition, Object ifTrue, Object ifFalse) { + return apiCall(BuiltInFunctionDefinitions.IF, condition, ifTrue, ifFalse); + } + + /** + * Creates an expression that selects a range of columns. 
It can be used wherever an array of + * expressions is accepted such as function calls, projections, or groupings. + * + * <p>A range can either be index-based or name-based. Indices start at 1 and boundaries are + * inclusive. + * + * <p>e.g. withColumns(range("b", "c")) or withColumns($("*")) + */ + public static ApiExpression withColumns(Object head, Object... tail) { + return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITH_COLUMNS, head, tail); + } + + /** + * Creates an expression that selects all columns except for the given range of columns. It can + * be used wherever an array of expressions is accepted such as function calls, projections, or + * groupings. + * + * <p>A range can either be index-based or name-based. Indices start at 1 and boundaries are + * inclusive. + * + * <p>e.g. withoutColumns(range("b", "c")) or withoutColumns($("c")) + */ + public static ApiExpression withoutColumns(Object head, Object... tail) { + return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITHOUT_COLUMNS, head, tail); + } + + /** + * A call to a function that will be looked up in a catalog. There are two kinds of functions: + * <ul> + * <li>System functions - which are identified with one-part names</li> + * <li>Catalog functions - which are always identified with three-part names + * (catalog, database, function)</li> + * </ul> + * + * <p>Moreover each function can either be a temporary function or permanent one + * (which is stored in an external catalog). 
+ * + * <p>Based on these two properties, the resolution order for looking up a function based on + * the provided {@code path} is as follows: + * <ul> + * <li>Temporary system function</li> + * <li>System function</li> + * <li>Temporary catalog function</li> + * <li>Catalog function</li> + * </ul> + * + * @see TableEnvironment#useCatalog(String) + * @see TableEnvironment#useDatabase(String) + * @see TableEnvironment#createTemporaryFunction + * @see TableEnvironment#createTemporarySystemFunction + */ + public static ApiExpression call(String path, Object... params) { + return new ApiExpression(ApiExpressionUtils.lookupCall( + path, + Arrays.stream(params).map(ApiExpressionUtils::objectToExpression).toArray(Expression[]::new))); + } + + private static ApiExpression apiCall(FunctionDefinition functionDefinition, Object... args) { + List<Expression> arguments = + Stream.of(args) + .map(ApiExpressionUtils::objectToExpression) + .collect(Collectors.toList()); + return new ApiExpression(unresolvedCall(functionDefinition, arguments)); + } + + private static ApiExpression apiCallAtLeastOneArgument(FunctionDefinition functionDefinition, + Object arg0, + Object... args) { + List<Expression> arguments = Stream.concat( + Stream.of(arg0), + Stream.of(args) + ).map(ApiExpressionUtils::objectToExpression) + .collect(Collectors.toList()); + return new ApiExpression(unresolvedCall(functionDefinition, arguments)); + } + + private static ApiExpression apiCallAtLeastTwoArgument( + FunctionDefinition functionDefinition, + Object arg0, + Object arg1, + Object... 
args) { + List<Expression> arguments = Stream.concat( + Stream.of(arg0, arg1), + Stream.of(args) + ).map(ApiExpressionUtils::objectToExpression) + .collect(Collectors.toList()); + return new ApiExpression(unresolvedCall(functionDefinition, arguments)); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java index 4601e75..36cd855 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; /** @@ -41,8 +42,8 @@ public abstract class GroupWindow { private final Expression timeField; GroupWindow(Expression alias, Expression timeField) { - this.alias = alias; - this.timeField = timeField; + this.alias = ApiExpressionUtils.unwrapFromApi(alias); + this.timeField = ApiExpressionUtils.unwrapFromApi(timeField); } public Expression getAlias() { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java index 6e646f7..0d9ee6d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionParser; @@ -36,7 +37,7 @@ public 
final class SessionWithGap { private final Expression gap; SessionWithGap(Expression gap) { - this.gap = gap; + this.gap = ApiExpressionUtils.unwrapFromApi(gap); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java index 53c9c4f..ef63dde 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionParser; @@ -32,8 +33,8 @@ public final class SessionWithGapOnTime { private final Expression gap; SessionWithGapOnTime(Expression timeField, Expression gap) { - this.timeField = timeField; - this.gap = gap; + this.timeField = ApiExpressionUtils.unwrapFromApi(timeField); + this.gap = ApiExpressionUtils.unwrapFromApi(gap); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java index 891ea7c..b88a93e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; /** @@ -31,7 +32,7 @@ public final class 
SessionWithGapOnTimeWithAlias extends GroupWindow { SessionWithGapOnTimeWithAlias(Expression alias, Expression timeField, Expression gap) { super(alias, timeField); - this.gap = gap; + this.gap = ApiExpressionUtils.unwrapFromApi(gap); } public Expression getGap() { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java index 44470f9..983d07d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionParser; @@ -32,7 +33,7 @@ public final class SlideWithSize { private final Expression size; SlideWithSize(Expression size) { - this.size = size; + this.size = ApiExpressionUtils.unwrapFromApi(size); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java index 4d509d1..d4d83c1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionParser; @@ -37,8 +38,8 @@ public final class SlideWithSizeAndSlide { private final 
Expression slide; SlideWithSizeAndSlide(Expression size, Expression slide) { - this.size = size; - this.slide = slide; + this.size = ApiExpressionUtils.unwrapFromApi(size); + this.slide = ApiExpressionUtils.unwrapFromApi(slide); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java index 604b9cd..c74529a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionParser; @@ -33,12 +34,12 @@ public final class SlideWithSizeAndSlideOnTime { private final Expression slide; SlideWithSizeAndSlideOnTime( - Expression timeField, - Expression size, - Expression slide) { - this.timeField = timeField; - this.size = size; - this.slide = slide; + Expression timeField, + Expression size, + Expression slide) { + this.timeField = ApiExpressionUtils.unwrapFromApi(timeField); + this.size = ApiExpressionUtils.unwrapFromApi(size); + this.slide = ApiExpressionUtils.unwrapFromApi(slide); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java index 50b3692..8b9c692 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java +++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; /** @@ -31,13 +32,13 @@ public final class SlideWithSizeAndSlideOnTimeWithAlias extends GroupWindow { private final Expression slide; SlideWithSizeAndSlideOnTimeWithAlias( - Expression alias, - Expression timeField, - Expression size, - Expression slide) { + Expression alias, + Expression timeField, + Expression size, + Expression slide) { super(alias, timeField); - this.size = size; - this.slide = slide; + this.size = ApiExpressionUtils.unwrapFromApi(size); + this.slide = ApiExpressionUtils.unwrapFromApi(slide); } public Expression getSize() { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java index 600e3a1..bbc1825 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionParser; @@ -36,7 +37,7 @@ public final class TumbleWithSize { private Expression size; TumbleWithSize(Expression size) { - this.size = size; + this.size = ApiExpressionUtils.unwrapFromApi(size); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java index f635ba5..07616d1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionParser; @@ -32,8 +33,8 @@ public final class TumbleWithSizeOnTime { private final Expression size; TumbleWithSizeOnTime(Expression time, Expression size) { - this.time = time; - this.size = size; + this.time = ApiExpressionUtils.unwrapFromApi(time); + this.size = ApiExpressionUtils.unwrapFromApi(size); } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java index 5180d33..4e25a4f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; /** @@ -31,7 +32,7 @@ public final class TumbleWithSizeOnTimeWithAlias extends GroupWindow { TumbleWithSizeOnTimeWithAlias(Expression alias, Expression timeField, Expression size) { super(alias, timeField); - this.size = size; + this.size = ApiExpressionUtils.unwrapFromApi(size); } public Expression getSize() { diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java new file mode 100644 index 0000000..41cb93c --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -0,0 +1,1284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.internal; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Expressions; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.TimeIntervalUnit; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.DataType; + +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_DAY; +import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_HOUR; +import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_MINUTE; +import static org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_SECOND; +import static org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression; +import static org.apache.flink.table.expressions.ApiExpressionUtils.tableRef; +import static org.apache.flink.table.expressions.ApiExpressionUtils.toMilliInterval; +import static org.apache.flink.table.expressions.ApiExpressionUtils.toMonthInterval; +import static org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral; +import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall; +import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; + +//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.*; +//CHECKSTYLE.ON: AvoidStarImport|ImportOrder + +/** + * These are Java and Scala common operations that can be used to construct an {@link Expression} AST for + * 
expression operations. + * + * @param <InType> The accepted type of input expressions, it is {@code Expression} for Scala and + * {@code Object} for Java. Generally the expression DSL works on expressions, the + * reason why Java accepts Object is to remove cumbersome call to {@code lit()} for + * literals. Scala alleviates this problem via implicit conversions. + * @param <OutType> The produced type of the DSL. It is {@code ApiExpression} for Java and {@code Expression} + * for Scala. In Scala the infix operations are included via implicit conversions. In Java + * we introduced a wrapper that enables the operations without pulling them through the whole stack. + */ +@PublicEvolving +public abstract class BaseExpressions<InType, OutType> { + protected abstract Expression toExpr(); + + protected abstract OutType toApiSpecificExpression(Expression expression); + + /** + * Specifies a name for an expression i.e. a field. + * + * @param name name for one field + * @param extraNames additional names if the expression expands to multiple fields + */ + public OutType as(String name, String... extraNames) { + return toApiSpecificExpression(ApiExpressionUtils.unresolvedCall( + BuiltInFunctionDefinitions.AS, + Stream.concat( + Stream.of(toExpr(), ApiExpressionUtils.valueLiteral(name)), + Stream.of(extraNames).map(ApiExpressionUtils::valueLiteral) + ).toArray(Expression[]::new))); + } + + /** + * Boolean AND in three-valued logic. This is an infix notation. See also + * {@link Expressions#and(Object, Object, Object...)} for prefix notation with multiple arguments. + * + * @see Expressions#and(Object, Object, Object...) + */ + public OutType and(InType other) { + return toApiSpecificExpression(unresolvedCall(AND, toExpr(), objectToExpression(other))); + } + + /** + * Boolean OR in three-valued logic. This is an infix notation. See also + * {@link Expressions#or(Object, Object, Object...)} for prefix notation with multiple arguments. 
+ * + * @see Expressions#or(Object, Object, Object...) + */ + public OutType or(InType other) { + return toApiSpecificExpression(unresolvedCall(OR, toExpr(), objectToExpression(other))); + } + + /** + * Greater than. + */ + public OutType isGreater(InType other) { + return toApiSpecificExpression(unresolvedCall(GREATER_THAN, toExpr(), objectToExpression(other))); + } + + /** + * Greater than or equal. + */ + public OutType isGreaterOrEqual(InType other) { + return toApiSpecificExpression(unresolvedCall(GREATER_THAN_OR_EQUAL, toExpr(), objectToExpression(other))); + } + + /** + * Less than. + */ + public OutType isLess(InType other) { + return toApiSpecificExpression(unresolvedCall(LESS_THAN, toExpr(), objectToExpression(other))); + } + + /** + * Less than or equal. + */ + public OutType isLessOrEqual(InType other) { + return toApiSpecificExpression(unresolvedCall(LESS_THAN_OR_EQUAL, toExpr(), objectToExpression(other))); + } + + /** + * Equals. + */ + public OutType isEqual(InType other) { + return toApiSpecificExpression(unresolvedCall(EQUALS, toExpr(), objectToExpression(other))); + } + + /** + * Not equal. + */ + public OutType isNotEqual(InType other) { + return toApiSpecificExpression(unresolvedCall(NOT_EQUALS, toExpr(), objectToExpression(other))); + } + + /** + * Returns left plus right. + */ + public OutType plus(InType other) { + return toApiSpecificExpression(unresolvedCall(PLUS, toExpr(), objectToExpression(other))); + } + + /** + * Returns left minus right. + */ + public OutType minus(InType other) { + return toApiSpecificExpression(unresolvedCall(MINUS, toExpr(), objectToExpression(other))); + } + + /** + * Returns left divided by right. + */ + public OutType dividedBy(InType other) { + return toApiSpecificExpression(unresolvedCall(DIVIDE, toExpr(), objectToExpression(other))); + } + + /** + * Returns left multiplied by right. 
+ */ + public OutType times(InType other) { + return toApiSpecificExpression(unresolvedCall(TIMES, toExpr(), objectToExpression(other))); + } + + /** + * Returns true if the given expression is between lowerBound and upperBound (both inclusive). + * False otherwise. The parameters must be numeric types or identical comparable types. + * + * @param lowerBound numeric or comparable expression + * @param upperBound numeric or comparable expression + */ + public OutType between(InType lowerBound, InType upperBound) { + return toApiSpecificExpression(unresolvedCall( + BETWEEN, + toExpr(), + objectToExpression(lowerBound), + objectToExpression(upperBound))); + } + + /** + * Returns true if the given expression is not between lowerBound and upperBound (both + * inclusive). False otherwise. The parameters must be numeric types or identical + * comparable types. + * + * @param lowerBound numeric or comparable expression + * @param upperBound numeric or comparable expression + */ + public OutType notBetween(InType lowerBound, InType upperBound) { + return toApiSpecificExpression(unresolvedCall( + NOT_BETWEEN, + toExpr(), + objectToExpression(lowerBound), + objectToExpression(upperBound))); + } + + /** + * Ternary conditional operator that decides which of two other expressions should be evaluated + * based on an evaluated boolean condition. + * + * <p>e.g. lit(42).isGreater(5).then("A", "B") leads to "A" + * + * @param ifTrue expression to be evaluated if condition holds + * @param ifFalse expression to be evaluated if condition does not hold + */ + public OutType then(InType ifTrue, InType ifFalse) { + return toApiSpecificExpression(unresolvedCall( + IF, + toExpr(), + objectToExpression(ifTrue), + objectToExpression(ifFalse))); + } + + /** + * Returns true if the given expression is null. + */ + public OutType isNull() { + return toApiSpecificExpression(unresolvedCall(IS_NULL, toExpr())); + } + + /** + * Returns true if the given expression is not null. 
+ */ + public OutType isNotNull() { + return toApiSpecificExpression(unresolvedCall(IS_NOT_NULL, toExpr())); + } + + /** + * Returns true if given boolean expression is true. False otherwise (for null and false). + */ + public OutType isTrue() { + return toApiSpecificExpression(unresolvedCall(IS_TRUE, toExpr())); + } + + /** + * Returns true if given boolean expression is false. False otherwise (for null and true). + */ + public OutType isFalse() { + return toApiSpecificExpression(unresolvedCall(IS_FALSE, toExpr())); + } + + /** + * Returns true if given boolean expression is not true (for null and false). False otherwise. + */ + public OutType isNotTrue() { + return toApiSpecificExpression(unresolvedCall(IS_NOT_TRUE, toExpr())); + } + + /** + * Returns true if given boolean expression is not false (for null and true). False otherwise. + */ + public OutType isNotFalse() { + return toApiSpecificExpression(unresolvedCall(IS_NOT_FALSE, toExpr())); + } + + /** + * Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT a), declares that an + * aggregation function is only applied on distinct input values. + * + * <p>For example: + * <pre> + * {@code + * orders + * .groupBy($("a")) + * .select($("a"), $("b").sum().distinct().as("d")) + * } + * </pre> + */ + public OutType distinct() { + return toApiSpecificExpression(unresolvedCall(DISTINCT, toExpr())); + } + + /** + * Returns the sum of the numeric field across all input values. + * If all values are null, null is returned. + */ + public OutType sum() { + return toApiSpecificExpression(unresolvedCall(SUM, toExpr())); + } + + /** + * Returns the sum of the numeric field across all input values. + * If all values are null, 0 is returned. + */ + public OutType sum0() { + return toApiSpecificExpression(unresolvedCall(SUM0, toExpr())); + } + + /** + * Returns the minimum value of field across all input values. 
+ */ + public OutType min() { + return toApiSpecificExpression(unresolvedCall(MIN, toExpr())); + } + + /** + * Returns the maximum value of field across all input values. + */ + public OutType max() { + return toApiSpecificExpression(unresolvedCall(MAX, toExpr())); + } + + /** + * Returns the number of input rows for which the field is not null. + */ + public OutType count() { + return toApiSpecificExpression(unresolvedCall(COUNT, toExpr())); + } + + /** + * Returns the average (arithmetic mean) of the numeric field across all input values. + */ + public OutType avg() { + return toApiSpecificExpression(unresolvedCall(AVG, toExpr())); + } + + /** + * Returns the population standard deviation of an expression (the square root of varPop()). + */ + public OutType stddevPop() { + return toApiSpecificExpression(unresolvedCall(STDDEV_POP, toExpr())); + } + + /** + * Returns the sample standard deviation of an expression (the square root of varSamp()). + */ + public OutType stddevSamp() { + return toApiSpecificExpression(unresolvedCall(STDDEV_SAMP, toExpr())); + } + + /** + * Returns the population standard variance of an expression. + */ + public OutType varPop() { + return toApiSpecificExpression(unresolvedCall(VAR_POP, toExpr())); + } + + /** + * Returns the sample variance of a given expression. + */ + public OutType varSamp() { + return toApiSpecificExpression(unresolvedCall(VAR_SAMP, toExpr())); + } + + /** + * Returns multiset aggregate of a given expression. + */ + public OutType collect() { + return toApiSpecificExpression(unresolvedCall(COLLECT, toExpr())); + } + + /** + * Converts a value to a given data type. + * + * <p>e.g. "42".cast(DataTypes.INT()) leads to 42. + */ + public OutType cast(DataType toType) { + return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), typeLiteral(toType))); + } + + /** + * @deprecated This method will be removed in future versions as it uses the old type system. 
It + * is recommended to use {@link #cast(DataType)} instead which uses the new type system + * based on {@link org.apache.flink.table.api.DataTypes}. Please make sure to use either the old + * or the new type system consistently to avoid unintended behavior. See the website documentation + * for more information. + */ + @Deprecated + public OutType cast(TypeInformation<?> toType) { + return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), typeLiteral(fromLegacyInfoToDataType(toType)))); + } + + /** + * Specifies ascending order of an expression i.e. a field for an orderBy call. + */ + public OutType asc() { + return toApiSpecificExpression(unresolvedCall(ORDER_ASC, toExpr())); + } + + /** + * Specifies descending order of an expression i.e. a field for an orderBy call. + */ + public OutType desc() { + return toApiSpecificExpression(unresolvedCall(ORDER_DESC, toExpr())); + } + + /** + * Returns true if an expression exists in a given list of expressions. This is a shorthand + * for multiple OR conditions. + * + * <p>If the testing set contains null, the result will be null if the element can not be found + * and true if it can be found. If the element is null, the result is always null. + * + * <p>e.g. lit("42").in(1, 2, 3) leads to false. + */ + @SafeVarargs + public final OutType in(InType... elements) { + Expression[] args = Stream.concat( + Stream.of(toExpr()), + Arrays.stream(elements).map(ApiExpressionUtils::objectToExpression)) + .toArray(Expression[]::new); + return toApiSpecificExpression(unresolvedCall(IN, args)); + } + + /** + * Returns true if an expression exists in a given table sub-query. The sub-query table + * must consist of one column. This column must have the same data type as the expression. + * + * <p>Note: This operation is not supported in a streaming environment yet. 
+ */ + public OutType in(Table table) { + return toApiSpecificExpression(unresolvedCall(IN, toExpr(), tableRef(table.toString(), table))); + } + + /** + * Returns the start time (inclusive) of a window when applied on a window reference. + */ + public OutType start() { + return toApiSpecificExpression(unresolvedCall(WINDOW_START, toExpr())); + } + + /** + * Returns the end time (exclusive) of a window when applied on a window reference. + * + * <p>e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000. + */ + public OutType end() { + return toApiSpecificExpression(unresolvedCall(WINDOW_END, toExpr())); + } + + /** + * Calculates the remainder of the division of the given number by another one. + */ + public OutType mod(InType other) { + return toApiSpecificExpression(unresolvedCall(MOD, toExpr(), objectToExpression(other))); + } + + /** + * Calculates Euler's number raised to the given power. + */ + public OutType exp() { + return toApiSpecificExpression(unresolvedCall(EXP, toExpr())); + } + + /** + * Calculates the base 10 logarithm of the given value. + */ + public OutType log10() { + return toApiSpecificExpression(unresolvedCall(LOG10, toExpr())); + } + + /** + * Calculates the base 2 logarithm of the given value. + */ + public OutType log2() { + return toApiSpecificExpression(unresolvedCall(LOG2, toExpr())); + } + + /** + * Calculates the natural logarithm of the given value. + */ + public OutType ln() { + return toApiSpecificExpression(unresolvedCall(LN, toExpr())); + } + + /** + * Calculates the natural logarithm of the given value. + */ + public OutType log() { + return toApiSpecificExpression(unresolvedCall(LOG, toExpr())); + } + + /** + * Calculates the logarithm of the given value to the given base. + */ + public OutType log(InType base) { + return toApiSpecificExpression(unresolvedCall(LOG, objectToExpression(base), toExpr())); + } + + /** + * Calculates the given number raised to the power of the other value. 
+ */ + public OutType power(InType other) { + return toApiSpecificExpression(unresolvedCall(POWER, toExpr(), objectToExpression(other))); + } + + /** + * Calculates the hyperbolic cosine of a given value. + */ + public OutType cosh() { + return toApiSpecificExpression(unresolvedCall(COSH, toExpr())); + } + + /** + * Calculates the square root of a given value. + */ + public OutType sqrt() { + return toApiSpecificExpression(unresolvedCall(SQRT, toExpr())); + } + + /** + * Calculates the absolute value of given value. + */ + public OutType abs() { + return toApiSpecificExpression(unresolvedCall(ABS, toExpr())); + } + + /** + * Calculates the largest integer less than or equal to a given number. + */ + public OutType floor() { + return toApiSpecificExpression(unresolvedCall(FLOOR, toExpr())); + } + + /** + * Calculates the hyperbolic sine of a given value. + */ + public OutType sinh() { + return toApiSpecificExpression(unresolvedCall(SINH, toExpr())); + } + + /** + * Calculates the smallest integer greater than or equal to a given number. + */ + public OutType ceil() { + return toApiSpecificExpression(unresolvedCall(CEIL, toExpr())); + } + + /** + * Calculates the sine of a given number. + */ + public OutType sin() { + return toApiSpecificExpression(unresolvedCall(SIN, toExpr())); + } + + /** + * Calculates the cosine of a given number. + */ + public OutType cos() { + return toApiSpecificExpression(unresolvedCall(COS, toExpr())); + } + + /** + * Calculates the tangent of a given number. + */ + public OutType tan() { + return toApiSpecificExpression(unresolvedCall(TAN, toExpr())); + } + + /** + * Calculates the cotangent of a given number. + */ + public OutType cot() { + return toApiSpecificExpression(unresolvedCall(COT, toExpr())); + } + + /** + * Calculates the arc sine of a given number. + */ + public OutType asin() { + return toApiSpecificExpression(unresolvedCall(ASIN, toExpr())); + } + + /** + * Calculates the arc cosine of a given number. 
+ */ + public OutType acos() { + return toApiSpecificExpression(unresolvedCall(ACOS, toExpr())); + } + + /** + * Calculates the arc tangent of a given number. + */ + public OutType atan() { + return toApiSpecificExpression(unresolvedCall(ATAN, toExpr())); + } + + /** + * Calculates the hyperbolic tangent of a given number. + */ + public OutType tanh() { + return toApiSpecificExpression(unresolvedCall(TANH, toExpr())); + } + + /** + * Converts numeric from radians to degrees. + */ + public OutType degrees() { + return toApiSpecificExpression(unresolvedCall(DEGREES, toExpr())); + } + + /** + * Converts numeric from degrees to radians. + */ + public OutType radians() { + return toApiSpecificExpression(unresolvedCall(RADIANS, toExpr())); + } + + /** + * Calculates the signum of a given number. + */ + public OutType sign() { + return toApiSpecificExpression(unresolvedCall(SIGN, toExpr())); + } + + /** + * Rounds the given number to integer places right to the decimal point. + */ + public OutType round(InType places) { + return toApiSpecificExpression(unresolvedCall(ROUND, toExpr(), objectToExpression(places))); + } + + /** + * Returns a string representation of an integer numeric value in binary format. Returns null if + * numeric is null. E.g. "4" leads to "100", "12" leads to "1100". + */ + public OutType bin() { + return toApiSpecificExpression(unresolvedCall(BIN, toExpr())); + } + + /** + * Returns a string representation of an integer numeric value or a string in hex format. Returns + * null if numeric or string is null. + * + * <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world" leads + * to "68656c6c6f2c776f726c64". + */ + public OutType hex() { + return toApiSpecificExpression(unresolvedCall(HEX, toExpr())); + } + + /** + * Returns a number truncated to n decimal places. + * If n is 0, the result has no decimal point or fractional part. 
+ * n can be negative to cause n digits left of the decimal point of the value to become zero. + * E.g. truncate(42.345, 2) leads to 42.34. + */ + public OutType truncate(InType n) { + return toApiSpecificExpression(unresolvedCall(TRUNCATE, toExpr(), objectToExpression(n))); + } + + /** + * Returns a number truncated to 0 decimal places. + * E.g. truncate(42.345) leads to 42.0. + */ + public OutType truncate() { + return toApiSpecificExpression(unresolvedCall(TRUNCATE, toExpr())); + } + + // String operations + + /** + * Creates a substring of the given string at given index for a given length. + * + * @param beginIndex first character of the substring (starting at 1, inclusive) + * @param length number of characters of the substring + */ + public OutType substring(InType beginIndex, InType length) { + return toApiSpecificExpression(unresolvedCall(SUBSTRING, toExpr(), objectToExpression(beginIndex), objectToExpression(length))); + } + + /** + * Creates a substring of the given string beginning at the given index to the end. + * + * @param beginIndex first character of the substring (starting at 1, inclusive) + */ + public OutType substring(InType beginIndex) { + return toApiSpecificExpression(unresolvedCall(SUBSTRING, toExpr(), objectToExpression(beginIndex))); + } + + /** + * Removes leading space characters from the given string. + */ + public OutType trimLeading() { + return toApiSpecificExpression(unresolvedCall( + TRIM, + valueLiteral(true), + valueLiteral(false), + valueLiteral(" "), + toExpr())); + } + + /** + * Removes leading characters from the given string. + * + * @param character string containing the character + */ + public OutType trimLeading(InType character) { + return toApiSpecificExpression(unresolvedCall( + TRIM, + valueLiteral(true), + valueLiteral(false), + objectToExpression(character), + toExpr())); + } + + /** + * Removes trailing space characters from the given string. 
+ */ + public OutType trimTrailing() { + return toApiSpecificExpression(unresolvedCall( + TRIM, + valueLiteral(false), + valueLiteral(true), + valueLiteral(" "), + toExpr())); + } + + /** + * Removes trailing characters from the given string. + * + * @param character string containing the character + */ + public OutType trimTrailing(InType character) { + return toApiSpecificExpression(unresolvedCall( + TRIM, + valueLiteral(false), + valueLiteral(true), + objectToExpression(character), + toExpr())); + } + + /** + * Removes leading and trailing space characters from the given string. + */ + public OutType trim() { + return toApiSpecificExpression(unresolvedCall( + TRIM, + valueLiteral(true), + valueLiteral(true), + valueLiteral(" "), + toExpr())); + } + + /** + * Removes leading and trailing characters from the given string. + * + * @param character string containing the character + */ + public OutType trim(InType character) { + return toApiSpecificExpression(unresolvedCall( + TRIM, + valueLiteral(true), + valueLiteral(true), + objectToExpression(character), + toExpr())); + } + + /** + * Returns a new string which replaces all the occurrences of the search target + * with the replacement string (non-overlapping). + */ + public OutType replace(InType search, InType replacement) { + return toApiSpecificExpression(unresolvedCall(REPLACE, toExpr(), objectToExpression(search), objectToExpression(replacement))); + } + + /** + * Returns the length of a string. + */ + public OutType charLength() { + return toApiSpecificExpression(unresolvedCall(CHAR_LENGTH, toExpr())); + } + + /** + * Returns all of the characters in a string in upper case using the rules of + * the default locale. + */ + public OutType upperCase() { + return toApiSpecificExpression(unresolvedCall(UPPER, toExpr())); + } + + /** + * Returns all of the characters in a string in lower case using the rules of + * the default locale. 
+ */ + public OutType lowerCase() { + return toApiSpecificExpression(unresolvedCall(LOWER, toExpr())); + } + + /** + * Converts the initial letter of each word in a string to uppercase. + * Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace. + */ + public OutType initCap() { + return toApiSpecificExpression(unresolvedCall(INIT_CAP, toExpr())); + } + + /** + * Returns true, if a string matches the specified LIKE pattern. + * + * <p>e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n" + */ + public OutType like(InType pattern) { + return toApiSpecificExpression(unresolvedCall(LIKE, toExpr(), objectToExpression(pattern))); + } + + /** + * Returns true, if a string matches the specified SQL regex pattern. + * + * <p>e.g. "A+" matches all strings that consist of at least one A + */ + public OutType similar(InType pattern) { + return toApiSpecificExpression(unresolvedCall(SIMILAR, toExpr(), objectToExpression(pattern))); + } + + /** + * Returns the position of string in another string starting at 1. + * Returns 0 if string could not be found. + * + * <p>e.g. lit("a").position("bbbbba") leads to 6 + */ + public OutType position(InType haystack) { + return toApiSpecificExpression(unresolvedCall(POSITION, toExpr(), objectToExpression(haystack))); + } + + /** + * Returns a string left-padded with the given pad string to a length of len characters. If + * the string is longer than len, the return value is shortened to len characters. + * + * <p>e.g. lit("hi").lpad(4, "??") returns "??hi", lit("hi").lpad(1, "??") returns "h" + */ + public OutType lpad(InType len, InType pad) { + return toApiSpecificExpression(unresolvedCall(LPAD, toExpr(), objectToExpression(len), objectToExpression(pad))); + } + + /** + * Returns a string right-padded with the given pad string to a length of len characters. If + * the string is longer than len, the return value is shortened to len characters. + * + * <p>e.g. 
lit("hi").rpad(4, "??") returns "hi??", lit("hi").rpad(1, '??') returns "h" + */ + public OutType rpad(InType len, InType pad) { + return toApiSpecificExpression(unresolvedCall(RPAD, toExpr(), objectToExpression(len), objectToExpression(pad))); + } + + /** + * Defines an aggregation to be used for a previously specified over window. + * + * <p>For example: + * + * <pre> + * {@code + * table + * .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w) + * .select('c, 'a, 'a.count over 'w, 'a.sum over 'w) + * }</pre> + */ + public OutType over(InType alias) { + return toApiSpecificExpression(unresolvedCall(OVER, toExpr(), objectToExpression(alias))); + } + + /** + * Replaces a substring of string with a string starting at a position (starting at 1). + * + * <p>e.g. lit("xxxxxtest").overlay("xxxx", 6) leads to "xxxxxxxxx" + */ + public OutType overlay(InType newString, InType starting) { + return toApiSpecificExpression(unresolvedCall( + OVERLAY, + toExpr(), + objectToExpression(newString), + objectToExpression(starting))); + } + + /** + * Replaces a substring of string with a string starting at a position (starting at 1). + * The length specifies how many characters should be removed. + * + * <p>e.g. lit("xxxxxtest").overlay("xxxx", 6, 2) leads to "xxxxxxxxxst" + */ + public OutType overlay(InType newString, InType starting, InType length) { + return toApiSpecificExpression(unresolvedCall( + OVERLAY, + toExpr(), + objectToExpression(newString), + objectToExpression(starting), + objectToExpression(length))); + } + + /** + * Returns a string with all substrings that match the regular expression consecutively + * being replaced. 
+ */ + public OutType regexpReplace(InType regex, InType replacement) { + return toApiSpecificExpression(unresolvedCall( + REGEXP_REPLACE, + toExpr(), + objectToExpression(regex), + objectToExpression(replacement))); + } + + /** + * Returns a string extracted with a specified regular expression and a regex match group + * index. + */ + public OutType regexpExtract(InType regex, InType extractIndex) { + return toApiSpecificExpression(unresolvedCall( + REGEXP_EXTRACT, + toExpr(), + objectToExpression(regex), + objectToExpression(extractIndex))); + } + + /** + * Returns a string extracted with a specified regular expression. + */ + public OutType regexpExtract(InType regex) { + return toApiSpecificExpression(unresolvedCall(REGEXP_EXTRACT, toExpr(), objectToExpression(regex))); + } + + /** + * Returns the base string decoded with base64. + */ + public OutType fromBase64() { + return toApiSpecificExpression(unresolvedCall(FROM_BASE64, toExpr())); + } + + /** + * Returns the base64-encoded result of the input string. + */ + public OutType toBase64() { + return toApiSpecificExpression(unresolvedCall(TO_BASE64, toExpr())); + } + + /** + * Returns a string that removes the left whitespaces from the given string. + */ + public OutType ltrim() { + return toApiSpecificExpression(unresolvedCall(LTRIM, toExpr())); + } + + /** + * Returns a string that removes the right whitespaces from the given string. + */ + public OutType rtrim() { + return toApiSpecificExpression(unresolvedCall(RTRIM, toExpr())); + } + + /** + * Returns a string that repeats the base string n times. + */ + public OutType repeat(InType n) { + return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(), objectToExpression(n))); + } + + // Temporal operations + + /** + * Parses a date string in the form "yyyy-MM-dd" to a SQL Date. 
+ */ + public OutType toDate() { + return toApiSpecificExpression(unresolvedCall( + CAST, + toExpr(), + typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE)))); + } + + /** + * Parses a time string in the form "HH:mm:ss" to a SQL Time. + */ + public OutType toTime() { + return toApiSpecificExpression(unresolvedCall( + CAST, + toExpr(), + typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME)))); + } + + /** + * Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp. + */ + public OutType toTimestamp() { + return toApiSpecificExpression(unresolvedCall( + CAST, + toExpr(), + typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP)))); + } + + /** + * Extracts parts of a time point or time interval. Returns the part as a long value. + * + * <p>e.g. lit("2006-06-05").toDate().extract(DAY) leads to 5 + */ + public OutType extract(TimeIntervalUnit timeIntervalUnit) { + return toApiSpecificExpression(unresolvedCall(EXTRACT, valueLiteral(timeIntervalUnit), toExpr())); + } + + /** + * Rounds down a time point to the given unit. + * + * <p>e.g. lit("12:44:31").toTime().floor(MINUTE) leads to 12:44:00 + */ + public OutType floor(TimeIntervalUnit timeIntervalUnit) { + return toApiSpecificExpression(unresolvedCall(FLOOR, valueLiteral(timeIntervalUnit), toExpr())); + } + + /** + * Rounds up a time point to the given unit. + * + * <p>e.g. lit("12:44:31").toTime().ceil(MINUTE) leads to 12:45:00 + */ + public OutType ceil(TimeIntervalUnit timeIntervalUnit) { + return toApiSpecificExpression(unresolvedCall( + CEIL, + valueLiteral(timeIntervalUnit), + toExpr())); + } + + // Advanced type helper functions + + /** + * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and + * returns it's value. 
+ * + * @param name name of the field (similar to Flink's field expressions) + */ + public OutType get(String name) { + return toApiSpecificExpression(unresolvedCall(GET, toExpr(), valueLiteral(name))); + } + + /** + * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and + * returns its value. + * + * @param index position of the field + */ + public OutType get(int index) { + return toApiSpecificExpression(unresolvedCall(GET, toExpr(), valueLiteral(index))); + } + + /** + * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes + * into a flat representation where every subtype is a separate field. + */ + public OutType flatten() { + return toApiSpecificExpression(unresolvedCall(FLATTEN, toExpr())); + } + + /** + * Accesses the element of an array or map based on a key or an index (starting at 1). + * + * @param index key or position of the element (array index starting at 1) + */ + public OutType at(InType index) { + return toApiSpecificExpression(unresolvedCall(AT, toExpr(), objectToExpression(index))); + } + + /** + * Returns the number of elements of an array or number of entries of a map. + */ + public OutType cardinality() { + return toApiSpecificExpression(unresolvedCall(CARDINALITY, toExpr())); + } + + /** + * Returns the sole element of an array with a single element. Returns null if the array is + * empty. Throws an exception if the array has more than one element. + */ + public OutType element() { + return toApiSpecificExpression(unresolvedCall(ARRAY_ELEMENT, toExpr())); + } + + // Time definition + + /** + * Declares a field as the rowtime attribute for indicating, accessing, and working in + * Flink's event time. + */ + public OutType rowtime() { + return toApiSpecificExpression(unresolvedCall(ROWTIME, toExpr())); + } + + /** + * Declares a field as the proctime attribute for indicating, accessing, and working in + * Flink's processing time. 
+ */ + public OutType proctime() { + return toApiSpecificExpression(unresolvedCall(PROCTIME, toExpr())); + } + + /** + * Creates an interval of the given number of years. + * + * <p>The produced expression is of type {@code DataTypes.INTERVAL} + */ + public OutType year() { + return toApiSpecificExpression(toMonthInterval(toExpr(), 12)); + } + + /** + * Creates an interval of the given number of years. + */ + public OutType years() { + return year(); + } + + /** + * Creates an interval of the given number of quarters. + */ + public OutType quarter() { + return toApiSpecificExpression(toMonthInterval(toExpr(), 3)); + } + + /** + * Creates an interval of the given number of quarters. + */ + public OutType quarters() { + return quarter(); + } + + /** + * Creates an interval of the given number of months. + */ + public OutType month() { + return toApiSpecificExpression(toMonthInterval(toExpr(), 1)); + } + + /** + * Creates an interval of the given number of months. + */ + public OutType months() { + return month(); + } + + /** + * Creates an interval of the given number of weeks. + */ + public OutType week() { + return toApiSpecificExpression(toMilliInterval(toExpr(), 7 * MILLIS_PER_DAY)); + } + + /** + * Creates an interval of the given number of weeks. + */ + public OutType weeks() { + return week(); + } + + /** + * Creates an interval of the given number of days. + */ + public OutType day() { + return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_DAY)); + } + + /** + * Creates an interval of the given number of days. + */ + public OutType days() { + return day(); + } + + /** + * Creates an interval of the given number of hours. + */ + public OutType hour() { + return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_HOUR)); + } + + /** + * Creates an interval of the given number of hours. + */ + public OutType hours() { + return hour(); + } + + /** + * Creates an interval of the given number of minutes. 
+ */ + public OutType minute() { + return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_MINUTE)); + } + + /** + * Creates an interval of the given number of minutes. + */ + public OutType minutes() { + return minute(); + } + + /** + * Creates an interval of the given number of seconds. + */ + public OutType second() { + return toApiSpecificExpression(toMilliInterval(toExpr(), MILLIS_PER_SECOND)); + } + + /** + * Creates an interval of the given number of seconds. + */ + public OutType seconds() { + return second(); + } + + /** + * Creates an interval of the given number of milliseconds. + */ + public OutType milli() { + return toApiSpecificExpression(toMilliInterval(toExpr(), 1)); + } + + /** + * Creates an interval of the given number of milliseconds. + */ + public OutType millis() { + return milli(); + } + + // Hash functions + + /** + * Returns the MD5 hash of the string argument; null if string is null. + * + * @return string of 32 hexadecimal digits or null + */ + public OutType md5() { + return toApiSpecificExpression(unresolvedCall(MD5, toExpr())); + } + + /** + * Returns the SHA-1 hash of the string argument; null if string is null. + * + * @return string of 40 hexadecimal digits or null + */ + public OutType sha1() { + return toApiSpecificExpression(unresolvedCall(SHA1, toExpr())); + } + + /** + * Returns the SHA-224 hash of the string argument; null if string is null. + * + * @return string of 56 hexadecimal digits or null + */ + public OutType sha224() { + return toApiSpecificExpression(unresolvedCall(SHA224, toExpr())); + } + + /** + * Returns the SHA-256 hash of the string argument; null if string is null. + * + * @return string of 64 hexadecimal digits or null + */ + public OutType sha256() { + return toApiSpecificExpression(unresolvedCall(SHA256, toExpr())); + } + + /** + * Returns the SHA-384 hash of the string argument; null if string is null. 
+ * + * @return string of 96 hexadecimal digits or null + */ + public OutType sha384() { + return toApiSpecificExpression(unresolvedCall(SHA384, toExpr())); + } + + /** + * Returns the SHA-512 hash of the string argument; null if string is null. + * + * @return string of 128 hexadecimal digits or null + */ + public OutType sha512() { + return toApiSpecificExpression(unresolvedCall(SHA512, toExpr())); + } + + /** + * Returns the hash for the given string expression using the SHA-2 family of hash + * functions (SHA-224, SHA-256, SHA-384, or SHA-512). + * + * @param hashLength bit length of the result (either 224, 256, 384, or 512) + * @return string or null if one of the arguments is null. + */ + public OutType sha2(InType hashLength) { + return toApiSpecificExpression(unresolvedCall(SHA2, toExpr(), objectToExpression(hashLength))); + } +} + diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java index 2b545ab..2333ffa 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java @@ -119,9 +119,7 @@ public class TableImpl implements Table { @Override public Table select(Expression... 
fields) { - List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields) - .map(f -> f.accept(lookupResolver)) - .collect(Collectors.toList()); + List<Expression> expressionsWithResolvedCalls = preprocessExpressions(fields); CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties( expressionsWithResolvedCalls ); @@ -464,9 +462,7 @@ public class TableImpl implements Table { } private Table addColumnsOperation(boolean replaceIfExist, List<Expression> fields) { - List<Expression> expressionsWithResolvedCalls = fields.stream() - .map(f -> f.accept(lookupResolver)) - .collect(Collectors.toList()); + List<Expression> expressionsWithResolvedCalls = preprocessExpressions(fields); CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties( expressionsWithResolvedCalls ); @@ -563,6 +559,16 @@ public class TableImpl implements Table { return new TableImpl(tableEnvironment, operation, operationTreeBuilder, lookupResolver); } + private List<Expression> preprocessExpressions(List<Expression> expressions) { + return preprocessExpressions(expressions.toArray(new Expression[0])); + } + + private List<Expression> preprocessExpressions(Expression[] expressions) { + return Arrays.stream(expressions) + .map(f -> f.accept(lookupResolver)) + .collect(Collectors.toList()); + } + private static final class GroupedTableImpl implements GroupedTable { private final TableImpl table; @@ -582,9 +588,7 @@ public class TableImpl implements Table { @Override public Table select(Expression... 
fields) { - List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields) - .map(f -> f.accept(table.lookupResolver)) - .collect(Collectors.toList()); + List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields); CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties( expressionsWithResolvedCalls ); @@ -716,7 +720,8 @@ public class TableImpl implements Table { @Override public WindowGroupedTable groupBy(Expression... fields) { - List<Expression> fieldsWithoutWindow = Arrays.stream(fields) + List<Expression> fieldsWithoutWindow = table.preprocessExpressions(fields) + .stream() .filter(f -> !window.getAlias().equals(f)) .collect(Collectors.toList()); if (fields.length != fieldsWithoutWindow.size() + 1) { @@ -749,9 +754,7 @@ public class TableImpl implements Table { @Override public Table select(Expression... fields) { - List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields) - .map(f -> f.accept(table.lookupResolver)) - .collect(Collectors.toList()); + List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields); CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties( expressionsWithResolvedCalls ); @@ -816,9 +819,7 @@ public class TableImpl implements Table { @Override public Table select(Expression... fields) { - List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields) - .map(f -> f.accept(table.lookupResolver)) - .collect(Collectors.toList()); + List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields); CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties( expressionsWithResolvedCalls ); @@ -873,9 +874,7 @@ public class TableImpl implements Table { @Override public Table select(Expression... 
fields) { - List<Expression> expressionsWithResolvedCalls = Arrays.stream(fields) - .map(f -> f.accept(table.lookupResolver)) - .collect(Collectors.toList()); + List<Expression> expressionsWithResolvedCalls = table.preprocessExpressions(fields); CategorizedExpressions extracted = OperationExpressionsUtils.extractAggregationsAndProperties( expressionsWithResolvedCalls ); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java index ec57fd5..92f8d83 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.table.expressions; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ApiExpression; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.ValidationException; @@ -31,6 +32,7 @@ import org.apache.flink.table.types.DataType; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * Utilities for API-specific {@link Expression}s. 
@@ -50,6 +52,24 @@ public final class ApiExpressionUtils { // private } + public static Expression objectToExpression(Object expression) { + if (expression instanceof ApiExpression) { + return ((ApiExpression) expression).toExpr(); + } else if (expression instanceof Expression) { + return (Expression) expression; + } else { + return valueLiteral(expression); + } + } + + public static Expression unwrapFromApi(Expression expression) { + if (expression instanceof ApiExpression) { + return ((ApiExpression) expression).toExpr(); + } else { + return expression; + } + } + public static LocalReferenceExpression localRef(String name, DataType dataType) { return new LocalReferenceExpression(name, dataType); } @@ -74,14 +94,17 @@ public final class ApiExpressionUtils { FunctionIdentifier functionIdentifier, FunctionDefinition functionDefinition, Expression... args) { - return new UnresolvedCallExpression(functionIdentifier, functionDefinition, Arrays.asList(args)); + return unresolvedCall(functionIdentifier, functionDefinition, Arrays.asList(args)); } public static UnresolvedCallExpression unresolvedCall( FunctionIdentifier functionIdentifier, FunctionDefinition functionDefinition, List<Expression> args) { - return new UnresolvedCallExpression(functionIdentifier, functionDefinition, args); + return new UnresolvedCallExpression(functionIdentifier, functionDefinition, + args.stream() + .map(ApiExpressionUtils::unwrapFromApi) + .collect(Collectors.toList())); } public static UnresolvedCallExpression unresolvedCall(FunctionDefinition functionDefinition, Expression... 
args) { @@ -89,7 +112,11 @@ public final class ApiExpressionUtils { } public static UnresolvedCallExpression unresolvedCall(FunctionDefinition functionDefinition, List<Expression> args) { - return new UnresolvedCallExpression(functionDefinition, args); + return new UnresolvedCallExpression( + functionDefinition, + args.stream() + .map(ApiExpressionUtils::unwrapFromApi) + .collect(Collectors.toList())); } public static TableReferenceExpression tableRef(String name, Table table) { @@ -101,7 +128,11 @@ public final class ApiExpressionUtils { } public static LookupCallExpression lookupCall(String name, Expression... args) { - return new LookupCallExpression(name, Arrays.asList(args)); + return new LookupCallExpression( + name, + Arrays.stream(args) + .map(ApiExpressionUtils::unwrapFromApi) + .collect(Collectors.toList())); } public static Expression toMonthInterval(Expression e, int multiplier) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java index d30efd5..ef0663a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.util.Preconditions; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -43,7 +42,7 @@ public final class LookupCallExpression implements Expression { LookupCallExpression(String unresolvedFunction, List<Expression> args) { this.unresolvedName = Preconditions.checkNotNull(unresolvedFunction); - this.args = Collections.unmodifiableList(new ArrayList<>(Preconditions.checkNotNull(args))); + 
this.args = Collections.unmodifiableList(Preconditions.checkNotNull(args)); } public String getUnresolvedName() { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java index 7fdf57d..a280d0e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java @@ -26,7 +26,6 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -61,8 +60,7 @@ public final class UnresolvedCallExpression implements Expression { Preconditions.checkNotNull(functionIdentifier, "Function identifier must not be null."); this.functionDefinition = Preconditions.checkNotNull(functionDefinition, "Function definition must not be null."); - this.args = Collections.unmodifiableList( - new ArrayList<>(Preconditions.checkNotNull(args, "Arguments must not be null."))); + this.args = Collections.unmodifiableList(Preconditions.checkNotNull(args, "Arguments must not be null.")); } UnresolvedCallExpression( @@ -71,8 +69,7 @@ public final class UnresolvedCallExpression implements Expression { this.functionIdentifier = null; this.functionDefinition = Preconditions.checkNotNull(functionDefinition, "Function definition must not be null."); - this.args = Collections.unmodifiableList( - new ArrayList<>(Preconditions.checkNotNull(args, "Arguments must not be null."))); + this.args = Collections.unmodifiableList(Preconditions.checkNotNull(args, "Arguments must not be null.")); } public Optional<FunctionIdentifier> getFunctionIdentifier() { diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java index 434697d..59ef852 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java @@ -77,6 +77,7 @@ public class ExpressionResolver { */ public static List<ResolverRule> getExpandingResolverRules() { return Arrays.asList( + ResolverRules.UNWRAP_API_EXPRESSION, ResolverRules.LOOKUP_CALL_BY_NAME, ResolverRules.FLATTEN_STAR_REFERENCE, ResolverRules.EXPAND_COLUMN_FUNCTIONS); @@ -87,6 +88,7 @@ public class ExpressionResolver { */ public static List<ResolverRule> getAllResolverRules() { return Arrays.asList( + ResolverRules.UNWRAP_API_EXPRESSION, ResolverRules.LOOKUP_CALL_BY_NAME, ResolverRules.FLATTEN_STAR_REFERENCE, ResolverRules.EXPAND_COLUMN_FUNCTIONS, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java index 93471ef..3071fa6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java @@ -19,6 +19,7 @@ package org.apache.flink.table.expressions.resolver; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ApiExpression; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.expressions.Expression; @@ -65,6 +66,17 @@ public class LookupCallResolver extends 
ApiExpressionDefaultVisitor<Expression> } @Override + public Expression visitNonApiExpression(Expression other) { + // LookupCallResolver might be called outside of ExpressionResolver, thus we need to additionally + // handle the ApiExpressions here + if (other instanceof ApiExpression) { + return ((ApiExpression) other).toExpr().accept(this); + } else { + return defaultMethod(other); + } + } + + @Override protected Expression defaultMethod(Expression expression) { return expression; } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java index 42dc0757..3acb5af 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java @@ -55,7 +55,7 @@ final class OverWindowResolverRule implements ResolverRule { .collect(Collectors.toList()); } - private class ExpressionResolverVisitor extends RuleExpressionVisitor<Expression> { + private static class ExpressionResolverVisitor extends RuleExpressionVisitor<Expression> { ExpressionResolverVisitor(ResolutionContext context) { super(context); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java index 671b1e1..aab2272 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java @@ -19,6 +19,7 @@ package 
org.apache.flink.table.expressions.resolver.rules; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ApiExpression; import org.apache.flink.table.expressions.UnresolvedReferenceExpression; /** @@ -62,6 +63,11 @@ public final class ResolverRules { */ public static final ResolverRule QUALIFY_BUILT_IN_FUNCTIONS = new QualifyBuiltInFunctionsRule(); + /** + * Unwraps all {@link ApiExpression}. + */ + public static final ResolverRule UNWRAP_API_EXPRESSION = new UnwrapApiExpressionRule(); + private ResolverRules() { } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java similarity index 59% copy from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java index 5180d33..1ef374b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java @@ -16,25 +16,25 @@ * limitations under the License. */ -package org.apache.flink.table.api; +package org.apache.flink.table.expressions.resolver.rules; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ApiExpression; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; +import java.util.List; +import java.util.stream.Collectors; + /** - * Tumbling window on time with alias. Fully specifies a window. + * Unwraps all {@link ApiExpression}. 
*/ -@PublicEvolving -public final class TumbleWithSizeOnTimeWithAlias extends GroupWindow { - - private final Expression size; - - TumbleWithSizeOnTimeWithAlias(Expression alias, Expression timeField, Expression size) { - super(alias, timeField); - this.size = size; - } - - public Expression getSize() { - return size; +@Internal +final class UnwrapApiExpressionRule implements ResolverRule { + @Override + public List<Expression> apply( + List<Expression> expression, + ResolutionContext context) { + return expression.stream().map(ApiExpressionUtils::unwrapFromApi).collect(Collectors.toList()); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java index 4877bc4..3a3b0a0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java @@ -28,6 +28,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionUtils; import org.apache.flink.table.expressions.UnresolvedCallExpression; @@ -154,7 +155,7 @@ public class FieldInfoUtils { * used if the input type has a defined field order (tuple, case class, Row) and no of fields * references a field of the input type. 
*/ - public static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) { + private static boolean isReferenceByPosition(CompositeType<?> ct, Expression[] fields) { if (!(ct instanceof TupleTypeInfoBase)) { return false; } @@ -224,7 +225,10 @@ public class FieldInfoUtils { public static <A> TypeInfoSchema getFieldsInfo(TypeInformation<A> inputType, Expression[] expressions) { validateInputTypeInfo(inputType); - final List<FieldInfo> fieldInfos = extractFieldInformation(inputType, expressions); + final List<FieldInfo> fieldInfos = extractFieldInformation( + inputType, + Arrays.stream(expressions).map(ApiExpressionUtils::unwrapFromApi).toArray(Expression[]::new) + ); validateNoStarReference(fieldInfos); boolean isRowtimeAttribute = checkIfRowtimeAttribute(fieldInfos); @@ -244,8 +248,8 @@ public class FieldInfoUtils { } private static <A> List<FieldInfo> extractFieldInformation( - TypeInformation<A> inputType, - Expression[] exprs) { + TypeInformation<A> inputType, + Expression[] exprs) { final List<FieldInfo> fieldInfos; if (inputType instanceof GenericTypeInfo && inputType.getTypeClass() == Row.class) { throw new ValidationException( @@ -367,7 +371,7 @@ public class FieldInfoUtils { final TypeInformation<?>[] fieldTypes; if (inputType instanceof CompositeType) { int arity = inputType.getArity(); - CompositeType ct = (CompositeType<?>) inputType; + CompositeType<?> ct = (CompositeType<?>) inputType; fieldTypes = IntStream.range(0, arity).mapToObj(ct::getTypeAt).toArray(TypeInformation[]::new); } else { fieldTypes = new TypeInformation[]{inputType}; diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala index cdf4042..cb71dc7 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala +++ 
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala @@ -17,105 +17,82 @@ */ package org.apache.flink.table.api -import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort} -import java.math.{BigDecimal => JBigDecimal} -import java.sql.{Date, Time, Timestamp} -import java.time.{LocalDate, LocalDateTime, LocalTime} - import org.apache.flink.annotation.PublicEvolving -import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.internal.BaseExpressions +import org.apache.flink.table.expressions.ApiExpressionUtils._ import org.apache.flink.table.expressions._ -import ApiExpressionUtils._ import org.apache.flink.table.functions.BuiltInFunctionDefinitions._ import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction, UserDefinedFunctionHelper, _} import org.apache.flink.table.types.DataType -import org.apache.flink.table.types.utils.TypeConversions -import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType + +import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort} +import java.math.{BigDecimal => JBigDecimal} +import java.sql.{Date, Time, Timestamp} +import java.time.{LocalDate, LocalDateTime, LocalTime} import _root_.scala.language.implicitConversions /** * These are all the operations that can be used to construct an [[Expression]] AST for * expression operations. - * - * These operations must be kept in sync with the parser in - * [[org.apache.flink.table.expressions.ExpressionParser]]. 
*/ @PublicEvolving -trait ImplicitExpressionOperations { +trait ImplicitExpressionOperations extends BaseExpressions[Expression, Expression] { private[flink] def expr: Expression + override def toExpr: Expression = expr + + override protected def toApiSpecificExpression(expression: Expression): Expression = expression + /** - * Enables literals on left side of binary expressions. - * - * e.g. 12.toExpr % 'a - * - * @return expression - */ - def toExpr: Expression = expr + * Specifies a name for an expression i.e. a field. + * + * @param name name for one field + * @param extraNames additional names if the expression expands to multiple fields + * @return field with an alias + */ + def as(name: Symbol, extraNames: Symbol*): Expression = as(name.name, extraNames.map(_.name): _*) /** * Boolean AND in three-valued logic. */ - def && (other: Expression): Expression = unresolvedCall(AND, expr, other) + def && (other: Expression): Expression = and(other) /** * Boolean OR in three-valued logic. */ - def || (other: Expression): Expression = unresolvedCall(OR, expr, other) + def || (other: Expression): Expression = or(other) /** * Greater than. */ - def > (other: Expression): Expression = unresolvedCall(GREATER_THAN, expr, other) + def > (other: Expression): Expression = isGreater(other) /** * Greater than or equal. */ - def >= (other: Expression): Expression = unresolvedCall(GREATER_THAN_OR_EQUAL, expr, other) + def >= (other: Expression): Expression = isGreaterOrEqual(other) /** * Less than. */ - def < (other: Expression): Expression = unresolvedCall(LESS_THAN, expr, other) + def < (other: Expression): Expression = isLess(other) /** * Less than or equal. */ - def <= (other: Expression): Expression = unresolvedCall(LESS_THAN_OR_EQUAL, expr, other) + def <= (other: Expression): Expression = isLessOrEqual(other) /** * Equals. 
*/ - def === (other: Expression): Expression = unresolvedCall(EQUALS, expr, other) + def === (other: Expression): Expression = isEqual(other) /** * Not equal. */ - def !== (other: Expression): Expression = unresolvedCall(NOT_EQUALS, expr, other) - - /** - * Returns true if the given expression is between lowerBound and upperBound (both inclusive). - * False otherwise. The parameters must be numeric types or identical comparable types. - * - * @param lowerBound numeric or comparable expression - * @param upperBound numeric or comparable expression - * @return boolean or null - */ - def between(lowerBound: Expression, upperBound: Expression): Expression = - unresolvedCall(BETWEEN, expr, lowerBound, upperBound) - - /** - * Returns true if the given expression is not between lowerBound and upperBound (both - * inclusive). False otherwise. The parameters must be numeric types or identical - * comparable types. - * - * @param lowerBound numeric or comparable expression - * @param upperBound numeric or comparable expression - * @return boolean or null - */ - def notBetween(lowerBound: Expression, upperBound: Expression): Expression = - unresolvedCall(NOT_BETWEEN, expr, lowerBound, upperBound) + def !== (other: Expression): Expression = isNotEqual(other) /** * Whether boolean expression is not true; returns null if boolean is null. @@ -125,7 +102,7 @@ trait ImplicitExpressionOperations { /** * Returns negative numeric. */ - def unary_- : Expression = unresolvedCall(MINUS_PREFIX, expr) + def unary_- : Expression = Expressions.negative(expr) /** * Returns numeric. @@ -133,54 +110,24 @@ trait ImplicitExpressionOperations { def unary_+ : Expression = expr /** - * Returns true if the given expression is null. - */ - def isNull: Expression = unresolvedCall(IS_NULL, expr) - - /** - * Returns true if the given expression is not null. - */ - def isNotNull: Expression = unresolvedCall(IS_NOT_NULL, expr) - - /** - * Returns true if given boolean expression is true. 
False otherwise (for null and false). - */ - def isTrue: Expression = unresolvedCall(IS_TRUE, expr) - - /** - * Returns true if given boolean expression is false. False otherwise (for null and true). - */ - def isFalse: Expression = unresolvedCall(IS_FALSE, expr) - - /** - * Returns true if given boolean expression is not true (for null and false). False otherwise. - */ - def isNotTrue: Expression = unresolvedCall(IS_NOT_TRUE, expr) - - /** - * Returns true if given boolean expression is not false (for null and true). False otherwise. - */ - def isNotFalse: Expression = unresolvedCall(IS_NOT_FALSE, expr) - - /** * Returns left plus right. */ - def + (other: Expression): Expression = unresolvedCall(PLUS, expr, other) + def + (other: Expression): Expression = plus(other) /** * Returns left minus right. */ - def - (other: Expression): Expression = unresolvedCall(MINUS, expr, other) + def - (other: Expression): Expression = minus(other) /** * Returns left divided by right. */ - def / (other: Expression): Expression = unresolvedCall(DIVIDE, expr, other) + def / (other: Expression): Expression = dividedBy(other) /** * Returns left multiplied by right. */ - def * (other: Expression): Expression = unresolvedCall(TIMES, expr, other) + def * (other: Expression): Expression = times(other) /** * Returns the remainder (modulus) of left divided by right. @@ -194,352 +141,23 @@ trait ImplicitExpressionOperations { * * e.g. withColumns(1 to 3) */ - def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, other) - - /** - * Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT a), declares that an - * aggregation function is only applied on distinct input values. - * - * For example: - * - * {{{ - * orders - * .groupBy('a) - * .select('a, 'b.sum.distinct as 'd) - * }}} - */ - def distinct: Expression = unresolvedCall(DISTINCT, expr) - - /** - * Returns the sum of the numeric field across all input values. - * If all values are null, null is returned. 
- */ - def sum: Expression = unresolvedCall(SUM, expr) - - /** - * Returns the sum of the numeric field across all input values. - * If all values are null, 0 is returned. - */ - def sum0: Expression = unresolvedCall(SUM0, expr) - - /** - * Returns the minimum value of field across all input values. - */ - def min: Expression = unresolvedCall(MIN, expr) - - /** - * Returns the maximum value of field across all input values. - */ - def max: Expression = unresolvedCall(MAX, expr) - - /** - * Returns the number of input rows for which the field is not null. - */ - def count: Expression = unresolvedCall(COUNT, expr) - - /** - * Returns the average (arithmetic mean) of the numeric field across all input values. - */ - def avg: Expression = unresolvedCall(AVG, expr) - - /** - * Returns the population standard deviation of an expression (the square root of varPop()). - */ - def stddevPop: Expression = unresolvedCall(STDDEV_POP, expr) - - /** - * Returns the sample standard deviation of an expression (the square root of varSamp()). - */ - def stddevSamp: Expression = unresolvedCall(STDDEV_SAMP, expr) - - /** - * Returns the population standard variance of an expression. - */ - def varPop: Expression = unresolvedCall(VAR_POP, expr) - - /** - * Returns the sample variance of a given expression. - */ - def varSamp: Expression = unresolvedCall(VAR_SAMP, expr) - - /** - * Returns multiset aggregate of a given expression. - */ - def collect: Expression = unresolvedCall(COLLECT, expr) - - /** - * Converts a value to a given data type. - * - * e.g. "42".cast(DataTypes.INT()) leads to 42. - * - * @return casted expression - */ - def cast(toType: DataType): Expression = - unresolvedCall(CAST, expr, typeLiteral(toType)) - - /** - * @deprecated This method will be removed in future versions as it uses the old type system. It - * is recommended to use [[cast(DataType)]] instead which uses the new type system - * based on [[DataTypes]]. 
Please make sure to use either the old or the new type - * system consistently to avoid unintended behavior. See the website documentation - * for more information. - */ - @deprecated - def cast(toType: TypeInformation[_]): Expression = - unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(toType))) - - /** - * Specifies a name for an expression i.e. a field. - * - * @param name name for one field - * @param extraNames additional names if the expression expands to multiple fields - * @return field with an alias - */ - def as(name: Symbol, extraNames: Symbol*): Expression = - unresolvedCall( - AS, - expr +: valueLiteral(name.name) +: extraNames.map(name => valueLiteral(name.name)): _*) - - /** - * Specifies ascending order of an expression i.e. a field for orderBy call. - * - * @return ascend expression - */ - def asc: Expression = unresolvedCall(ORDER_ASC, expr) - - /** - * Specifies descending order of an expression i.e. a field for orderBy call. - * - * @return descend expression - */ - def desc: Expression = unresolvedCall(ORDER_DESC, expr) - - /** - * Returns true if an expression exists in a given list of expressions. This is a shorthand - * for multiple OR conditions. - * - * If the testing set contains null, the result will be null if the element can not be found - * and true if it can be found. If the element is null, the result is always null. - * - * e.g. "42".in(1, 2, 3) leads to false. - */ - def in(elements: Expression*): Expression = unresolvedCall(IN, expr +: elements: _*) - - /** - * Returns true if an expression exists in a given table sub-query. The sub-query table - * must consist of one column. This column must have the same data type as the expression. - * - * Note: This operation is not supported in a streaming environment yet. - */ - def in(table: Table): Expression = unresolvedCall(IN, expr, tableRef(table.toString, table)) - - /** - * Returns the start time (inclusive) of a window when applied on a window reference. 
- */ - def start: Expression = unresolvedCall(WINDOW_START, expr) - - /** - * Returns the end time (exclusive) of a window when applied on a window reference. - * - * e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000. - */ - def end: Expression = unresolvedCall(WINDOW_END, expr) + def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, objectToExpression(other)) /** * Ternary conditional operator that decides which of two other expressions should be * based on a evaluated boolean condition. * - * e.g. (42 > 5).?("A", "B") leads to "A" + * e.g. ($"f0" > 5).?("A", "B") leads to "A" if f0 is greater than 5 * * @param ifTrue expression to be evaluated if condition holds * @param ifFalse expression to be evaluated if condition does not hold */ def ?(ifTrue: Expression, ifFalse: Expression): Expression = - unresolvedCall(IF, expr, ifTrue, ifFalse) + Expressions.ifThenElse(expr, ifTrue, ifFalse).toExpr // scalar functions /** - * Calculates the remainder of division the given number by another one. - */ - def mod(other: Expression): Expression = unresolvedCall(MOD, expr, other) - - /** - * Calculates the Euler's number raised to the given power. - */ - def exp(): Expression = unresolvedCall(EXP, expr) - - /** - * Calculates the base 10 logarithm of the given value. - */ - def log10(): Expression = unresolvedCall(LOG10, expr) - - /** - * Calculates the base 2 logarithm of the given value. - */ - def log2(): Expression = unresolvedCall(LOG2, expr) - - /** - * Calculates the natural logarithm of the given value. - */ - def ln(): Expression = unresolvedCall(LN, expr) - - /** - * Calculates the natural logarithm of the given value. - */ - def log(): Expression = unresolvedCall(LOG, expr) - - /** - * Calculates the logarithm of the given value to the given base. - */ - def log(base: Expression): Expression = unresolvedCall(LOG, base, expr) - - /** - * Calculates the given number raised to the power of the other value.
- */ - def power(other: Expression): Expression = unresolvedCall(POWER, expr, other) - - /** - * Calculates the hyperbolic cosine of a given value. - */ - def cosh(): Expression = unresolvedCall(COSH, expr) - - /** - * Calculates the square root of a given value. - */ - def sqrt(): Expression = unresolvedCall(SQRT, expr) - - /** - * Calculates the absolute value of given value. - */ - def abs(): Expression = unresolvedCall(ABS, expr) - - /** - * Calculates the largest integer less than or equal to a given number. - */ - def floor(): Expression = unresolvedCall(FLOOR, expr) - - /** - * Calculates the hyperbolic sine of a given value. - */ - def sinh(): Expression = unresolvedCall(SINH, expr) - - /** - * Calculates the smallest integer greater than or equal to a given number. - */ - def ceil(): Expression = unresolvedCall(CEIL, expr) - - /** - * Calculates the sine of a given number. - */ - def sin(): Expression = unresolvedCall(SIN, expr) - - /** - * Calculates the cosine of a given number. - */ - def cos(): Expression = unresolvedCall(COS, expr) - - /** - * Calculates the tangent of a given number. - */ - def tan(): Expression = unresolvedCall(TAN, expr) - - /** - * Calculates the cotangent of a given number. - */ - def cot(): Expression = unresolvedCall(COT, expr) - - /** - * Calculates the arc sine of a given number. - */ - def asin(): Expression = unresolvedCall(ASIN, expr) - - /** - * Calculates the arc cosine of a given number. - */ - def acos(): Expression = unresolvedCall(ACOS, expr) - - /** - * Calculates the arc tangent of a given number. - */ - def atan(): Expression = unresolvedCall(ATAN, expr) - - /** - * Calculates the hyperbolic tangent of a given number. - */ - def tanh(): Expression = unresolvedCall(TANH, expr) - - /** - * Converts numeric from radians to degrees. - */ - def degrees(): Expression = unresolvedCall(DEGREES, expr) - - /** - * Converts numeric from degrees to radians. 
- */ - def radians(): Expression = unresolvedCall(RADIANS, expr) - - /** - * Calculates the signum of a given number. - */ - def sign(): Expression = unresolvedCall(SIGN, expr) - - /** - * Rounds the given number to integer places right to the decimal point. - */ - def round(places: Expression): Expression = unresolvedCall(ROUND, expr, places) - - /** - * Returns a string representation of an integer numeric value in binary format. Returns null if - * numeric is null. E.g. "4" leads to "100", "12" leads to "1100". - */ - def bin(): Expression = unresolvedCall(BIN, expr) - - /** - * Returns a string representation of an integer numeric value or a string in hex format. Returns - * null if numeric or string is null. - * - * E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world" leads - * to "68656c6c6f2c776f726c64". - */ - def hex(): Expression = unresolvedCall(HEX, expr) - - /** - * Returns a number of truncated to n decimal places. - * If n is 0,the result has no decimal point or fractional part. - * n can be negative to cause n digits left of the decimal point of the value to become zero. - * E.g. truncate(42.345, 2) to 42.34. - */ - def truncate(n: Expression): Expression = unresolvedCall(TRUNCATE, expr, n) - - /** - * Returns a number of truncated to 0 decimal places. - * E.g. truncate(42.345) to 42.0. - */ - def truncate(): Expression = unresolvedCall(TRUNCATE, expr) - - // String operations - - /** - * Creates a substring of the given string at given index for a given length. - * - * @param beginIndex first character of the substring (starting at 1, inclusive) - * @param length number of characters of the substring - * @return substring - */ - def substring(beginIndex: Expression, length: Expression): Expression = - unresolvedCall(SUBSTRING, expr, beginIndex, length) - - /** - * Creates a substring of the given string beginning at the given index to the end. 
- * - * @param beginIndex first character of the substring (starting at 1, inclusive) - * @return substring - */ - def substring(beginIndex: Expression): Expression = - unresolvedCall(SUBSTRING, expr, beginIndex) - - /** * Removes leading and/or trailing characters from the given string. * * @param removeLeading if true, remove leading characters (default: true) @@ -552,324 +170,14 @@ trait ImplicitExpressionOperations { removeTrailing: Boolean = true, character: Expression = valueLiteral(" ")) : Expression = { - unresolvedCall(TRIM, valueLiteral(removeLeading), valueLiteral(removeTrailing), character, expr) + unresolvedCall( + TRIM, + valueLiteral(removeLeading), + valueLiteral(removeTrailing), + ApiExpressionUtils.objectToExpression(character), + expr) } - /** - * Returns a new string which replaces all the occurrences of the search target - * with the replacement string (non-overlapping). - */ - def replace(search: Expression, replacement: Expression): Expression = - unresolvedCall(REPLACE, expr, search, replacement) - - /** - * Returns the length of a string. - */ - def charLength(): Expression = unresolvedCall(CHAR_LENGTH, expr) - - /** - * Returns all of the characters in a string in upper case using the rules of - * the default locale. - */ - def upperCase(): Expression = unresolvedCall(UPPER, expr) - - /** - * Returns all of the characters in a string in lower case using the rules of - * the default locale. - */ - def lowerCase(): Expression = unresolvedCall(LOWER, expr) - - /** - * Converts the initial letter of each word in a string to uppercase. - * Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace. - */ - def initCap(): Expression = unresolvedCall(INIT_CAP, expr) - - /** - * Returns true, if a string matches the specified LIKE pattern. - * - * e.g. 
"Jo_n%" matches all strings that start with "Jo(arbitrary letter)n" - */ - def like(pattern: Expression): Expression = unresolvedCall(LIKE, expr, pattern) - - /** - * Returns true, if a string matches the specified SQL regex pattern. - * - * e.g. "A+" matches all strings that consist of at least one A - */ - def similar(pattern: Expression): Expression = unresolvedCall(SIMILAR, expr, pattern) - - /** - * Returns the position of string in an other string starting at 1. - * Returns 0 if string could not be found. - * - * e.g. "a".position("bbbbba") leads to 6 - */ - def position(haystack: Expression): Expression = unresolvedCall(POSITION, expr, haystack) - - /** - * Returns a string left-padded with the given pad string to a length of len characters. If - * the string is longer than len, the return value is shortened to len characters. - * - * e.g. "hi".lpad(4, '??') returns "??hi", "hi".lpad(1, '??') returns "h" - */ - def lpad(len: Expression, pad: Expression): Expression = unresolvedCall(LPAD, expr, len, pad) - - /** - * Returns a string right-padded with the given pad string to a length of len characters. If - * the string is longer than len, the return value is shortened to len characters. - * - * e.g. "hi".rpad(4, '??') returns "hi??", "hi".rpad(1, '??') returns "h" - */ - def rpad(len: Expression, pad: Expression): Expression = unresolvedCall(RPAD, expr, len, pad) - - /** - * Defines an aggregation to be used for a previously specified over window. - * - * For example: - * - * {{{ - * table - * .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w) - * .select('c, 'a, 'a.count over 'w, 'a.sum over 'w) - * }}} - */ - def over(alias: Expression): Expression = unresolvedCall(OVER, expr, alias) - - /** - * Replaces a substring of string with a string starting at a position (starting at 1). - * - * e.g. 
"xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx" - */ - def overlay(newString: Expression, starting: Expression): Expression = - unresolvedCall(OVERLAY, expr, newString, starting) - - /** - * Replaces a substring of string with a string starting at a position (starting at 1). - * The length specifies how many characters should be removed. - * - * e.g. "xxxxxtest".overlay("xxxx", 6, 2) leads to "xxxxxxxxxst" - */ - def overlay(newString: Expression, starting: Expression, length: Expression): Expression = - unresolvedCall(OVERLAY, expr, newString, starting, length) - - /** - * Returns a string with all substrings that match the regular expression consecutively - * being replaced. - */ - def regexpReplace(regex: Expression, replacement: Expression): Expression = - unresolvedCall(REGEXP_REPLACE, expr, regex, replacement) - - /** - * Returns a string extracted with a specified regular expression and a regex match group - * index. - */ - def regexpExtract(regex: Expression, extractIndex: Expression): Expression = - unresolvedCall(REGEXP_EXTRACT, expr, regex, extractIndex) - - /** - * Returns a string extracted with a specified regular expression. - */ - def regexpExtract(regex: Expression): Expression = - unresolvedCall(REGEXP_EXTRACT, expr, regex) - - /** - * Returns the base string decoded with base64. - */ - def fromBase64(): Expression = unresolvedCall(FROM_BASE64, expr) - - /** - * Returns the base64-encoded result of the input string. - */ - def toBase64(): Expression = unresolvedCall(TO_BASE64, expr) - - /** - * Returns a string that removes the left whitespaces from the given string. - */ - def ltrim(): Expression = unresolvedCall(LTRIM, expr) - - /** - * Returns a string that removes the right whitespaces from the given string. - */ - def rtrim(): Expression = unresolvedCall(RTRIM, expr) - - /** - * Returns a string that repeats the base string n times. 
- */ - def repeat(n: Expression): Expression = unresolvedCall(REPEAT, expr, n) - - // Temporal operations - - /** - * Parses a date string in the form "yyyy-MM-dd" to a SQL Date. - */ - def toDate: Expression = - unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE))) - - /** - * Parses a time string in the form "HH:mm:ss" to a SQL Time. - */ - def toTime: Expression = - unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME))) - - /** - * Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp. - */ - def toTimestamp: Expression = - unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP))) - - /** - * Extracts parts of a time point or time interval. Returns the part as a long value. - * - * e.g. "2006-06-05".toDate.extract(DAY) leads to 5 - */ - def extract(timeIntervalUnit: TimeIntervalUnit): Expression = - unresolvedCall(EXTRACT, valueLiteral(timeIntervalUnit), expr) - - /** - * Rounds down a time point to the given unit. - * - * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00 - */ - def floor(timeIntervalUnit: TimeIntervalUnit): Expression = - unresolvedCall(FLOOR, valueLiteral(timeIntervalUnit), expr) - - /** - * Rounds up a time point to the given unit. - * - * e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00 - */ - def ceil(timeIntervalUnit: TimeIntervalUnit): Expression = - unresolvedCall(CEIL, valueLiteral(timeIntervalUnit), expr) - - // Interval types - - /** - * Creates an interval of the given number of years. - * - * @return interval of months - */ - def year: Expression = toMonthInterval(expr, 12) - - /** - * Creates an interval of the given number of years. - * - * @return interval of months - */ - def years: Expression = year - - /** - * Creates an interval of the given number of quarters. 
- * - * @return interval of months - */ - def quarter: Expression = toMonthInterval(expr, 3) - - /** - * Creates an interval of the given number of quarters. - * - * @return interval of months - */ - def quarters: Expression = quarter - - /** - * Creates an interval of the given number of months. - * - * @return interval of months - */ - def month: Expression = toMonthInterval(expr, 1) - - /** - * Creates an interval of the given number of months. - * - * @return interval of months - */ - def months: Expression = month - - /** - * Creates an interval of the given number of weeks. - * - * @return interval of milliseconds - */ - def week: Expression = toMilliInterval(expr, 7 * MILLIS_PER_DAY) - - /** - * Creates an interval of the given number of weeks. - * - * @return interval of milliseconds - */ - def weeks: Expression = week - - /** - * Creates an interval of the given number of days. - * - * @return interval of milliseconds - */ - def day: Expression = toMilliInterval(expr, MILLIS_PER_DAY) - - /** - * Creates an interval of the given number of days. - * - * @return interval of milliseconds - */ - def days: Expression = day - - /** - * Creates an interval of the given number of hours. - * - * @return interval of milliseconds - */ - def hour: Expression = toMilliInterval(expr, MILLIS_PER_HOUR) - - /** - * Creates an interval of the given number of hours. - * - * @return interval of milliseconds - */ - def hours: Expression = hour - - /** - * Creates an interval of the given number of minutes. - * - * @return interval of milliseconds - */ - def minute: Expression = toMilliInterval(expr, MILLIS_PER_MINUTE) - - /** - * Creates an interval of the given number of minutes. - * - * @return interval of milliseconds - */ - def minutes: Expression = minute - - /** - * Creates an interval of the given number of seconds. 
- * - * @return interval of milliseconds - */ - def second: Expression = toMilliInterval(expr, MILLIS_PER_SECOND) - - /** - * Creates an interval of the given number of seconds. - * - * @return interval of milliseconds - */ - def seconds: Expression = second - - /** - * Creates an interval of the given number of milliseconds. - * - * @return interval of milliseconds - */ - def milli: Expression = toMilliInterval(expr, 1) - - /** - * Creates an interval of the given number of milliseconds. - * - * @return interval of milliseconds - */ - def millis: Expression = milli - // Row interval type /** @@ -879,121 +187,6 @@ trait ImplicitExpressionOperations { */ def rows: Expression = toRowInterval(expr) - // Advanced type helper functions - - /** - * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and - * returns it's value. - * - * @param name name of the field (similar to Flink's field expressions) - * @return value of the field - */ - def get(name: String): Expression = unresolvedCall(GET, expr, valueLiteral(name)) - - /** - * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and - * returns it's value. - * - * @param index position of the field - * @return value of the field - */ - def get(index: Int): Expression = unresolvedCall(GET, expr, valueLiteral(index)) - - /** - * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes - * into a flat representation where every subtype is a separate field. - */ - def flatten(): Expression = unresolvedCall(FLATTEN, expr) - - /** - * Accesses the element of an array or map based on a key or an index (starting at 1). - * - * @param index key or position of the element (array index starting at 1) - * @return value of the element - */ - def at(index: Expression): Expression = unresolvedCall(AT, expr, index) - - /** - * Returns the number of elements of an array or number of entries of a map. 
- * - * @return number of elements or entries - */ - def cardinality(): Expression = unresolvedCall(CARDINALITY, expr) - - /** - * Returns the sole element of an array with a single element. Returns null if the array is - * empty. Throws an exception if the array has more than one element. - * - * @return the first and only element of an array with a single element - */ - def element(): Expression = unresolvedCall(ARRAY_ELEMENT, expr) - - // Time definition - - /** - * Declares a field as the rowtime attribute for indicating, accessing, and working in - * Flink's event time. - */ - def rowtime: Expression = unresolvedCall(ROWTIME, expr) - - /** - * Declares a field as the proctime attribute for indicating, accessing, and working in - * Flink's processing time. - */ - def proctime: Expression = unresolvedCall(PROCTIME, expr) - - // Hash functions - - /** - * Returns the MD5 hash of the string argument; null if string is null. - * - * @return string of 32 hexadecimal digits or null - */ - def md5(): Expression = unresolvedCall(MD5, expr) - - /** - * Returns the SHA-1 hash of the string argument; null if string is null. - * - * @return string of 40 hexadecimal digits or null - */ - def sha1(): Expression = unresolvedCall(SHA1, expr) - - /** - * Returns the SHA-224 hash of the string argument; null if string is null. - * - * @return string of 56 hexadecimal digits or null - */ - def sha224(): Expression = unresolvedCall(SHA224, expr) - - /** - * Returns the SHA-256 hash of the string argument; null if string is null. - * - * @return string of 64 hexadecimal digits or null - */ - def sha256(): Expression = unresolvedCall(SHA256, expr) - - /** - * Returns the SHA-384 hash of the string argument; null if string is null. - * - * @return string of 96 hexadecimal digits or null - */ - def sha384(): Expression = unresolvedCall(SHA384, expr) - - /** - * Returns the SHA-512 hash of the string argument; null if string is null. 
- * - * @return string of 128 hexadecimal digits or null - */ - def sha512(): Expression = unresolvedCall(SHA512, expr) - - /** - * Returns the hash for the given string expression using the SHA-2 family of hash - * functions (SHA-224, SHA-256, SHA-384, or SHA-512). - * - * @param hashLength bit length of the result (either 224, 256, 384, or 512) - * @return string or null if one of the arguments is null. - */ - def sha2(hashLength: Expression): Expression = unresolvedCall(SHA2, expr, hashLength) } /** @@ -1109,7 +302,9 @@ trait ImplicitExpressionConversions { * Calls a scalar function for the given parameters. */ def apply(params: Expression*): Expression = { - unresolvedCall(new ScalarFunctionDefinition(s.getClass.getName, s), params:_*) + unresolvedCall( + new ScalarFunctionDefinition(s.getClass.getName, s), + params.map(ApiExpressionUtils.objectToExpression): _*) } } @@ -1121,7 +316,9 @@ trait ImplicitExpressionConversions { def apply(params: Expression*): Expression = { val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper .getReturnTypeOfTableFunction(t, implicitly[TypeInformation[T]]) - unresolvedCall(new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo), params: _*) + unresolvedCall( + new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo), + params.map(ApiExpressionUtils.objectToExpression): _*) } } @@ -1149,7 +346,9 @@ trait ImplicitExpressionConversions { * Calls an aggregate function for the given parameters. */ def apply(params: Expression*): Expression = { - unresolvedCall(createFunctionDefinition(), params: _*) + unresolvedCall( + createFunctionDefinition(), + params.map(ApiExpressionUtils.objectToExpression): _*) } /** @@ -1160,6 +359,25 @@ trait ImplicitExpressionConversions { } } + + /** + * Extends Scala's StringContext with a method for creating an unresolved reference via + * string interpolation. 
+ */ + implicit class FieldExpression(val sc: StringContext) { + + /** + * Creates an unresolved reference to a table's field. + * + * Example: + * + * {{{ + * tab.select($"key", $"value") + * }}} + */ + def $(args: Any*): Expression = unresolvedRef(sc.s(args: _*)) + } + implicit def tableSymbolToExpression(sym: TableSymbol): Expression = valueLiteral(sym) @@ -1169,7 +387,7 @@ trait ImplicitExpressionConversions { implicit def scalaRange2RangeExpression(range: Range.Inclusive): Expression = { val startExpression = valueLiteral(range.start) val endExpression = valueLiteral(range.end) - startExpression to endExpression + unresolvedCall(RANGE_TO, startExpression, endExpression) } implicit def byte2Literal(b: Byte): Expression = valueLiteral(b) @@ -1284,47 +502,77 @@ trait ImplicitExpressionConversions { * @see TableEnvironment#createTemporaryFunction * @see TableEnvironment#createTemporarySystemFunction */ - def call(path: String, params: Expression*): Expression = { - lookupCall(path, params: _*) - } + def call(path: String, params: Expression*): Expression = Expressions.call(path, params: _*) // ---------------------------------------------------------------------------------------------- // Implicit expressions in prefix notation // ---------------------------------------------------------------------------------------------- /** + * Creates a SQL literal. + * + * The data type is derived from the object's class and its value. + * + * For example: + * + * - `lit(12)` leads to `INT` + * - `lit("abc")` leads to `CHAR(3)` + * - `lit(new java.math.BigDecimal("123.45"))` leads to `DECIMAL(5, 2)` + * + * See [[org.apache.flink.table.types.utils.ValueDataTypeConverter]] for a list of supported + * literal values. + */ + def lit(v: Any): Expression = Expressions.lit(v) + + /** + * Creates a SQL literal of a given [[DataType]]. + * + * The method [[lit(Object)]] is preferred as it extracts the [[DataType]] + * automatically.
The class of `v` must be supported according to the + * [[org.apache.flink.table.types.logical.LogicalType#supportsInputConversion(Class)]]. + */ + def lit(v: Any, dataType: DataType): Expression = Expressions.lit(v, dataType) + + /** + * Returns negative numeric. + */ + def negative(v: Expression): Expression = { + Expressions.negative(v) + } + + /** * Returns the current SQL date in UTC time zone. */ def currentDate(): Expression = { - unresolvedCall(CURRENT_DATE) + Expressions.currentDate() } /** * Returns the current SQL time in UTC time zone. */ def currentTime(): Expression = { - unresolvedCall(CURRENT_TIME) + Expressions.currentTime() } /** * Returns the current SQL timestamp in UTC time zone. */ def currentTimestamp(): Expression = { - unresolvedCall(CURRENT_TIMESTAMP) + Expressions.currentTimestamp() } /** * Returns the current SQL time in local time zone. */ def localTime(): Expression = { - unresolvedCall(LOCAL_TIME) + Expressions.localTime() } /** * Returns the current SQL timestamp in local time zone. */ def localTimestamp(): Expression = { - unresolvedCall(LOCAL_TIMESTAMP) + Expressions.localTimestamp() } /** @@ -1342,7 +590,7 @@ trait ImplicitExpressionConversions { rightTimePoint: Expression, rightTemporal: Expression) : Expression = { - unresolvedCall(TEMPORAL_OVERLAPS, leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) + Expressions.temporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } /** @@ -1360,7 +608,7 @@ trait ImplicitExpressionConversions { timestamp: Expression, format: Expression) : Expression = { - unresolvedCall(DATE_FORMAT, timestamp, format) + Expressions.dateFormat(timestamp, format) } /** @@ -1379,49 +627,49 @@ trait ImplicitExpressionConversions { timePoint1: Expression, timePoint2: Expression) : Expression = { - unresolvedCall(TIMESTAMP_DIFF, timePointUnit, timePoint1, timePoint2) + Expressions.timestampDiff(timePointUnit, timePoint1, timePoint2) } /** * Creates an array of literals. 
*/ def array(head: Expression, tail: Expression*): Expression = { - unresolvedCall(ARRAY, head +: tail: _*) + Expressions.array(head, tail: _*) } /** * Creates a row of expressions. */ def row(head: Expression, tail: Expression*): Expression = { - unresolvedCall(ROW, head +: tail: _*) + Expressions.row(head, tail: _*) } /** * Creates a map of expressions. */ def map(key: Expression, value: Expression, tail: Expression*): Expression = { - unresolvedCall(MAP, key +: value +: tail: _*) + Expressions.map(key, value, tail: _*) } /** * Returns a value that is closer than any other value to pi. */ def pi(): Expression = { - unresolvedCall(PI) + Expressions.pi() } /** * Returns a value that is closer than any other value to e. */ def e(): Expression = { - unresolvedCall(E) + Expressions.e() } /** * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive). */ def rand(): Expression = { - unresolvedCall(RAND) + Expressions.rand() } /** @@ -1430,7 +678,7 @@ trait ImplicitExpressionConversions { * have same initial seed. */ def rand(seed: Expression): Expression = { - unresolvedCall(RAND, seed) + Expressions.rand(seed) } /** @@ -1438,7 +686,7 @@ trait ImplicitExpressionConversions { * value (exclusive). */ def randInteger(bound: Expression): Expression = { - unresolvedCall(RAND_INTEGER, bound) + Expressions.randInteger(bound) } /** @@ -1447,7 +695,7 @@ trait ImplicitExpressionConversions { * of numbers if they have same initial seed and same bound. */ def randInteger(seed: Expression, bound: Expression): Expression = { - unresolvedCall(RAND_INTEGER, seed, bound) + Expressions.randInteger(seed, bound) } /** @@ -1455,25 +703,38 @@ trait ImplicitExpressionConversions { * Returns NULL if any argument is NULL. */ def concat(string: Expression, strings: Expression*): Expression = { - unresolvedCall(CONCAT, string +: strings: _*) + Expressions.concat(string, strings: _*) } /** * Calculates the arc tangent of a given coordinate. 
*/ def atan2(y: Expression, x: Expression): Expression = { - unresolvedCall(ATAN2, y, x) + Expressions.atan2(y, x) } /** * Returns the string that results from concatenating the arguments and separator. * Returns NULL If the separator is NULL. * - * Note: this user-defined function does not skip empty strings. However, it does skip any NULL + * Note: This function does not skip empty strings. However, it does skip any NULL * values after the separator argument. + * @deprecated use [[ImplicitExpressionConversions.concatWs()]] **/ + @deprecated def concat_ws(separator: Expression, string: Expression, strings: Expression*): Expression = { - unresolvedCall(CONCAT_WS, separator +: string +: strings: _*) + concatWs(separator, string, strings: _*) + } + + /** + * Returns the string that results from concatenating the arguments and separator. + * Returns NULL if the separator is NULL. + * + * Note: This function does not skip empty strings. However, it does skip any NULL + * values after the separator argument. + **/ + def concatWs(separator: Expression, string: Expression, strings: Expression*): Expression = { + Expressions.concatWs(separator, string, strings: _*) } /** @@ -1483,7 +744,7 @@ trait ImplicitExpressionConversions { * generator. */ def uuid(): Expression = { - unresolvedCall(UUID) + Expressions.uuid() } /** @@ -1492,7 +753,7 @@ trait ImplicitExpressionConversions { * e.g. nullOf(DataTypes.INT()) */ def nullOf(dataType: DataType): Expression = { - valueLiteral(null, dataType) + Expressions.nullOf(dataType) } /** @@ -1503,21 +764,21 @@ trait ImplicitExpressionConversions { * documentation for more information. */ def nullOf(typeInfo: TypeInformation[_]): Expression = { - nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo)) + Expressions.nullOf(typeInfo) } /** * Calculates the logarithm of the given value. 
*/ def log(value: Expression): Expression = { - unresolvedCall(LOG, value) + Expressions.log(value) } /** * Calculates the logarithm of the given value to the given base. */ def log(base: Expression, value: Expression): Expression = { - unresolvedCall(LOG, base, value) + Expressions.log(base, value) } /** @@ -1531,7 +792,7 @@ trait ImplicitExpressionConversions { * @param ifFalse expression to be evaluated if condition does not hold */ def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = { - unresolvedCall(IF, condition, ifTrue, ifFalse) + Expressions.ifThenElse(condition, ifTrue, ifFalse) } /** @@ -1544,7 +805,7 @@ trait ImplicitExpressionConversions { * e.g. withColumns('b to 'c) or withColumns('*) */ def withColumns(head: Expression, tail: Expression*): Expression = { - unresolvedCall(WITH_COLUMNS, head +: tail: _*) + Expressions.withColumns(head, tail: _*) } /** @@ -1558,6 +819,20 @@ trait ImplicitExpressionConversions { * e.g. withoutColumns('b to 'c) or withoutColumns('c) */ def withoutColumns(head: Expression, tail: Expression*): Expression = { - unresolvedCall(WITHOUT_COLUMNS, head +: tail: _*) + Expressions.withoutColumns(head, tail: _*) + } + + /** + * Boolean AND in three-valued logic. + */ + def and(predicate0: Expression, predicate1: Expression, predicates: Expression*): Expression = { + Expressions.and(predicate0, predicate1, predicates: _*) + } + + /** + * Boolean OR in three-valued logic. 
+ */ + def or(predicate0: Expression, predicate1: Expression, predicates: Expression*): Expression = { + Expressions.or(predicate0, predicate1, predicates: _*) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala index 06a9b5f..fadf854 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.functions._ import org.apache.flink.table.planner.expressions.{E => PlannerE, UUID => PlannerUUID} import org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THROW_EXCEPTION import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo -import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE} +import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL} import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._ import _root_.scala.collection.JavaConverters._ @@ -151,12 +151,12 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp expr case AND => - assert(args.size == 2) - And(args.head, args.last) + assert(args.size >= 2) + args.reduceLeft(And) case OR => - assert(args.size == 2) - Or(args.head, args.last) + assert(args.size >= 2) + args.reduceLeft(Or) case NOT => assert(args.size == 1) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala 
index cb9f6b1..7b2c999 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala @@ -133,12 +133,12 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp expr case AND => - assert(args.size == 2) - And(args.head, args.last) + assert(args.size >= 2) + args.reduceLeft(And) case OR => - assert(args.size == 2) - Or(args.head, args.last) + assert(args.size >= 2) + args.reduceLeft(Or) case NOT => assert(args.size == 1)