This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 029d578e098e2b02281d5c320da55c06e239b2b9
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Feb 11 08:25:12 2020 +0100

    [FLINK-16033][table-api] Introduced Java Table API Expression DSL
---
 .../org/apache/flink/table/api/ApiExpression.java  |   66 +
 .../org/apache/flink/table/api/Expressions.java    |  549 +++++++++
 .../org/apache/flink/table/api/GroupWindow.java    |    5 +-
 .../org/apache/flink/table/api/SessionWithGap.java |    3 +-
 .../flink/table/api/SessionWithGapOnTime.java      |    5 +-
 .../table/api/SessionWithGapOnTimeWithAlias.java   |    3 +-
 .../org/apache/flink/table/api/SlideWithSize.java  |    3 +-
 .../flink/table/api/SlideWithSizeAndSlide.java     |    5 +-
 .../table/api/SlideWithSizeAndSlideOnTime.java     |   13 +-
 .../api/SlideWithSizeAndSlideOnTimeWithAlias.java  |   13 +-
 .../org/apache/flink/table/api/TumbleWithSize.java |    3 +-
 .../flink/table/api/TumbleWithSizeOnTime.java      |    5 +-
 .../table/api/TumbleWithSizeOnTimeWithAlias.java   |    3 +-
 .../flink/table/api/internal/BaseExpressions.java  | 1284 ++++++++++++++++++++
 .../apache/flink/table/api/internal/TableImpl.java |   37 +-
 .../table/expressions/ApiExpressionUtils.java      |   39 +-
 .../table/expressions/LookupCallExpression.java    |    3 +-
 .../expressions/UnresolvedCallExpression.java      |    7 +-
 .../expressions/resolver/ExpressionResolver.java   |    2 +
 .../expressions/resolver/LookupCallResolver.java   |   12 +
 .../resolver/rules/OverWindowResolverRule.java     |    2 +-
 .../expressions/resolver/rules/ResolverRules.java  |    6 +
 .../resolver/rules/UnwrapApiExpressionRule.java}   |   30 +-
 .../flink/table/typeutils/FieldInfoUtils.java      |   14 +-
 .../org/apache/flink/table/api/expressionDsl.scala | 1045 +++-------------
 .../expressions/PlannerExpressionConverter.scala   |   10 +-
 .../expressions/PlannerExpressionConverter.scala   |    8 +-
 27 files changed, 2205 insertions(+), 970 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java
new file mode 100644
index 0000000..0e2027d
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ApiExpression.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.table.api.internal.BaseExpressions;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+
+import java.util.List;
+
+/**
+ * Java API class that gives access to expression operations.
+ *
+ * @see BaseExpressions
+ */
+public final class ApiExpression extends BaseExpressions<Object, 
ApiExpression> implements Expression {
+       private final Expression wrappedExpression;
+
+       @Override
+       public String asSummaryString() {
+               return wrappedExpression.asSummaryString();
+       }
+
+       ApiExpression(Expression wrappedExpression) {
+               if (wrappedExpression instanceof ApiExpression) {
+                       throw new UnsupportedOperationException("This is a bug. 
Please file an issue.");
+               }
+               this.wrappedExpression = wrappedExpression;
+       }
+
+       @Override
+       public Expression toExpr() {
+               return wrappedExpression;
+       }
+
+       @Override
+       protected ApiExpression toApiSpecificExpression(Expression expression) {
+               return new ApiExpression(expression);
+       }
+
+       @Override
+       public List<Expression> getChildren() {
+               return wrappedExpression.getChildren();
+       }
+
+       @Override
+       public <R> R accept(ExpressionVisitor<R> visitor) {
+               return wrappedExpression.accept(visitor);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
new file mode 100644
index 0000000..9e0c10e
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TimePointUnit;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.types.utils.ValueDataTypeConverter;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+
+/**
+ * Entry point of the Table API Expression DSL such as: {@code 
$("myField").plus(10).abs()}
+ *
+ * <p>This class contains static methods for referencing table columns, 
creating literals,
+ * and building more complex {@link Expression} chains. {@link ApiExpression 
ApiExpressions} are
+ * pure API entities that are further translated into {@link 
ResolvedExpression ResolvedExpressions}
+ * under the hood.
+ *
+ * <p>For fluent definition of expressions and easier readability, we 
recommend to add a
+ * star import to the methods of this class:
+ *
+ * <pre>
+ * import static org.apache.flink.table.api.Expressions.*;
+ * </pre>
+ *
+ * <p>Check the documentation for more programming language specific APIs, for 
example, by using
+ * Scala implicits.
+ */
+@PublicEvolving
+public final class Expressions {
+       /**
+        * Creates an unresolved reference to a table's field.
+        *
+        * <p>Example:
+        * <pre>{@code
+        *   tab.select($("key"), $("value"))
+        * }
+        * </pre>
+        */
+       //CHECKSTYLE.OFF: MethodName
+       public static ApiExpression $(String name) {
+               return new ApiExpression(unresolvedRef(name));
+       }
+       //CHECKSTYLE.ON: MethodName
+
+       /**
+        * Creates a SQL literal.
+        *
+        * <p>The data type is derived from the object's class and its value.
+        *
+        * <p>For example:
+        * <ul>
+        *     <li>{@code lit(12)} leads to {@code INT}</li>
+        *     <li>{@code lit("abc")} leads to {@code CHAR(3)}</li>
+        *     <li>{@code lit(new BigDecimal("123.45"))} leads to {@code 
DECIMAL(5, 2)}</li>
+        * </ul>
+        *
+        * <p>See {@link ValueDataTypeConverter} for a list of supported 
literal values.
+        */
+       public static ApiExpression lit(Object v) {
+               return new ApiExpression(valueLiteral(v));
+       }
+
+       /**
+        * Creates a SQL literal of a given {@link DataType}.
+        *
+        * <p>The method {@link #lit(Object)} is preferred as it extracts the 
{@link DataType} automatically.
+        * Use this method only when necessary. The class of {@code v} must be 
supported according to the
+        * {@link 
org.apache.flink.table.types.logical.LogicalType#supportsInputConversion(Class)}.
+        */
+       public static ApiExpression lit(Object v, DataType dataType) {
+               return new ApiExpression(valueLiteral(v, dataType));
+       }
+
+       /**
+        * Indicates a range from 'start' to 'end', which can be used in columns
+        * selection.
+        *
+        * <p>Example:
+        * <pre>{@code
+        * Table table = ...
+        * table.select(withColumns(range(b, c)))
+        * }</pre>
+        *
+        * @see #withColumns(Object, Object...)
+        * @see #withoutColumns(Object, Object...)
+        */
+       public static ApiExpression range(String start, String end) {
+               return apiCall(BuiltInFunctionDefinitions.RANGE_TO, 
unresolvedRef(start), unresolvedRef(end));
+       }
+
+       /**
+        * Indicates an index based range, which can be used in columns 
selection.
+        *
+        * <p>Example:
+        * <pre>{@code
+        * Table table = ...
+        * table.select(withColumns(range(3, 4)))
+        * }</pre>
+        *
+        * @see #withColumns(Object, Object...)
+        * @see #withoutColumns(Object, Object...)
+        */
+       public static ApiExpression range(int start, int end) {
+               return apiCall(BuiltInFunctionDefinitions.RANGE_TO, 
valueLiteral(start), valueLiteral(end));
+       }
+
+       /**
+        * Boolean AND in three-valued logic.
+        */
+       public static ApiExpression and(Object predicate0, Object predicate1, 
Object... predicates) {
+               return 
apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.AND, predicate0, 
predicate1, predicates);
+       }
+
+       /**
+        * Boolean OR in three-valued logic.
+        */
+       public static ApiExpression or(Object predicate0, Object predicate1, 
Object... predicates) {
+               return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.OR, 
predicate0, predicate1, predicates);
+       }
+
+       /**
+        * Offset constant to be used in the {@code preceding} clause of 
unbounded {@code Over} windows. Use this
+        * constant for a time interval. Unbounded over windows start with the 
first row of a partition.
+        */
+       public static final ApiExpression UNBOUNDED_ROW = 
apiCall(BuiltInFunctionDefinitions.UNBOUNDED_ROW);
+
+       /**
+        * Offset constant to be used in the {@code preceding} clause of 
unbounded {@link Over} windows. Use this
+        * constant for a row-count interval. Unbounded over windows start with 
the first row of a
+        * partition.
+        */
+       public static final ApiExpression UNBOUNDED_RANGE = 
apiCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE);
+
+       /**
+        * Offset constant to be used in the {@code following} clause of {@link 
Over} windows. Use this for setting
+        * the upper bound of the window to the current row.
+        */
+       public static final ApiExpression CURRENT_ROW = 
apiCall(BuiltInFunctionDefinitions.CURRENT_ROW);
+
+       /**
+        * Offset constant to be used in the {@code following} clause of {@link 
Over} windows. Use this for setting
+        * the upper bound of the window to the sort key of the current row, 
i.e., all rows with the same
+        * sort key as the current row are included in the window.
+        */
+       public static final ApiExpression CURRENT_RANGE = 
apiCall(BuiltInFunctionDefinitions.CURRENT_RANGE);
+
+       /**
+        * Returns the current SQL date in UTC time zone.
+        */
+       public static ApiExpression currentDate() {
+               return apiCall(BuiltInFunctionDefinitions.CURRENT_DATE);
+       }
+
+       /**
+        * Returns the current SQL time in UTC time zone.
+        */
+       public static ApiExpression currentTime() {
+               return apiCall(BuiltInFunctionDefinitions.CURRENT_TIME);
+       }
+
+       /**
+        * Returns the current SQL timestamp in UTC time zone.
+        */
+       public static ApiExpression currentTimestamp() {
+               return apiCall(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP);
+       }
+
+       /**
+        * Returns the current SQL time in local time zone.
+        */
+       public static ApiExpression localTime() {
+               return apiCall(BuiltInFunctionDefinitions.LOCAL_TIME);
+       }
+
+       /**
+        * Returns the current SQL timestamp in local time zone.
+        */
+       public static ApiExpression localTimestamp() {
+               return apiCall(BuiltInFunctionDefinitions.LOCAL_TIMESTAMP);
+       }
+
+       /**
+        * Determines whether two anchored time intervals overlap. Time point 
and temporal are
+        * transformed into a range defined by two time points (start, end). 
The function
+        * evaluates <code>leftEnd >= rightStart && rightEnd >= 
leftStart</code>.
+        *
+        * <p>It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
+        *
+        * <p>e.g.
+        * <pre>{@code
+        * temporalOverlaps(
+        *      lit("2:55:00").toTime(),
+        *      interval(Duration.ofHour(1)),
+        *      lit("3:30:00").toTime(),
+        *      interval(Duration.ofHour(2))
+        * }</pre>
+        * leads to true
+        */
+       public static ApiExpression temporalOverlaps(
+                       Object leftTimePoint,
+                       Object leftTemporal,
+                       Object rightTimePoint,
+                       Object rightTemporal) {
+               return apiCall(
+                       BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS,
+                       leftTimePoint,
+                       leftTemporal,
+                       rightTimePoint,
+                       rightTemporal);
+       }
+
+       /**
+        * Formats a timestamp as a string using a specified format.
+        * The format must be compatible with MySQL's date formatting syntax as 
used by the
+        * date_parse function.
+        *
+        * <p>For example {@code dataFormat($("time"), "%Y, %d %M")} results in 
strings formatted as "2017, 05 May".
+        *
+        * @param timestamp The timestamp to format as string.
+        * @param format The format of the string.
+        * @return The formatted timestamp as string.
+        */
+       public static ApiExpression dateFormat(
+                       Object timestamp,
+                       Object format) {
+               return apiCall(BuiltInFunctionDefinitions.DATE_FORMAT, 
timestamp, format);
+       }
+
+       /**
+        * Returns the (signed) number of {@link TimePointUnit} between 
timePoint1 and timePoint2.
+        *
+        * <p>For example, {@code timestampDiff(TimePointUnit.DAY, 
lit("2016-06-15").toDate(), lit("2016-06-18").toDate()}
+        * leads to 3.
+        *
+        * @param timePointUnit The unit to compute diff.
+        * @param timePoint1 The first point in time.
+        * @param timePoint2 The second point in time.
+        * @return The number of intervals as integer value.
+        */
+       public static ApiExpression timestampDiff(
+                       TimePointUnit timePointUnit,
+                       Object timePoint1,
+                       Object timePoint2) {
+               return apiCall(BuiltInFunctionDefinitions.TIMESTAMP_DIFF, 
valueLiteral(timePointUnit), timePoint1, timePoint2);
+       }
+
+       /**
+        * Creates an array of literals.
+        */
+       public static ApiExpression array(Object head, Object... tail) {
+               return 
apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ARRAY, head, tail);
+       }
+
+       /**
+        * Creates a row of expressions.
+        */
+       public static ApiExpression row(Object head, Object... tail) {
+               return 
apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ROW, head, tail);
+       }
+
+       /**
+        * Creates a map of expressions.
+        *
+        * <pre>{@code
+        *  table.select(
+        *      map(
+        *          "key1", 1,
+        *          "key2", 2,
+        *          "key3", 3
+        *      ))
+        * }</pre>
+        *
+        * <p>Note keys and values should have the same types for all entries.
+        */
+       public static ApiExpression map(Object key, Object value, Object... 
tail) {
+               return 
apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.MAP, key, value, tail);
+       }
+
+       /**
+        * Creates an interval of rows.
+        *
+        * @see Table#window(GroupWindow)
+        * @see Table#window(OverWindow...)
+        */
+       public static ApiExpression rowInterval(Long rows) {
+               return new ApiExpression(valueLiteral(rows));
+       }
+
+       /**
+        * Returns a value that is closer than any other value to pi.
+        */
+       public static ApiExpression pi() {
+               return apiCall(BuiltInFunctionDefinitions.PI);
+       }
+
+       /**
+        * Returns a value that is closer than any other value to e.
+        */
+       public static ApiExpression e() {
+               return apiCall(BuiltInFunctionDefinitions.E);
+       }
+
+       /**
+        * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 
(exclusive).
+        */
+       public static ApiExpression rand() {
+               return apiCall(BuiltInFunctionDefinitions.RAND);
+       }
+
+       /**
+        * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 
(exclusive) with a
+        * initial seed. Two rand() functions will return identical sequences 
of numbers if they
+        * have same initial seed.
+        */
+       public static ApiExpression rand(Object seed) {
+               return apiCall(BuiltInFunctionDefinitions.RAND, 
objectToExpression(seed));
+       }
+
+       /**
+        * Returns a pseudorandom integer value between 0.0 (inclusive) and the 
specified
+        * value (exclusive).
+        */
+       public static ApiExpression randInteger(Object bound) {
+               return apiCall(BuiltInFunctionDefinitions.RAND_INTEGER, 
objectToExpression(bound));
+       }
+
+       /**
+        * Returns a pseudorandom integer value between 0.0 (inclusive) and the 
specified value
+        * (exclusive) with a initial seed. Two randInteger() functions will 
return identical sequences
+        * of numbers if they have same initial seed and same bound.
+        */
+       public static ApiExpression randInteger(Object seed, Object bound) {
+               return apiCall(BuiltInFunctionDefinitions.RAND_INTEGER, 
objectToExpression(seed), objectToExpression(bound));
+       }
+
+       /**
+        * Returns the string that results from concatenating the arguments.
+        * Returns NULL if any argument is NULL.
+        */
+       public static ApiExpression concat(Object string, Object... strings) {
+               return 
apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.CONCAT, string, strings);
+       }
+
+       /**
+        * Calculates the arc tangent of a given coordinate.
+        */
+       public static ApiExpression atan2(Object y, Object x) {
+               return 
apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ATAN2, y, x);
+       }
+
+       /**
+        * Returns negative numeric.
+        */
+       public static ApiExpression negative(Object v) {
+               return apiCall(BuiltInFunctionDefinitions.MINUS_PREFIX, v);
+       }
+
+       /**
+        * Returns the string that results from concatenating the arguments and 
separator.
+        * Returns NULL If the separator is NULL.
+        *
+        * <p>Note: this function does not skip empty strings. However, it does 
skip any NULL
+        * values after the separator argument.
+        */
+       public static ApiExpression concatWs(Object separator, Object string, 
Object... strings) {
+               return 
apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.CONCAT_WS, separator, 
string, strings);
+       }
+
+       /**
+        * Returns an UUID (Universally Unique Identifier) string (e.g.,
+        * "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 
(pseudo randomly
+        * generated) UUID. The UUID is generated using a cryptographically 
strong pseudo random number
+        * generator.
+        */
+       public static ApiExpression uuid() {
+               return apiCall(BuiltInFunctionDefinitions.UUID);
+       }
+
+       /**
+        * Returns a null literal value of a given data type.
+        *
+        * <p>e.g. {@code nullOf(DataTypes.INT())}
+        */
+       public static ApiExpression nullOf(DataType dataType) {
+               return new ApiExpression(valueLiteral(null, dataType));
+       }
+
+       /**
+        * @deprecated This method will be removed in future versions as it 
uses the old type system.
+        *             It is recommended to use {@link #nullOf(DataType)} 
instead which uses the new type
+        *             system based on {@link DataTypes}. Please make sure to 
use either the old or the new
+        *             type system consistently to avoid unintended behavior. 
See the website
+        *             documentation for more information.
+        */
+       public static ApiExpression nullOf(TypeInformation<?> typeInfo) {
+               return 
nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo));
+       }
+
+       /**
+        * Calculates the logarithm of the given value.
+        */
+       public static ApiExpression log(Object value) {
+               return apiCall(BuiltInFunctionDefinitions.LOG, value);
+       }
+
+       /**
+        * Calculates the logarithm of the given value to the given base.
+        */
+       public static ApiExpression log(Object base, Object value) {
+               return apiCall(BuiltInFunctionDefinitions.LOG, base, value);
+       }
+
+       /**
+        * Ternary conditional operator that decides which of two other 
expressions should be evaluated
+        * based on a evaluated boolean condition.
+        *
+        * <p>e.g. ifThenElse($("f0") > 5, "A", "B") leads to "A"
+        *
+        * @param condition boolean condition
+        * @param ifTrue expression to be evaluated if condition holds
+        * @param ifFalse expression to be evaluated if condition does not hold
+        */
+       public static ApiExpression ifThenElse(Object condition, Object ifTrue, 
Object ifFalse) {
+               return apiCall(BuiltInFunctionDefinitions.IF, condition, 
ifTrue, ifFalse);
+       }
+
+       /**
+        * Creates an expression that selects a range of columns. It can be 
used wherever an array of
+        * expression is accepted such as function calls, projections, or 
groupings.
+        *
+        * <p>A range can either be index-based or name-based. Indices start at 
1 and boundaries are
+        * inclusive.
+        *
+        * <p>e.g. withColumns(range("b", "c")) or withoutColumns($("*"))
+        */
+       public static ApiExpression withColumns(Object head, Object... tail) {
+               return 
apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITH_COLUMNS, head, tail);
+       }
+
+       /**
+        * Creates an expression that selects all columns except for the given 
range of columns. It can
+        * be used wherever an array of expression is accepted such as function 
calls, projections, or
+        * groupings.
+        *
+        * <p>A range can either be index-based or name-based. Indices start at 
1 and boundaries are
+        * inclusive.
+        *
+        * <p>e.g. withoutColumns(range("b", "c")) or withoutColumns($("c"))
+        */
+       public static ApiExpression withoutColumns(Object head, Object... tail) 
{
+               return 
apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.WITHOUT_COLUMNS, head, 
tail);
+       }
+
+       /**
+        * A call to a function that will be looked up in a catalog. There are 
two kinds of functions:
+        * <ul>
+        *     <li>System functions - which are identified with one part 
names</li>
+        *     <li>Catalog functions - which are identified always with three 
parts names
+        *     (catalog, database, function)</li>
+        * </ul>
+        *
+        * <p>Moreover each function can either be a temporary function or 
permanent one
+        * (which is stored in an external catalog).
+        *
+        * <p>Based on that two properties the resolution order for looking up 
a function based on
+        * the provided {@code functionName} is following:
+        * <ul>
+        *     <li>Temporary system function</li>
+        *     <li>System function</li>
+        *     <li>Temporary catalog function</li>
+        *     <li>Catalog function</li>
+        * </ul>
+        *
+        * @see TableEnvironment#useCatalog(String)
+        * @see TableEnvironment#useDatabase(String)
+        * @see TableEnvironment#createTemporaryFunction
+        * @see TableEnvironment#createTemporarySystemFunction
+        */
+       public static ApiExpression call(String path, Object... params) {
+               return new ApiExpression(ApiExpressionUtils.lookupCall(
+                       path,
+                       
Arrays.stream(params).map(ApiExpressionUtils::objectToExpression).toArray(Expression[]::new)));
+       }
+
+       private static ApiExpression apiCall(FunctionDefinition 
functionDefinition, Object... args) {
+               List<Expression> arguments =
+                       Stream.of(args)
+                               .map(ApiExpressionUtils::objectToExpression)
+                               .collect(Collectors.toList());
+               return new ApiExpression(unresolvedCall(functionDefinition, 
arguments));
+       }
+
+       private static ApiExpression 
apiCallAtLeastOneArgument(FunctionDefinition functionDefinition,
+                       Object arg0,
+                       Object... args) {
+               List<Expression> arguments = Stream.concat(
+                       Stream.of(arg0),
+                       Stream.of(args)
+               ).map(ApiExpressionUtils::objectToExpression)
+                       .collect(Collectors.toList());
+               return new ApiExpression(unresolvedCall(functionDefinition, 
arguments));
+       }
+
+       private static ApiExpression apiCallAtLeastTwoArgument(
+                       FunctionDefinition functionDefinition,
+                       Object arg0,
+                       Object arg1,
+                       Object... args) {
+               List<Expression> arguments = Stream.concat(
+                       Stream.of(arg0, arg1),
+                       Stream.of(args)
+               ).map(ApiExpressionUtils::objectToExpression)
+                       .collect(Collectors.toList());
+               return new ApiExpression(unresolvedCall(functionDefinition, 
arguments));
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
index 4601e75..36cd855 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 
 /**
@@ -41,8 +42,8 @@ public abstract class GroupWindow {
        private final Expression timeField;
 
        GroupWindow(Expression alias, Expression timeField) {
-               this.alias = alias;
-               this.timeField = timeField;
+               this.alias = ApiExpressionUtils.unwrapFromApi(alias);
+               this.timeField = ApiExpressionUtils.unwrapFromApi(timeField);
        }
 
        public Expression getAlias() {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
index 6e646f7..0d9ee6d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -36,7 +37,7 @@ public final class SessionWithGap {
        private final Expression gap;
 
        SessionWithGap(Expression gap) {
-               this.gap = gap;
+               this.gap = ApiExpressionUtils.unwrapFromApi(gap);
        }
 
        /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
index 53c9c4f..ef63dde 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -32,8 +33,8 @@ public final class SessionWithGapOnTime {
        private final Expression gap;
 
        SessionWithGapOnTime(Expression timeField, Expression gap) {
-               this.timeField = timeField;
-               this.gap = gap;
+               this.timeField = ApiExpressionUtils.unwrapFromApi(timeField);
+               this.gap = ApiExpressionUtils.unwrapFromApi(gap);
        }
 
        /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
index 891ea7c..b88a93e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 
 /**
@@ -31,7 +32,7 @@ public final class SessionWithGapOnTimeWithAlias extends 
GroupWindow {
 
        SessionWithGapOnTimeWithAlias(Expression alias, Expression timeField, 
Expression gap) {
                super(alias, timeField);
-               this.gap = gap;
+               this.gap = ApiExpressionUtils.unwrapFromApi(gap);
        }
 
        public Expression getGap() {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
index 44470f9..983d07d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -32,7 +33,7 @@ public final class SlideWithSize {
        private final Expression size;
 
        SlideWithSize(Expression size) {
-               this.size = size;
+               this.size = ApiExpressionUtils.unwrapFromApi(size);
        }
 
        /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
index 4d509d1..d4d83c1 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -37,8 +38,8 @@ public final class SlideWithSizeAndSlide {
        private final Expression slide;
 
        SlideWithSizeAndSlide(Expression size, Expression slide) {
-               this.size = size;
-               this.slide = slide;
+               this.size = ApiExpressionUtils.unwrapFromApi(size);
+               this.slide = ApiExpressionUtils.unwrapFromApi(slide);
        }
 
        /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
index 604b9cd..c74529a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -33,12 +34,12 @@ public final class SlideWithSizeAndSlideOnTime {
        private final Expression slide;
 
        SlideWithSizeAndSlideOnTime(
-               Expression timeField,
-               Expression size,
-               Expression slide) {
-               this.timeField = timeField;
-               this.size = size;
-               this.slide = slide;
+                       Expression timeField,
+                       Expression size,
+                       Expression slide) {
+               this.timeField = ApiExpressionUtils.unwrapFromApi(timeField);
+               this.size = ApiExpressionUtils.unwrapFromApi(size);
+               this.slide = ApiExpressionUtils.unwrapFromApi(slide);
        }
 
        /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
index 50b3692..8b9c692 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 
 /**
@@ -31,13 +32,13 @@ public final class SlideWithSizeAndSlideOnTimeWithAlias 
extends GroupWindow {
        private final Expression slide;
 
        SlideWithSizeAndSlideOnTimeWithAlias(
-               Expression alias,
-               Expression timeField,
-               Expression size,
-               Expression slide) {
+                       Expression alias,
+                       Expression timeField,
+                       Expression size,
+                       Expression slide) {
                super(alias, timeField);
-               this.size = size;
-               this.slide = slide;
+               this.size = ApiExpressionUtils.unwrapFromApi(size);
+               this.slide = ApiExpressionUtils.unwrapFromApi(slide);
        }
 
        public Expression getSize() {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
index 600e3a1..bbc1825 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -36,7 +37,7 @@ public final class TumbleWithSize {
        private Expression size;
 
        TumbleWithSize(Expression size) {
-               this.size = size;
+               this.size = ApiExpressionUtils.unwrapFromApi(size);
        }
 
        /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
index f635ba5..07616d1 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
 
@@ -32,8 +33,8 @@ public final class TumbleWithSizeOnTime {
        private final Expression size;
 
        TumbleWithSizeOnTime(Expression time, Expression size) {
-               this.time = time;
-               this.size = size;
+               this.time = ApiExpressionUtils.unwrapFromApi(time);
+               this.size = ApiExpressionUtils.unwrapFromApi(size);
        }
 
        /**
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
index 5180d33..4e25a4f 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 
 /**
@@ -31,7 +32,7 @@ public final class TumbleWithSizeOnTimeWithAlias extends 
GroupWindow {
 
        TumbleWithSizeOnTimeWithAlias(Expression alias, Expression timeField, 
Expression size) {
                super(alias, timeField);
-               this.size = size;
+               this.size = ApiExpressionUtils.unwrapFromApi(size);
        }
 
        public Expression getSize() {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
new file mode 100644
index 0000000..41cb93c
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -0,0 +1,1284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_DAY;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_HOUR;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_MINUTE;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.MILLIS_PER_SECOND;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.tableRef;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.toMilliInterval;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.toMonthInterval;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.typeLiteral;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
+//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.*;
+//CHECKSTYLE.ON: AvoidStarImport|ImportOrder
+
+/**
+ * These are Java and Scala common operations that can be used to construct an 
{@link Expression} AST for
+ * expression operations.
+ *
+ * @param <InType>  The accepted type of input expressions, it is {@code 
Expression} for Scala and
+ *                  {@code Object} for Java. Generally the expression DSL 
works on expressions, the
+ *                  reason why Java accepts Object is to remove cumbersome 
call to {@code lit()} for
+ *                  literals. Scala alleviates this problem via implicit 
conversions.
+ * @param <OutType> The produced type of the DSL. It is {@code ApiExpression} 
for Java and {@code Expression}
+ *                  for Scala. In Scala the infix operations are included via 
implicit conversions. In Java
+ *                  we introduced a wrapper that enables the operations 
without pulling them through the whole stack.
+ */
+@PublicEvolving
+public abstract class BaseExpressions<InType, OutType> {
+       protected abstract Expression toExpr();
+
+       protected abstract OutType toApiSpecificExpression(Expression 
expression);
+
+       /**
+        * Specifies a name for an expression i.e. a field.
+        *
+        * @param name       name for one field
+        * @param extraNames additional names if the expression expands to 
multiple fields
+        */
+       public OutType as(String name, String... extraNames) {
+               return 
toApiSpecificExpression(ApiExpressionUtils.unresolvedCall(
+                       BuiltInFunctionDefinitions.AS,
+                       Stream.concat(
+                               Stream.of(toExpr(), 
ApiExpressionUtils.valueLiteral(name)),
+                               
Stream.of(extraNames).map(ApiExpressionUtils::valueLiteral)
+                       ).toArray(Expression[]::new)));
+       }
+
+       /**
+        * Boolean AND in three-valued logic. This is an infix notation. See 
also
+        * {@link Expressions#and(Object, Object, Object...)} for prefix 
notation with multiple arguments.
+        *
+        * @see Expressions#and(Object, Object, Object...)
+        */
+       public OutType and(InType other) {
+               return toApiSpecificExpression(unresolvedCall(AND, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Boolean OR in three-valued logic. This is an infix notation. See also
+        * {@link Expressions#or(Object, Object, Object...)} for prefix 
notation with multiple arguments.
+        *
+        * @see Expressions#or(Object, Object, Object...)
+        */
+       public OutType or(InType other) {
+               return toApiSpecificExpression(unresolvedCall(OR, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Greater than.
+        */
+       public OutType isGreater(InType other) {
+               return toApiSpecificExpression(unresolvedCall(GREATER_THAN, 
toExpr(), objectToExpression(other)));
+       }
+
+       /**
+        * Greater than or equal.
+        */
+       public OutType isGreaterOrEqual(InType other) {
+               return 
toApiSpecificExpression(unresolvedCall(GREATER_THAN_OR_EQUAL, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Less than.
+        */
+       public OutType isLess(InType other) {
+               return toApiSpecificExpression(unresolvedCall(LESS_THAN, 
toExpr(), objectToExpression(other)));
+       }
+
+       /**
+        * Less than or equal.
+        */
+       public OutType isLessOrEqual(InType other) {
+               return 
toApiSpecificExpression(unresolvedCall(LESS_THAN_OR_EQUAL, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Equals.
+        */
+       public OutType isEqual(InType other) {
+               return toApiSpecificExpression(unresolvedCall(EQUALS, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Not equal.
+        */
+       public OutType isNotEqual(InType other) {
+               return toApiSpecificExpression(unresolvedCall(NOT_EQUALS, 
toExpr(), objectToExpression(other)));
+       }
+
+       /**
+        * Returns left plus right.
+        */
+       public OutType plus(InType other) {
+               return toApiSpecificExpression(unresolvedCall(PLUS, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Returns left minus right.
+        */
+       public OutType minus(InType other) {
+               return toApiSpecificExpression(unresolvedCall(MINUS, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Returns left divided by right.
+        */
+       public OutType dividedBy(InType other) {
+               return toApiSpecificExpression(unresolvedCall(DIVIDE, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Returns left multiplied by right.
+        */
+       public OutType times(InType other) {
+               return toApiSpecificExpression(unresolvedCall(TIMES, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Returns true if the given expression is between lowerBound and 
upperBound (both inclusive).
+        * False otherwise. The parameters must be numeric types or identical 
comparable types.
+        *
+        * @param lowerBound numeric or comparable expression
+        * @param upperBound numeric or comparable expression
+        */
+       public OutType between(InType lowerBound, InType upperBound) {
+               return toApiSpecificExpression(unresolvedCall(
+                       BETWEEN,
+                       toExpr(),
+                       objectToExpression(lowerBound),
+                       objectToExpression(upperBound)));
+       }
+
+       /**
+        * Returns true if the given expression is not between lowerBound and 
upperBound (both
+        * inclusive). False otherwise. The parameters must be numeric types or 
identical
+        * comparable types.
+        *
+        * @param lowerBound numeric or comparable expression
+        * @param upperBound numeric or comparable expression
+        */
+       public OutType notBetween(InType lowerBound, InType upperBound) {
+               return toApiSpecificExpression(unresolvedCall(
+                       NOT_BETWEEN,
+                       toExpr(),
+                       objectToExpression(lowerBound),
+                       objectToExpression(upperBound)));
+       }
+
+       /**
+        * Ternary conditional operator that decides which of two other 
expressions should be evaluated
+        * based on a evaluated boolean condition.
+        *
+        * <p>e.g. lit(42).isGreater(5).then("A", "B") leads to "A"
+        *
+        * @param ifTrue expression to be evaluated if condition holds
+        * @param ifFalse expression to be evaluated if condition does not hold
+        */
+       public OutType then(InType ifTrue, InType ifFalse) {
+               return toApiSpecificExpression(unresolvedCall(
+                       IF,
+                       toExpr(),
+                       objectToExpression(ifTrue),
+                       objectToExpression(ifFalse)));
+       }
+
+       /**
+        * Returns true if the given expression is null.
+        */
+       public OutType isNull() {
+               return toApiSpecificExpression(unresolvedCall(IS_NULL, 
toExpr()));
+       }
+
+       /**
+        * Returns true if the given expression is not null.
+        */
+       public OutType isNotNull() {
+               return toApiSpecificExpression(unresolvedCall(IS_NOT_NULL, 
toExpr()));
+       }
+
+       /**
+        * Returns true if given boolean expression is true. False otherwise 
(for null and false).
+        */
+       public OutType isTrue() {
+               return toApiSpecificExpression(unresolvedCall(IS_TRUE, 
toExpr()));
+       }
+
+       /**
+        * Returns true if given boolean expression is false. False otherwise 
(for null and true).
+        */
+       public OutType isFalse() {
+               return toApiSpecificExpression(unresolvedCall(IS_FALSE, 
toExpr()));
+       }
+
+       /**
+        * Returns true if given boolean expression is not true (for null and 
false). False otherwise.
+        */
+       public OutType isNotTrue() {
+               return toApiSpecificExpression(unresolvedCall(IS_NOT_TRUE, 
toExpr()));
+       }
+
+       /**
+        * Returns true if given boolean expression is not false (for null and 
true). False otherwise.
+        */
+       public OutType isNotFalse() {
+               return toApiSpecificExpression(unresolvedCall(IS_NOT_FALSE, 
toExpr()));
+       }
+
+       /**
+        * Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT 
a), declares that an
+        * aggregation function is only applied on distinct input values.
+        *
+        * <p>For example:
+        * <pre>
+        * {@code
+        * orders
+        *  .groupBy($("a"))
+        *  .select($("a"), $("b").sum().distinct().as("d"))
+        * }
+        * </pre>
+        */
+       public OutType distinct() {
+               return toApiSpecificExpression(unresolvedCall(DISTINCT, 
toExpr()));
+       }
+
+       /**
+        * Returns the sum of the numeric field across all input values.
+        * If all values are null, null is returned.
+        */
+       public OutType sum() {
+               return toApiSpecificExpression(unresolvedCall(SUM, toExpr()));
+       }
+
+       /**
+        * Returns the sum of the numeric field across all input values.
+        * If all values are null, 0 is returned.
+        */
+       public OutType sum0() {
+               return toApiSpecificExpression(unresolvedCall(SUM0, toExpr()));
+       }
+
+       /**
+        * Returns the minimum value of field across all input values.
+        */
+       public OutType min() {
+               return toApiSpecificExpression(unresolvedCall(MIN, toExpr()));
+       }
+
+       /**
+        * Returns the maximum value of field across all input values.
+        */
+       public OutType max() {
+               return toApiSpecificExpression(unresolvedCall(MAX, toExpr()));
+       }
+
+       /**
+        * Returns the number of input rows for which the field is not null.
+        */
+       public OutType count() {
+               return toApiSpecificExpression(unresolvedCall(COUNT, toExpr()));
+       }
+
+       /**
+        * Returns the average (arithmetic mean) of the numeric field across 
all input values.
+        */
+       public OutType avg() {
+               return toApiSpecificExpression(unresolvedCall(AVG, toExpr()));
+       }
+
+       /**
+        * Returns the population standard deviation of an expression (the 
square root of varPop()).
+        */
+       public OutType stddevPop() {
+               return toApiSpecificExpression(unresolvedCall(STDDEV_POP, 
toExpr()));
+       }
+
+       /**
+        * Returns the sample standard deviation of an expression (the square 
root of varSamp()).
+        */
+       public OutType stddevSamp() {
+               return toApiSpecificExpression(unresolvedCall(STDDEV_SAMP, 
toExpr()));
+       }
+
+       /**
+        * Returns the population standard variance of an expression.
+        */
+       public OutType varPop() {
+               return toApiSpecificExpression(unresolvedCall(VAR_POP, 
toExpr()));
+       }
+
+       /**
+        * Returns the sample variance of a given expression.
+        */
+       public OutType varSamp() {
+               return toApiSpecificExpression(unresolvedCall(VAR_SAMP, 
toExpr()));
+       }
+
+       /**
+        * Returns multiset aggregate of a given expression.
+        */
+       public OutType collect() {
+               return toApiSpecificExpression(unresolvedCall(COLLECT, 
toExpr()));
+       }
+
+       /**
+        * Converts a value to a given data type.
+        *
+        * <p>e.g. "42".cast(DataTypes.INT()) leads to 42.
+        */
+       public OutType cast(DataType toType) {
+               return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), 
typeLiteral(toType)));
+       }
+
+       /**
+        * @deprecated This method will be removed in future versions as it 
uses the old type system. It
+        *             is recommended to use {@link #cast(DataType)} instead 
which uses the new type system
+        *             based on {@link org.apache.flink.table.api.DataTypes}. 
Please make sure to use either the old
+        *             or the new type system consistently to avoid unintended 
behavior. See the website documentation
+        *             for more information.
+        */
+       @Deprecated
+       public OutType cast(TypeInformation<?> toType) {
+               return toApiSpecificExpression(unresolvedCall(CAST, toExpr(), 
typeLiteral(fromLegacyInfoToDataType(toType))));
+       }
+
+       /**
+        * Specifies ascending order of an expression i.e. a field for orderBy 
unresolvedCall.
+        */
+       public OutType asc() {
+               return toApiSpecificExpression(unresolvedCall(ORDER_ASC, 
toExpr()));
+       }
+
+       /**
+        * Specifies descending order of an expression i.e. a field for orderBy 
unresolvedCall.
+        */
+       public OutType desc() {
+               return toApiSpecificExpression(unresolvedCall(ORDER_DESC, 
toExpr()));
+       }
+
+       /**
+        * Returns true if an expression exists in a given list of expressions. 
This is a shorthand
+        * for multiple OR conditions.
+        *
+        * <p>If the testing set contains null, the result will be null if the 
element can not be found
+        * and true if it can be found. If the element is null, the result is 
always null.
+        *
+        * <p>e.g. lit("42").in(1, 2, 3) leads to false.
+        */
+       @SafeVarargs
+       public final OutType in(InType... elements) {
+               Expression[] args = Stream.concat(
+                       Stream.of(toExpr()),
+                       
Arrays.stream(elements).map(ApiExpressionUtils::objectToExpression))
+                       .toArray(Expression[]::new);
+               return toApiSpecificExpression(unresolvedCall(IN, args));
+       }
+
+       /**
+        * Returns true if an expression exists in a given table sub-query. The 
sub-query table
+        * must consist of one column. This column must have the same data type 
as the expression.
+        *
+        * <p>Note: This operation is not supported in a streaming environment 
yet.
+        */
+       public OutType in(Table table) {
+               return toApiSpecificExpression(unresolvedCall(IN, toExpr(), 
tableRef(table.toString(), table)));
+       }
+
+       /**
+        * Returns the start time (inclusive) of a window when applied on a 
window reference.
+        */
+       public OutType start() {
+               return toApiSpecificExpression(unresolvedCall(WINDOW_START, 
toExpr()));
+       }
+
+       /**
+        * Returns the end time (exclusive) of a window when applied on a 
window reference.
+        *
+        * <p>e.g. if a window ends at 10:59:59.999 this property will return 
11:00:00.000.
+        */
+       public OutType end() {
+               return toApiSpecificExpression(unresolvedCall(WINDOW_END, 
toExpr()));
+       }
+
+       /**
+        * Calculates the remainder of division the given number by another one.
+        */
+       public OutType mod(InType other) {
+               return toApiSpecificExpression(unresolvedCall(MOD, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Calculates the Euler's number raised to the given power.
+        */
+       public OutType exp() {
+               return toApiSpecificExpression(unresolvedCall(EXP, toExpr()));
+       }
+
+       /**
+        * Calculates the base 10 logarithm of the given value.
+        */
+       public OutType log10() {
+               return toApiSpecificExpression(unresolvedCall(LOG10, toExpr()));
+       }
+
+       /**
+        * Calculates the base 2 logarithm of the given value.
+        */
+       public OutType log2() {
+               return toApiSpecificExpression(unresolvedCall(LOG2, toExpr()));
+       }
+
+       /**
+        * Calculates the natural logarithm of the given value.
+        */
+       public OutType ln() {
+               return toApiSpecificExpression(unresolvedCall(LN, toExpr()));
+       }
+
+       /**
+        * Calculates the natural logarithm of the given value.
+        */
+       public OutType log() {
+               return toApiSpecificExpression(unresolvedCall(LOG, toExpr()));
+       }
+
+       /**
+        * Calculates the logarithm of the given value to the given base.
+        */
+       public OutType log(InType base) {
+               return toApiSpecificExpression(unresolvedCall(LOG, 
objectToExpression(base), toExpr()));
+       }
+
+       /**
+        * Calculates the given number raised to the power of the other value.
+        */
+       public OutType power(InType other) {
+               return toApiSpecificExpression(unresolvedCall(POWER, toExpr(), 
objectToExpression(other)));
+       }
+
+       /**
+        * Calculates the hyperbolic cosine of a given value.
+        */
+       public OutType cosh() {
+               return toApiSpecificExpression(unresolvedCall(COSH, toExpr()));
+       }
+
+       /**
+        * Calculates the square root of a given value.
+        */
+       public OutType sqrt() {
+               return toApiSpecificExpression(unresolvedCall(SQRT, toExpr()));
+       }
+
+       /**
+        * Calculates the absolute value of given value.
+        */
+       public OutType abs() {
+               return toApiSpecificExpression(unresolvedCall(ABS, toExpr()));
+       }
+
+       /**
+        * Calculates the largest integer less than or equal to a given number.
+        */
+       public OutType floor() {
+               return toApiSpecificExpression(unresolvedCall(FLOOR, toExpr()));
+       }
+
+       /**
+        * Calculates the hyperbolic sine of a given value.
+        */
+       public OutType sinh() {
+               return toApiSpecificExpression(unresolvedCall(SINH, toExpr()));
+       }
+
+       /**
+        * Calculates the smallest integer greater than or equal to a given 
number.
+        */
+       public OutType ceil() {
+               return toApiSpecificExpression(unresolvedCall(CEIL, toExpr()));
+       }
+
+       /**
+        * Calculates the sine of a given number.
+        */
+       public OutType sin() {
+               return toApiSpecificExpression(unresolvedCall(SIN, toExpr()));
+       }
+
+       /**
+        * Calculates the cosine of a given number.
+        */
+       public OutType cos() {
+               return toApiSpecificExpression(unresolvedCall(COS, toExpr()));
+       }
+
+       /**
+        * Calculates the tangent of a given number.
+        */
+       public OutType tan() {
+               return toApiSpecificExpression(unresolvedCall(TAN, toExpr()));
+       }
+
+       /**
+        * Calculates the cotangent of a given number.
+        */
+       public OutType cot() {
+               return toApiSpecificExpression(unresolvedCall(COT, toExpr()));
+       }
+
+       /**
+        * Calculates the arc sine of a given number.
+        */
+       public OutType asin() {
+               return toApiSpecificExpression(unresolvedCall(ASIN, toExpr()));
+       }
+
+       /**
+        * Calculates the arc cosine of a given number.
+        */
+       public OutType acos() {
+               return toApiSpecificExpression(unresolvedCall(ACOS, toExpr()));
+       }
+
+       /**
+        * Calculates the arc tangent of a given number.
+        */
+       public OutType atan() {
+               return toApiSpecificExpression(unresolvedCall(ATAN, toExpr()));
+       }
+
+       /**
+        * Calculates the hyperbolic tangent of a given number.
+        */
+       public OutType tanh() {
+               return toApiSpecificExpression(unresolvedCall(TANH, toExpr()));
+       }
+
+       /**
+        * Converts numeric from radians to degrees.
+        */
+       public OutType degrees() {
+               return toApiSpecificExpression(unresolvedCall(DEGREES, 
toExpr()));
+       }
+
+       /**
+        * Converts numeric from degrees to radians.
+        */
+       public OutType radians() {
+               return toApiSpecificExpression(unresolvedCall(RADIANS, 
toExpr()));
+       }
+
+       /**
+        * Calculates the signum of a given number.
+        */
+       public OutType sign() {
+               return toApiSpecificExpression(unresolvedCall(SIGN, toExpr()));
+       }
+
+       /**
+        * Rounds the given number to integer places right to the decimal point.
+        */
+       public OutType round(InType places) {
+               return toApiSpecificExpression(unresolvedCall(ROUND, toExpr(), 
objectToExpression(places)));
+       }
+
+       /**
+        * Returns a string representation of an integer numeric value in 
binary format. Returns null if
+        * numeric is null. E.g. "4" leads to "100", "12" leads to "1100".
+        */
+       public OutType bin() {
+               return toApiSpecificExpression(unresolvedCall(BIN, toExpr()));
+       }
+
+       /**
+        * Returns a string representation of an integer numeric value or a 
string in hex format. Returns
+        * null if numeric or string is null.
+        *
+        * <p>E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and 
a string "hello,world" leads
+        * to "68656c6c6f2c776f726c64".
+        */
+       public OutType hex() {
+               return toApiSpecificExpression(unresolvedCall(HEX, toExpr()));
+       }
+
+       /**
+        * Returns a number of truncated to n decimal places.
+        * If n is 0,the result has no decimal point or fractional part.
+        * n can be negative to cause n digits left of the decimal point of the 
value to become zero.
+        * E.g. truncate(42.345, 2) to 42.34.
+        */
+       public OutType truncate(InType n) {
+               return toApiSpecificExpression(unresolvedCall(TRUNCATE, 
toExpr(), objectToExpression(n)));
+       }
+
+       /**
+        * Returns a number of truncated to 0 decimal places.
+        * E.g. truncate(42.345) to 42.0.
+        */
+       public OutType truncate() {
+               return toApiSpecificExpression(unresolvedCall(TRUNCATE, 
toExpr()));
+       }
+
+       // String operations
+
+       /**
+        * Creates a substring of the given string at given index for a given 
length.
+        *
+        * @param beginIndex first character of the substring (starting at 1, 
inclusive)
+        * @param length number of characters of the substring
+        */
+       public OutType substring(InType beginIndex, InType length) {
+               return toApiSpecificExpression(unresolvedCall(SUBSTRING, 
toExpr(), objectToExpression(beginIndex), objectToExpression(length)));
+       }
+
+       /**
+        * Creates a substring of the given string beginning at the given index 
to the end.
+        *
+        * @param beginIndex first character of the substring (starting at 1, 
inclusive)
+        */
+       public OutType substring(InType beginIndex) {
+               return toApiSpecificExpression(unresolvedCall(SUBSTRING, 
toExpr(), objectToExpression(beginIndex)));
+       }
+
+       /**
+        * Removes leading space characters from the given string.
+        */
+       public OutType trimLeading() {
+               return toApiSpecificExpression(unresolvedCall(
+                       TRIM,
+                       valueLiteral(true),
+                       valueLiteral(false),
+                       valueLiteral(" "),
+                       toExpr()));
+       }
+
+       /**
+        * Removes leading characters from the given string.
+        *
+        * @param character string containing the character
+        */
+       public OutType trimLeading(InType character) {
+               return toApiSpecificExpression(unresolvedCall(
+                       TRIM,
+                       valueLiteral(true),
+                       valueLiteral(false),
+                       objectToExpression(character),
+                       toExpr()));
+       }
+
+       /**
+        * Removes trailing space characters from the given string.
+        */
+       public OutType trimTrailing() {
+               return toApiSpecificExpression(unresolvedCall(
+                       TRIM,
+                       valueLiteral(false),
+                       valueLiteral(true),
+                       valueLiteral(" "),
+                       toExpr()));
+       }
+
+       /**
+        * Removes trailing characters from the given string.
+        *
+        * @param character string containing the character
+        */
+       public OutType trimTrailing(InType character) {
+               return toApiSpecificExpression(unresolvedCall(
+                       TRIM,
+                       valueLiteral(false),
+                       valueLiteral(true),
+                       objectToExpression(character),
+                       toExpr()));
+       }
+
+       /**
+        * Removes leading and trailing space characters from the given string.
+        */
+       public OutType trim() {
+               return toApiSpecificExpression(unresolvedCall(
+                       TRIM,
+                       valueLiteral(true),
+                       valueLiteral(true),
+                       valueLiteral(" "),
+                       toExpr()));
+       }
+
+       /**
+        * Removes leading and trailing characters from the given string.
+        *
+        * @param character string containing the character
+        */
+       public OutType trim(InType character) {
+               return toApiSpecificExpression(unresolvedCall(
+                       TRIM,
+                       valueLiteral(true),
+                       valueLiteral(true),
+                       objectToExpression(character),
+                       toExpr()));
+       }
+
+       /**
+        * Returns a new string which replaces all the occurrences of the 
search target
+        * with the replacement string (non-overlapping).
+        */
+       public OutType replace(InType search, InType replacement) {
+               return toApiSpecificExpression(unresolvedCall(REPLACE, 
toExpr(), objectToExpression(search), objectToExpression(replacement)));
+       }
+
+       /**
+        * Returns the length of a string.
+        */
+       public OutType charLength() {
+               return toApiSpecificExpression(unresolvedCall(CHAR_LENGTH, 
toExpr()));
+       }
+
+       /**
+        * Returns all of the characters in a string in upper case using the 
rules of
+        * the default locale.
+        */
+       public OutType upperCase() {
+               return toApiSpecificExpression(unresolvedCall(UPPER, toExpr()));
+       }
+
+       /**
+        * Returns all of the characters in a string in lower case using the 
rules of
+        * the default locale.
+        */
+       public OutType lowerCase() {
+               return toApiSpecificExpression(unresolvedCall(LOWER, toExpr()));
+       }
+
+       /**
+        * Converts the initial letter of each word in a string to uppercase.
+        * Assumes a string containing only [A-Za-z0-9], everything else is 
treated as whitespace.
+        */
+       public OutType initCap() {
+               return toApiSpecificExpression(unresolvedCall(INIT_CAP, 
toExpr()));
+       }
+
+       /**
+        * Returns true, if a string matches the specified LIKE pattern.
+        *
+        * <p>e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary 
letter)n"
+        */
+       public OutType like(InType pattern) {
+               return toApiSpecificExpression(unresolvedCall(LIKE, toExpr(), 
objectToExpression(pattern)));
+       }
+
+       /**
+        * Returns true, if a string matches the specified SQL regex pattern.
+        *
+        * <p>e.g. "A+" matches all strings that consist of at least one A
+        */
+       public OutType similar(InType pattern) {
+               return toApiSpecificExpression(unresolvedCall(SIMILAR, 
toExpr(), objectToExpression(pattern)));
+       }
+
+       /**
+        * Returns the position of string in an other string starting at 1.
+        * Returns 0 if string could not be found.
+        *
+        * <p>e.g. lit("a").position("bbbbba") leads to 6
+        */
+       public OutType position(InType haystack) {
+               return toApiSpecificExpression(unresolvedCall(POSITION, 
toExpr(), objectToExpression(haystack)));
+       }
+
+       /**
+        * Returns a string left-padded with the given pad string to a length 
of len characters. If
+        * the string is longer than len, the return value is shortened to len 
characters.
+        *
+        * <p>e.g. lit("hi").lpad(4, "??") returns "??hi",  lit("hi").lpad(1, 
'??') returns "h"
+        */
+       public OutType lpad(InType len, InType pad) {
+               return toApiSpecificExpression(unresolvedCall(LPAD, toExpr(), 
objectToExpression(len), objectToExpression(pad)));
+       }
+
+       /**
+        * Returns a string right-padded with the given pad string to a length 
of len characters. If
+        * the string is longer than len, the return value is shortened to len 
characters.
+        *
+        * <p>e.g. lit("hi").rpad(4, "??") returns "hi??",  lit("hi").rpad(1, 
'??') returns "h"
+        */
+       public OutType rpad(InType len, InType pad) {
+               return toApiSpecificExpression(unresolvedCall(RPAD, toExpr(), 
objectToExpression(len), objectToExpression(pad)));
+       }
+
+       /**
+        * Defines an aggregation to be used for a previously specified over 
window.
+        *
+        * <p>For example:
+        *
+        * <pre>
+        * {@code
+        * table
+        *   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows 
following CURRENT_ROW as 'w)
+        *   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
+        * }</pre>
+        */
+       public OutType over(InType alias) {
+               return toApiSpecificExpression(unresolvedCall(OVER, toExpr(), 
objectToExpression(alias)));
+       }
+
+       /**
+        * Replaces a substring of string with a string starting at a position 
(starting at 1).
+        *
+        * <p>e.g. lit("xxxxxtest").overlay("xxxx", 6) leads to "xxxxxxxxx"
+        */
+       public OutType overlay(InType newString, InType starting) {
+               return toApiSpecificExpression(unresolvedCall(
+                       OVERLAY,
+                       toExpr(),
+                       objectToExpression(newString),
+                       objectToExpression(starting)));
+       }
+
+       /**
+        * Replaces a substring of string with a string starting at a position 
(starting at 1).
+        * The length specifies how many characters should be removed.
+        *
+        * <p>e.g. lit("xxxxxtest").overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
+        */
+       public OutType overlay(InType newString, InType starting, InType 
length) {
+               return toApiSpecificExpression(unresolvedCall(
+                       OVERLAY,
+                       toExpr(),
+                       objectToExpression(newString),
+                       objectToExpression(starting),
+                       objectToExpression(length)));
+       }
+
+       /**
+        * Returns a string with all substrings that match the regular 
expression consecutively
+        * being replaced.
+        */
+       public OutType regexpReplace(InType regex, InType replacement) {
+               return toApiSpecificExpression(unresolvedCall(
+                       REGEXP_REPLACE,
+                       toExpr(),
+                       objectToExpression(regex),
+                       objectToExpression(replacement)));
+       }
+
+       /**
+        * Returns a string extracted with a specified regular expression and a 
regex match group
+        * index.
+        */
+       public OutType regexpExtract(InType regex, InType extractIndex) {
+               return toApiSpecificExpression(unresolvedCall(
+                       REGEXP_EXTRACT,
+                       toExpr(),
+                       objectToExpression(regex),
+                       objectToExpression(extractIndex)));
+       }
+
+       /**
+        * Returns a string extracted with a specified regular expression.
+        */
+       public OutType regexpExtract(InType regex) {
+               return toApiSpecificExpression(unresolvedCall(REGEXP_EXTRACT, 
toExpr(), objectToExpression(regex)));
+       }
+
+       /**
+        * Returns the base string decoded with base64.
+        */
+       public OutType fromBase64() {
+               return toApiSpecificExpression(unresolvedCall(FROM_BASE64, 
toExpr()));
+       }
+
+       /**
+        * Returns the base64-encoded result of the input string.
+        */
+       public OutType toBase64() {
+               return toApiSpecificExpression(unresolvedCall(TO_BASE64, 
toExpr()));
+       }
+
+       /**
+        * Returns a string that removes the left whitespaces from the given 
string.
+        */
+       public OutType ltrim() {
+               return toApiSpecificExpression(unresolvedCall(LTRIM, toExpr()));
+       }
+
+       /**
+        * Returns a string that removes the right whitespaces from the given 
string.
+        */
+       public OutType rtrim() {
+               return toApiSpecificExpression(unresolvedCall(RTRIM, toExpr()));
+       }
+
+       /**
+        * Returns a string that repeats the base string n times.
+        */
+       public OutType repeat(InType n) {
+               return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(), 
objectToExpression(n)));
+       }
+
+       // Temporal operations
+
+       /**
+        * Parses a date string in the form "yyyy-MM-dd" to a SQL Date.
+        */
+       public OutType toDate() {
+               return toApiSpecificExpression(unresolvedCall(
+                       CAST,
+                       toExpr(),
+                       
typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE))));
+       }
+
+       /**
+        * Parses a time string in the form "HH:mm:ss" to a SQL Time.
+        */
+       public OutType toTime() {
+               return toApiSpecificExpression(unresolvedCall(
+                       CAST,
+                       toExpr(),
+                       
typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME))));
+       }
+
+       /**
+        * Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to 
a SQL Timestamp.
+        */
+       public OutType toTimestamp() {
+               return toApiSpecificExpression(unresolvedCall(
+                       CAST,
+                       toExpr(),
+                       
typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP))));
+       }
+
+       /**
+        * Extracts parts of a time point or time interval. Returns the part as 
a long value.
+        *
+        * <p>e.g. lit("2006-06-05").toDate().extract(DAY) leads to 5
+        */
+       public OutType extract(TimeIntervalUnit timeIntervalUnit) {
+               return toApiSpecificExpression(unresolvedCall(EXTRACT, 
valueLiteral(timeIntervalUnit), toExpr()));
+       }
+
+       /**
+        * Rounds down a time point to the given unit.
+        *
+        * <p>e.g. lit("12:44:31").toDate().floor(MINUTE) leads to 12:44:00
+        */
+       public OutType floor(TimeIntervalUnit timeIntervalUnit) {
+               return toApiSpecificExpression(unresolvedCall(FLOOR, 
valueLiteral(timeIntervalUnit), toExpr()));
+       }
+
+       /**
+        * Rounds up a time point to the given unit.
+        *
+        * <p>e.g. lit("12:44:31").toDate().ceil(MINUTE) leads to 12:45:00
+        */
+       public OutType ceil(TimeIntervalUnit timeIntervalUnit) {
+               return toApiSpecificExpression(unresolvedCall(
+                       CEIL,
+                       valueLiteral(timeIntervalUnit),
+                       toExpr()));
+       }
+
+       // Advanced type helper functions
+
+       /**
+        * Accesses the field of a Flink composite type (such as Tuple, POJO, 
etc.) by name and
+        * returns it's value.
+        *
+        * @param name name of the field (similar to Flink's field expressions)
+        */
+       public OutType get(String name) {
+               return toApiSpecificExpression(unresolvedCall(GET, toExpr(), 
valueLiteral(name)));
+       }
+
+       /**
+        * Accesses the field of a Flink composite type (such as Tuple, POJO, 
etc.) by index and
+        * returns it's value.
+        *
+        * @param index position of the field
+        */
+       public OutType get(int index) {
+               return toApiSpecificExpression(unresolvedCall(GET, toExpr(), 
valueLiteral(index)));
+       }
+
+       /**
+        * Converts a Flink composite type (such as Tuple, POJO, etc.) and all 
of its direct subtypes
+        * into a flat representation where every subtype is a separate field.
+        */
+       public OutType flatten() {
+               return toApiSpecificExpression(unresolvedCall(FLATTEN, 
toExpr()));
+       }
+
+       /**
+        * Accesses the element of an array or map based on a key or an index 
(starting at 1).
+        *
+        * @param index key or position of the element (array index starting at 
1)
+        */
+       public OutType at(InType index) {
+               return toApiSpecificExpression(unresolvedCall(AT, toExpr(), 
objectToExpression(index)));
+       }
+
+       /**
+        * Returns the number of elements of an array or number of entries of a 
map.
+        */
+       public OutType cardinality() {
+               return toApiSpecificExpression(unresolvedCall(CARDINALITY, 
toExpr()));
+       }
+
+       /**
+        * Returns the sole element of an array with a single element. Returns 
null if the array is
+        * empty. Throws an exception if the array has more than one element.
+        */
+       public OutType element() {
+               return toApiSpecificExpression(unresolvedCall(ARRAY_ELEMENT, 
toExpr()));
+       }
+
+       // Time definition
+
+       /**
+        * Declares a field as the rowtime attribute for indicating, accessing, 
and working in
+        * Flink's event time.
+        */
+       public OutType rowtime() {
+               return toApiSpecificExpression(unresolvedCall(ROWTIME, 
toExpr()));
+       }
+
+       /**
+        * Declares a field as the proctime attribute for indicating, 
accessing, and working in
+        * Flink's processing time.
+        */
+       public OutType proctime() {
+               return toApiSpecificExpression(unresolvedCall(PROCTIME, 
toExpr()));
+       }
+
+       /**
+        * Creates an interval of the given number of years.
+        *
+        * <p>The produced expression is of type {@code DataTypes.INTERVAL}
+        */
+       public OutType year() {
+               return toApiSpecificExpression(toMonthInterval(toExpr(), 12));
+       }
+
+       /**
+        * Creates an interval of the given number of years.
+        */
+       public OutType years() {
+               return year();
+       }
+
+       /**
+        * Creates an interval of the given number of quarters.
+        */
+       public OutType quarter() {
+               return toApiSpecificExpression(toMonthInterval(toExpr(), 3));
+       }
+
+       /**
+        * Creates an interval of the given number of quarters.
+        */
+       public OutType quarters() {
+               return quarter();
+       }
+
+       /**
+        * Creates an interval of the given number of months.
+        */
+       public OutType month() {
+               return toApiSpecificExpression(toMonthInterval(toExpr(), 1));
+       }
+
+       /**
+        * Creates an interval of the given number of months.
+        */
+       public OutType months() {
+               return month();
+       }
+
+       /**
+        * Creates an interval of the given number of weeks.
+        */
+       public OutType week() {
+               return toApiSpecificExpression(toMilliInterval(toExpr(), 7 * 
MILLIS_PER_DAY));
+       }
+
+       /**
+        * Creates an interval of the given number of weeks.
+        */
+       public OutType weeks() {
+               return week();
+       }
+
+       /**
+        * Creates an interval of the given number of days.
+        */
+       public OutType day() {
+               return toApiSpecificExpression(toMilliInterval(toExpr(), 
MILLIS_PER_DAY));
+       }
+
+       /**
+        * Creates an interval of the given number of days.
+        */
+       public OutType days() {
+               return day();
+       }
+
+       /**
+        * Creates an interval of the given number of hours.
+        */
+       public OutType hour() {
+               return toApiSpecificExpression(toMilliInterval(toExpr(), 
MILLIS_PER_HOUR));
+       }
+
+       /**
+        * Creates an interval of the given number of hours.
+        */
+       public OutType hours() {
+               return hour();
+       }
+
+       /**
+        * Creates an interval of the given number of minutes.
+        */
+       public OutType minute() {
+               return toApiSpecificExpression(toMilliInterval(toExpr(), 
MILLIS_PER_MINUTE));
+       }
+
+       /**
+        * Creates an interval of the given number of minutes.
+        */
+       public OutType minutes() {
+               return minute();
+       }
+
+       /**
+        * Creates an interval of the given number of seconds.
+        */
+       public OutType second() {
+               return toApiSpecificExpression(toMilliInterval(toExpr(), 
MILLIS_PER_SECOND));
+       }
+
+       /**
+        * Creates an interval of the given number of seconds.
+        */
+       public OutType seconds() {
+               return second();
+       }
+
+       /**
+        * Creates an interval of the given number of milliseconds.
+        */
+       public OutType milli() {
+               return toApiSpecificExpression(toMilliInterval(toExpr(), 1));
+       }
+
+       /**
+        * Creates an interval of the given number of milliseconds.
+        */
+       public OutType millis() {
+               return milli();
+       }
+
+       // Hash functions
+
+       /**
+        * Returns the MD5 hash of the string argument; null if string is null.
+        *
+        * @return string of 32 hexadecimal digits or null
+        */
+       public OutType md5() {
+               return toApiSpecificExpression(unresolvedCall(MD5, toExpr()));
+       }
+
+       /**
+        * Returns the SHA-1 hash of the string argument; null if string is 
null.
+        *
+        * @return string of 40 hexadecimal digits or null
+        */
+       public OutType sha1() {
+               return toApiSpecificExpression(unresolvedCall(SHA1, toExpr()));
+       }
+
+       /**
+        * Returns the SHA-224 hash of the string argument; null if string is 
null.
+        *
+        * @return string of 56 hexadecimal digits or null
+        */
+       public OutType sha224() {
+               return toApiSpecificExpression(unresolvedCall(SHA224, 
toExpr()));
+       }
+
+       /**
+        * Returns the SHA-256 hash of the string argument; null if string is 
null.
+        *
+        * @return string of 64 hexadecimal digits or null
+        */
+       public OutType sha256() {
+               return toApiSpecificExpression(unresolvedCall(SHA256, 
toExpr()));
+       }
+
+       /**
+        * Returns the SHA-384 hash of the string argument; null if string is 
null.
+        *
+        * @return string of 96 hexadecimal digits or null
+        */
+       public OutType sha384() {
+               return toApiSpecificExpression(unresolvedCall(SHA384, 
toExpr()));
+       }
+
+       /**
+        * Returns the SHA-512 hash of the string argument; null if string is 
null.
+        *
+        * @return string of 128 hexadecimal digits or null
+        */
+       public OutType sha512() {
+               return toApiSpecificExpression(unresolvedCall(SHA512, 
toExpr()));
+       }
+
+       /**
+        * Returns the hash for the given string expression using the SHA-2 
family of hash
+        * functions (SHA-224, SHA-256, SHA-384, or SHA-512).
+        *
+        * @param hashLength bit length of the result (either 224, 256, 384, or 
512)
+        * @return string or null if one of the arguments is null.
+        */
+       public OutType sha2(InType hashLength) {
+               return toApiSpecificExpression(unresolvedCall(SHA2, toExpr(), 
objectToExpression(hashLength)));
+       }
+}
+
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 2b545ab..2333ffa 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -119,9 +119,7 @@ public class TableImpl implements Table {
 
        @Override
        public Table select(Expression... fields) {
-               List<Expression> expressionsWithResolvedCalls = 
Arrays.stream(fields)
-                       .map(f -> f.accept(lookupResolver))
-                       .collect(Collectors.toList());
+               List<Expression> expressionsWithResolvedCalls = 
preprocessExpressions(fields);
                CategorizedExpressions extracted = 
OperationExpressionsUtils.extractAggregationsAndProperties(
                        expressionsWithResolvedCalls
                );
@@ -464,9 +462,7 @@ public class TableImpl implements Table {
        }
 
        private Table addColumnsOperation(boolean replaceIfExist, 
List<Expression> fields) {
-               List<Expression> expressionsWithResolvedCalls = fields.stream()
-                       .map(f -> f.accept(lookupResolver))
-                       .collect(Collectors.toList());
+               List<Expression> expressionsWithResolvedCalls = 
preprocessExpressions(fields);
                CategorizedExpressions extracted = 
OperationExpressionsUtils.extractAggregationsAndProperties(
                        expressionsWithResolvedCalls
                );
@@ -563,6 +559,16 @@ public class TableImpl implements Table {
                return new TableImpl(tableEnvironment, operation, 
operationTreeBuilder, lookupResolver);
        }
 
+       private List<Expression> preprocessExpressions(List<Expression> 
expressions) {
+               return preprocessExpressions(expressions.toArray(new 
Expression[0]));
+       }
+
+       private List<Expression> preprocessExpressions(Expression[] 
expressions) {
+               return Arrays.stream(expressions)
+                       .map(f -> f.accept(lookupResolver))
+                       .collect(Collectors.toList());
+       }
+
        private static final class GroupedTableImpl implements GroupedTable {
 
                private final TableImpl table;
@@ -582,9 +588,7 @@ public class TableImpl implements Table {
 
                @Override
                public Table select(Expression... fields) {
-                       List<Expression> expressionsWithResolvedCalls = 
Arrays.stream(fields)
-                               .map(f -> f.accept(table.lookupResolver))
-                               .collect(Collectors.toList());
+                       List<Expression> expressionsWithResolvedCalls = 
table.preprocessExpressions(fields);
                        CategorizedExpressions extracted = 
OperationExpressionsUtils.extractAggregationsAndProperties(
                                expressionsWithResolvedCalls
                        );
@@ -716,7 +720,8 @@ public class TableImpl implements Table {
 
                @Override
                public WindowGroupedTable groupBy(Expression... fields) {
-                       List<Expression> fieldsWithoutWindow = 
Arrays.stream(fields)
+                       List<Expression> fieldsWithoutWindow = 
table.preprocessExpressions(fields)
+                               .stream()
                                .filter(f -> !window.getAlias().equals(f))
                                .collect(Collectors.toList());
                        if (fields.length != fieldsWithoutWindow.size() + 1) {
@@ -749,9 +754,7 @@ public class TableImpl implements Table {
 
                @Override
                public Table select(Expression... fields) {
-                       List<Expression> expressionsWithResolvedCalls = 
Arrays.stream(fields)
-                               .map(f -> f.accept(table.lookupResolver))
-                               .collect(Collectors.toList());
+                       List<Expression> expressionsWithResolvedCalls = 
table.preprocessExpressions(fields);
                        CategorizedExpressions extracted = 
OperationExpressionsUtils.extractAggregationsAndProperties(
                                expressionsWithResolvedCalls
                        );
@@ -816,9 +819,7 @@ public class TableImpl implements Table {
 
                @Override
                public Table select(Expression... fields) {
-                       List<Expression> expressionsWithResolvedCalls = 
Arrays.stream(fields)
-                               .map(f -> f.accept(table.lookupResolver))
-                               .collect(Collectors.toList());
+                       List<Expression> expressionsWithResolvedCalls = 
table.preprocessExpressions(fields);
                        CategorizedExpressions extracted = 
OperationExpressionsUtils.extractAggregationsAndProperties(
                                expressionsWithResolvedCalls
                        );
@@ -873,9 +874,7 @@ public class TableImpl implements Table {
 
                @Override
                public Table select(Expression... fields) {
-                       List<Expression> expressionsWithResolvedCalls = 
Arrays.stream(fields)
-                               .map(f -> f.accept(table.lookupResolver))
-                               .collect(Collectors.toList());
+                       List<Expression> expressionsWithResolvedCalls = 
table.preprocessExpressions(fields);
                        CategorizedExpressions extracted = 
OperationExpressionsUtils.extractAggregationsAndProperties(
                                expressionsWithResolvedCalls
                        );
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
index ec57fd5..92f8d83 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.ValidationException;
@@ -31,6 +32,7 @@ import org.apache.flink.table.types.DataType;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Utilities for API-specific {@link Expression}s.
@@ -50,6 +52,24 @@ public final class ApiExpressionUtils {
                // private
        }
 
+       public static Expression objectToExpression(Object expression) {
+               if (expression instanceof ApiExpression) {
+                       return ((ApiExpression) expression).toExpr();
+               } else if (expression instanceof Expression) {
+                       return (Expression) expression;
+               } else {
+                       return valueLiteral(expression);
+               }
+       }
+
+       public static Expression unwrapFromApi(Expression expression) {
+               if (expression instanceof ApiExpression) {
+                       return ((ApiExpression) expression).toExpr();
+               } else {
+                       return expression;
+               }
+       }
+
        public static LocalReferenceExpression localRef(String name, DataType 
dataType) {
                return new LocalReferenceExpression(name, dataType);
        }
@@ -74,14 +94,17 @@ public final class ApiExpressionUtils {
                        FunctionIdentifier functionIdentifier,
                        FunctionDefinition functionDefinition,
                        Expression... args) {
-               return new UnresolvedCallExpression(functionIdentifier, 
functionDefinition, Arrays.asList(args));
+               return unresolvedCall(functionIdentifier, functionDefinition, 
Arrays.asList(args));
        }
 
        public static UnresolvedCallExpression unresolvedCall(
                        FunctionIdentifier functionIdentifier,
                        FunctionDefinition functionDefinition,
                        List<Expression> args) {
-               return new UnresolvedCallExpression(functionIdentifier, 
functionDefinition, args);
+               return new UnresolvedCallExpression(functionIdentifier, 
functionDefinition,
+                       args.stream()
+                               .map(ApiExpressionUtils::unwrapFromApi)
+                               .collect(Collectors.toList()));
        }
 
        public static UnresolvedCallExpression 
unresolvedCall(FunctionDefinition functionDefinition, Expression... args) {
@@ -89,7 +112,11 @@ public final class ApiExpressionUtils {
        }
 
        public static UnresolvedCallExpression 
unresolvedCall(FunctionDefinition functionDefinition, List<Expression> args) {
-               return new UnresolvedCallExpression(functionDefinition, args);
+               return new UnresolvedCallExpression(
+                       functionDefinition,
+                       args.stream()
+                               .map(ApiExpressionUtils::unwrapFromApi)
+                               .collect(Collectors.toList()));
        }
 
        public static TableReferenceExpression tableRef(String name, Table 
table) {
@@ -101,7 +128,11 @@ public final class ApiExpressionUtils {
        }
 
        public static LookupCallExpression lookupCall(String name, 
Expression... args) {
-               return new LookupCallExpression(name, Arrays.asList(args));
+               return new LookupCallExpression(
+                       name,
+                       Arrays.stream(args)
+                               .map(ApiExpressionUtils::unwrapFromApi)
+                               .collect(Collectors.toList()));
        }
 
        public static Expression toMonthInterval(Expression e, int multiplier) {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java
index d30efd5..ef0663a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/LookupCallExpression.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -43,7 +42,7 @@ public final class LookupCallExpression implements Expression 
{
 
        LookupCallExpression(String unresolvedFunction, List<Expression> args) {
                this.unresolvedName = 
Preconditions.checkNotNull(unresolvedFunction);
-               this.args = Collections.unmodifiableList(new 
ArrayList<>(Preconditions.checkNotNull(args)));
+               this.args = 
Collections.unmodifiableList(Preconditions.checkNotNull(args));
        }
 
        public String getUnresolvedName() {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
index 7fdf57d..a280d0e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java
@@ -26,7 +26,6 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -61,8 +60,7 @@ public final class UnresolvedCallExpression implements 
Expression {
                        Preconditions.checkNotNull(functionIdentifier, 
"Function identifier must not be null.");
                this.functionDefinition =
                        Preconditions.checkNotNull(functionDefinition, 
"Function definition must not be null.");
-               this.args = Collections.unmodifiableList(
-                       new ArrayList<>(Preconditions.checkNotNull(args, 
"Arguments must not be null.")));
+               this.args = 
Collections.unmodifiableList(Preconditions.checkNotNull(args, "Arguments must 
not be null."));
        }
 
        UnresolvedCallExpression(
@@ -71,8 +69,7 @@ public final class UnresolvedCallExpression implements 
Expression {
                this.functionIdentifier = null;
                this.functionDefinition =
                        Preconditions.checkNotNull(functionDefinition, 
"Function definition must not be null.");
-               this.args = Collections.unmodifiableList(
-                       new ArrayList<>(Preconditions.checkNotNull(args, 
"Arguments must not be null.")));
+               this.args = 
Collections.unmodifiableList(Preconditions.checkNotNull(args, "Arguments must 
not be null."));
        }
 
        public Optional<FunctionIdentifier> getFunctionIdentifier() {
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
index 434697d..59ef852 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/ExpressionResolver.java
@@ -77,6 +77,7 @@ public class ExpressionResolver {
         */
        public static List<ResolverRule> getExpandingResolverRules() {
                return Arrays.asList(
+                       ResolverRules.UNWRAP_API_EXPRESSION,
                        ResolverRules.LOOKUP_CALL_BY_NAME,
                        ResolverRules.FLATTEN_STAR_REFERENCE,
                        ResolverRules.EXPAND_COLUMN_FUNCTIONS);
@@ -87,6 +88,7 @@ public class ExpressionResolver {
         */
        public static List<ResolverRule> getAllResolverRules() {
                return Arrays.asList(
+                       ResolverRules.UNWRAP_API_EXPRESSION,
                        ResolverRules.LOOKUP_CALL_BY_NAME,
                        ResolverRules.FLATTEN_STAR_REFERENCE,
                        ResolverRules.EXPAND_COLUMN_FUNCTIONS,
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
index 93471ef..3071fa6 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/LookupCallResolver.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions.resolver;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.expressions.Expression;
@@ -65,6 +66,17 @@ public class LookupCallResolver extends 
ApiExpressionDefaultVisitor<Expression>
        }
 
        @Override
+       public Expression visitNonApiExpression(Expression other) {
+               // LookupCallResolver might be called outside of 
ExpressionResolver, thus we need to additionally
+               // handle the ApiExpressions here
+               if (other instanceof ApiExpression) {
+                       return ((ApiExpression) other).toExpr().accept(this);
+               } else {
+                       return defaultMethod(other);
+               }
+       }
+
+       @Override
        protected Expression defaultMethod(Expression expression) {
                return expression;
        }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
index 42dc0757..3acb5af 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/OverWindowResolverRule.java
@@ -55,7 +55,7 @@ final class OverWindowResolverRule implements ResolverRule {
                        .collect(Collectors.toList());
        }
 
-       private class ExpressionResolverVisitor extends 
RuleExpressionVisitor<Expression> {
+       private static class ExpressionResolverVisitor extends 
RuleExpressionVisitor<Expression> {
 
                ExpressionResolverVisitor(ResolutionContext context) {
                        super(context);
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
index 671b1e1..aab2272 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolverRules.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.expressions.resolver.rules;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 
 /**
@@ -62,6 +63,11 @@ public final class ResolverRules {
         */
        public static final ResolverRule QUALIFY_BUILT_IN_FUNCTIONS = new 
QualifyBuiltInFunctionsRule();
 
+       /**
+        * Unwraps all {@link ApiExpression}.
+        */
+       public static final ResolverRule UNWRAP_API_EXPRESSION = new 
UnwrapApiExpressionRule();
+
        private ResolverRules() {
        }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java
similarity index 59%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
copy to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java
index 5180d33..1ef374b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/UnwrapApiExpressionRule.java
@@ -16,25 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.api;
+package org.apache.flink.table.expressions.resolver.rules;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ApiExpression;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 /**
- * Tumbling window on time with alias. Fully specifies a window.
+ * Unwraps all {@link ApiExpression}.
  */
-@PublicEvolving
-public final class TumbleWithSizeOnTimeWithAlias extends GroupWindow {
-
-       private final Expression size;
-
-       TumbleWithSizeOnTimeWithAlias(Expression alias, Expression timeField, 
Expression size) {
-               super(alias, timeField);
-               this.size = size;
-       }
-
-       public Expression getSize() {
-               return size;
+@Internal
+final class UnwrapApiExpressionRule implements ResolverRule {
+       @Override
+       public List<Expression> apply(
+                       List<Expression> expression,
+                       ResolutionContext context) {
+               return 
expression.stream().map(ApiExpressionUtils::unwrapFromApi).collect(Collectors.toList());
        }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
index 4877bc4..3a3b0a0 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
@@ -154,7 +155,7 @@ public class FieldInfoUtils {
         * used if the input type has a defined field order (tuple, case class, 
Row) and no of fields
         * references a field of the input type.
         */
-       public static boolean isReferenceByPosition(CompositeType<?> ct, 
Expression[] fields) {
+       private static boolean isReferenceByPosition(CompositeType<?> ct, 
Expression[] fields) {
                if (!(ct instanceof TupleTypeInfoBase)) {
                        return false;
                }
@@ -224,7 +225,10 @@ public class FieldInfoUtils {
        public static <A> TypeInfoSchema getFieldsInfo(TypeInformation<A> 
inputType, Expression[] expressions) {
                validateInputTypeInfo(inputType);
 
-               final List<FieldInfo> fieldInfos = 
extractFieldInformation(inputType, expressions);
+               final List<FieldInfo> fieldInfos = extractFieldInformation(
+                       inputType,
+                       
Arrays.stream(expressions).map(ApiExpressionUtils::unwrapFromApi).toArray(Expression[]::new)
+               );
 
                validateNoStarReference(fieldInfos);
                boolean isRowtimeAttribute = 
checkIfRowtimeAttribute(fieldInfos);
@@ -244,8 +248,8 @@ public class FieldInfoUtils {
        }
 
        private static <A> List<FieldInfo> extractFieldInformation(
-               TypeInformation<A> inputType,
-               Expression[] exprs) {
+                       TypeInformation<A> inputType,
+                       Expression[] exprs) {
                final List<FieldInfo> fieldInfos;
                if (inputType instanceof GenericTypeInfo && 
inputType.getTypeClass() == Row.class) {
                        throw new ValidationException(
@@ -367,7 +371,7 @@ public class FieldInfoUtils {
                final TypeInformation<?>[] fieldTypes;
                if (inputType instanceof CompositeType) {
                        int arity = inputType.getArity();
-                       CompositeType ct = (CompositeType<?>) inputType;
+                       CompositeType<?> ct = (CompositeType<?>) inputType;
                        fieldTypes = IntStream.range(0, 
arity).mapToObj(ct::getTypeAt).toArray(TypeInformation[]::new);
                } else {
                        fieldTypes = new TypeInformation[]{inputType};
diff --git 
a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
 
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
index cdf4042..cb71dc7 100644
--- 
a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
+++ 
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
@@ -17,105 +17,82 @@
  */
 package org.apache.flink.table.api
 
-import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float 
=> JFloat, Integer => JInteger, Long => JLong, Short => JShort}
-import java.math.{BigDecimal => JBigDecimal}
-import java.sql.{Date, Time, Timestamp}
-import java.time.{LocalDate, LocalDateTime, LocalTime}
-
 import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.internal.BaseExpressions
+import org.apache.flink.table.expressions.ApiExpressionUtils._
 import org.apache.flink.table.expressions._
-import ApiExpressionUtils._
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, 
UserDefinedAggregateFunction, UserDefinedFunctionHelper, _}
 import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.utils.TypeConversions
-import 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float 
=> JFloat, Integer => JInteger, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Date, Time, Timestamp}
+import java.time.{LocalDate, LocalDateTime, LocalTime}
 
 import _root_.scala.language.implicitConversions
 
 /**
   * These are all the operations that can be used to construct an 
[[Expression]] AST for
   * expression operations.
-  *
-  * These operations must be kept in sync with the parser in
-  * [[org.apache.flink.table.expressions.ExpressionParser]].
   */
 @PublicEvolving
-trait ImplicitExpressionOperations {
+trait ImplicitExpressionOperations extends BaseExpressions[Expression, 
Expression] {
   private[flink] def expr: Expression
 
+  override def toExpr: Expression = expr
+
+  override protected def toApiSpecificExpression(expression: Expression): 
Expression = expression
+
   /**
-    * Enables literals on left side of binary expressions.
-    *
-    * e.g. 12.toExpr % 'a
-    *
-    * @return expression
-    */
-  def toExpr: Expression = expr
+   * Specifies a name for an expression i.e. a field.
+   *
+   * @param name name for one field
+   * @param extraNames additional names if the expression expands to multiple 
fields
+   * @return field with an alias
+   */
+  def as(name: Symbol, extraNames: Symbol*): Expression = as(name.name, 
extraNames.map(_.name): _*)
 
   /**
     * Boolean AND in three-valued logic.
     */
-  def && (other: Expression): Expression = unresolvedCall(AND, expr, other)
+  def && (other: Expression): Expression = and(other)
 
   /**
     * Boolean OR in three-valued logic.
     */
-  def || (other: Expression): Expression = unresolvedCall(OR, expr, other)
+  def || (other: Expression): Expression = or(other)
 
   /**
     * Greater than.
     */
-  def > (other: Expression): Expression = unresolvedCall(GREATER_THAN, expr, 
other)
+  def > (other: Expression): Expression = isGreater(other)
 
   /**
     * Greater than or equal.
     */
-  def >= (other: Expression): Expression = 
unresolvedCall(GREATER_THAN_OR_EQUAL, expr, other)
+  def >= (other: Expression): Expression = isGreaterOrEqual(other)
 
   /**
     * Less than.
     */
-  def < (other: Expression): Expression = unresolvedCall(LESS_THAN, expr, 
other)
+  def < (other: Expression): Expression = isLess(other)
 
   /**
     * Less than or equal.
     */
-  def <= (other: Expression): Expression = unresolvedCall(LESS_THAN_OR_EQUAL, 
expr, other)
+  def <= (other: Expression): Expression = isLessOrEqual(other)
 
   /**
     * Equals.
     */
-  def === (other: Expression): Expression = unresolvedCall(EQUALS, expr, other)
+  def === (other: Expression): Expression = isEqual(other)
 
   /**
     * Not equal.
     */
-  def !== (other: Expression): Expression = unresolvedCall(NOT_EQUALS, expr, 
other)
-
-  /**
-    * Returns true if the given expression is between lowerBound and 
upperBound (both inclusive).
-    * False otherwise. The parameters must be numeric types or identical 
comparable types.
-    *
-    * @param lowerBound numeric or comparable expression
-    * @param upperBound numeric or comparable expression
-    * @return boolean or null
-    */
-  def between(lowerBound: Expression, upperBound: Expression): Expression =
-    unresolvedCall(BETWEEN, expr, lowerBound, upperBound)
-
-  /**
-    * Returns true if the given expression is not between lowerBound and 
upperBound (both
-    * inclusive). False otherwise. The parameters must be numeric types or 
identical
-    * comparable types.
-    *
-    * @param lowerBound numeric or comparable expression
-    * @param upperBound numeric or comparable expression
-    * @return boolean or null
-    */
-  def notBetween(lowerBound: Expression, upperBound: Expression): Expression =
-    unresolvedCall(NOT_BETWEEN, expr, lowerBound, upperBound)
+  def !== (other: Expression): Expression = isNotEqual(other)
 
   /**
     * Whether boolean expression is not true; returns null if boolean is null.
@@ -125,7 +102,7 @@ trait ImplicitExpressionOperations {
   /**
     * Returns negative numeric.
     */
-  def unary_- : Expression = unresolvedCall(MINUS_PREFIX, expr)
+  def unary_- : Expression = Expressions.negative(expr)
 
   /**
     * Returns numeric.
@@ -133,54 +110,24 @@ trait ImplicitExpressionOperations {
   def unary_+ : Expression = expr
 
   /**
-    * Returns true if the given expression is null.
-    */
-  def isNull: Expression = unresolvedCall(IS_NULL, expr)
-
-  /**
-    * Returns true if the given expression is not null.
-    */
-  def isNotNull: Expression = unresolvedCall(IS_NOT_NULL, expr)
-
-  /**
-    * Returns true if given boolean expression is true. False otherwise (for 
null and false).
-    */
-  def isTrue: Expression = unresolvedCall(IS_TRUE, expr)
-
-  /**
-    * Returns true if given boolean expression is false. False otherwise (for 
null and true).
-    */
-  def isFalse: Expression = unresolvedCall(IS_FALSE, expr)
-
-  /**
-    * Returns true if given boolean expression is not true (for null and 
false). False otherwise.
-    */
-  def isNotTrue: Expression = unresolvedCall(IS_NOT_TRUE, expr)
-
-  /**
-    * Returns true if given boolean expression is not false (for null and 
true). False otherwise.
-    */
-  def isNotFalse: Expression = unresolvedCall(IS_NOT_FALSE, expr)
-
-  /**
     * Returns left plus right.
     */
-  def + (other: Expression): Expression = unresolvedCall(PLUS, expr, other)
+  def + (other: Expression): Expression = plus(other)
 
   /**
     * Returns left minus right.
     */
-  def - (other: Expression): Expression = unresolvedCall(MINUS, expr, other)
+  def - (other: Expression): Expression = minus(other)
 
   /**
     * Returns left divided by right.
     */
-  def / (other: Expression): Expression = unresolvedCall(DIVIDE, expr, other)
+  def / (other: Expression): Expression = dividedBy(other)
 
   /**
     * Returns left multiplied by right.
     */
-  def * (other: Expression): Expression = unresolvedCall(TIMES, expr, other)
+  def * (other: Expression): Expression = times(other)
 
   /**
     * Returns the remainder (modulus) of left divided by right.
@@ -194,352 +141,23 @@ trait ImplicitExpressionOperations {
     *
     * e.g. withColumns(1 to 3)
     */
-  def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, 
other)
-
-  /**
-    * Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT a), 
declares that an
-    * aggregation function is only applied on distinct input values.
-    *
-    * For example:
-    *
-    * {{{
-    * orders
-    *   .groupBy('a)
-    *   .select('a, 'b.sum.distinct as 'd)
-    * }}}
-    */
-  def distinct: Expression = unresolvedCall(DISTINCT, expr)
-
-  /**
-    * Returns the sum of the numeric field across all input values.
-    * If all values are null, null is returned.
-    */
-  def sum: Expression = unresolvedCall(SUM, expr)
-
-  /**
-    * Returns the sum of the numeric field across all input values.
-    * If all values are null, 0 is returned.
-    */
-  def sum0: Expression = unresolvedCall(SUM0, expr)
-
-  /**
-    * Returns the minimum value of field across all input values.
-    */
-  def min: Expression = unresolvedCall(MIN, expr)
-
-  /**
-    * Returns the maximum value of field across all input values.
-    */
-  def max: Expression = unresolvedCall(MAX, expr)
-
-  /**
-    * Returns the number of input rows for which the field is not null.
-    */
-  def count: Expression = unresolvedCall(COUNT, expr)
-
-  /**
-    * Returns the average (arithmetic mean) of the numeric field across all 
input values.
-    */
-  def avg: Expression = unresolvedCall(AVG, expr)
-
-  /**
-    * Returns the population standard deviation of an expression (the square 
root of varPop()).
-    */
-  def stddevPop: Expression = unresolvedCall(STDDEV_POP, expr)
-
-  /**
-    * Returns the sample standard deviation of an expression (the square root 
of varSamp()).
-    */
-  def stddevSamp: Expression = unresolvedCall(STDDEV_SAMP, expr)
-
-  /**
-    * Returns the population standard variance of an expression.
-    */
-  def varPop: Expression = unresolvedCall(VAR_POP, expr)
-
-  /**
-    *  Returns the sample variance of a given expression.
-    */
-  def varSamp: Expression = unresolvedCall(VAR_SAMP, expr)
-
-  /**
-    * Returns multiset aggregate of a given expression.
-    */
-  def collect: Expression = unresolvedCall(COLLECT, expr)
-
-  /**
-    * Converts a value to a given data type.
-    *
-    * e.g. "42".cast(DataTypes.INT()) leads to 42.
-    *
-    * @return casted expression
-    */
-  def cast(toType: DataType): Expression =
-    unresolvedCall(CAST, expr, typeLiteral(toType))
-
-  /**
-    * @deprecated This method will be removed in future versions as it uses 
the old type system. It
-    *             is recommended to use [[cast(DataType)]] instead which uses 
the new type system
-    *             based on [[DataTypes]]. Please make sure to use either the 
old or the new type
-    *             system consistently to avoid unintended behavior. See the 
website documentation
-    *             for more information.
-    */
-  @deprecated
-  def cast(toType: TypeInformation[_]): Expression =
-    unresolvedCall(CAST, expr, typeLiteral(fromLegacyInfoToDataType(toType)))
-
-  /**
-    * Specifies a name for an expression i.e. a field.
-    *
-    * @param name name for one field
-    * @param extraNames additional names if the expression expands to multiple 
fields
-    * @return field with an alias
-    */
-  def as(name: Symbol, extraNames: Symbol*): Expression =
-    unresolvedCall(
-      AS,
-      expr +: valueLiteral(name.name) +: extraNames.map(name => 
valueLiteral(name.name)): _*)
-
-  /**
-    * Specifies ascending order of an expression i.e. a field for orderBy call.
-    *
-    * @return ascend expression
-    */
-  def asc: Expression = unresolvedCall(ORDER_ASC, expr)
-
-  /**
-    * Specifies descending order of an expression i.e. a field for orderBy 
call.
-    *
-    * @return descend expression
-    */
-  def desc: Expression = unresolvedCall(ORDER_DESC, expr)
-
-  /**
-    * Returns true if an expression exists in a given list of expressions. 
This is a shorthand
-    * for multiple OR conditions.
-    *
-    * If the testing set contains null, the result will be null if the element 
can not be found
-    * and true if it can be found. If the element is null, the result is 
always null.
-    *
-    * e.g. "42".in(1, 2, 3) leads to false.
-    */
-  def in(elements: Expression*): Expression = unresolvedCall(IN, expr +: 
elements: _*)
-
-  /**
-    * Returns true if an expression exists in a given table sub-query. The 
sub-query table
-    * must consist of one column. This column must have the same data type as 
the expression.
-    *
-    * Note: This operation is not supported in a streaming environment yet.
-    */
-  def in(table: Table): Expression = unresolvedCall(IN, expr, 
tableRef(table.toString, table))
-
-  /**
-    * Returns the start time (inclusive) of a window when applied on a window 
reference.
-    */
-  def start: Expression = unresolvedCall(WINDOW_START, expr)
-
-  /**
-    * Returns the end time (exclusive) of a window when applied on a window 
reference.
-    *
-    * e.g. if a window ends at 10:59:59.999 this property will return 
11:00:00.000.
-    */
-  def end: Expression = unresolvedCall(WINDOW_END, expr)
+  def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, 
objectToExpression(other))
 
   /**
     * Ternary conditional operator that decides which of two other expressions 
should be
     * based on a evaluated boolean condition.
     *
-    * e.g. (42 > 5).?("A", "B") leads to "A"
+    * e.g. ($"f0" > 5).?("A", "B") leads to "A"
     *
     * @param ifTrue expression to be evaluated if condition holds
     * @param ifFalse expression to be evaluated if condition does not hold
     */
   def ?(ifTrue: Expression, ifFalse: Expression): Expression =
-    unresolvedCall(IF, expr, ifTrue, ifFalse)
+    Expressions.ifThenElse(expr, ifTrue, ifFalse).toExpr
 
   // scalar functions
 
   /**
-    * Calculates the remainder of division the given number by another one.
-    */
-  def mod(other: Expression): Expression = unresolvedCall(MOD, expr, other)
-
-  /**
-    * Calculates the Euler's number raised to the given power.
-    */
-  def exp(): Expression = unresolvedCall(EXP, expr)
-
-  /**
-    * Calculates the base 10 logarithm of the given value.
-    */
-  def log10(): Expression = unresolvedCall(LOG10, expr)
-
-  /**
-    * Calculates the base 2 logarithm of the given value.
-    */
-  def log2(): Expression = unresolvedCall(LOG2, expr)
-
-  /**
-    * Calculates the natural logarithm of the given value.
-    */
-  def ln(): Expression = unresolvedCall(LN, expr)
-
-  /**
-    * Calculates the natural logarithm of the given value.
-    */
-  def log(): Expression = unresolvedCall(LOG, expr)
-
-  /**
-    * Calculates the logarithm of the given value to the given base.
-    */
-  def log(base: Expression): Expression = unresolvedCall(LOG, base, expr)
-
-  /**
-    * Calculates the given number raised to the power of the other value.
-    */
-  def power(other: Expression): Expression = unresolvedCall(POWER, expr, other)
-
-  /**
-    * Calculates the hyperbolic cosine of a given value.
-    */
-  def cosh(): Expression = unresolvedCall(COSH, expr)
-
-  /**
-    * Calculates the square root of a given value.
-    */
-  def sqrt(): Expression = unresolvedCall(SQRT, expr)
-
-  /**
-    * Calculates the absolute value of given value.
-    */
-  def abs(): Expression = unresolvedCall(ABS, expr)
-
-  /**
-    * Calculates the largest integer less than or equal to a given number.
-    */
-  def floor(): Expression = unresolvedCall(FLOOR, expr)
-
-  /**
-    * Calculates the hyperbolic sine of a given value.
-    */
-  def sinh(): Expression = unresolvedCall(SINH, expr)
-
-  /**
-    * Calculates the smallest integer greater than or equal to a given number.
-    */
-  def ceil(): Expression = unresolvedCall(CEIL, expr)
-
-  /**
-    * Calculates the sine of a given number.
-    */
-  def sin(): Expression = unresolvedCall(SIN, expr)
-
-  /**
-    * Calculates the cosine of a given number.
-    */
-  def cos(): Expression = unresolvedCall(COS, expr)
-
-  /**
-    * Calculates the tangent of a given number.
-    */
-  def tan(): Expression = unresolvedCall(TAN, expr)
-
-  /**
-    * Calculates the cotangent of a given number.
-    */
-  def cot(): Expression = unresolvedCall(COT, expr)
-
-  /**
-    * Calculates the arc sine of a given number.
-    */
-  def asin(): Expression = unresolvedCall(ASIN, expr)
-
-  /**
-    * Calculates the arc cosine of a given number.
-    */
-  def acos(): Expression = unresolvedCall(ACOS, expr)
-
-  /**
-    * Calculates the arc tangent of a given number.
-    */
-  def atan(): Expression = unresolvedCall(ATAN, expr)
-
-  /**
-    * Calculates the hyperbolic tangent of a given number.
-    */
-  def tanh(): Expression = unresolvedCall(TANH, expr)
-
-  /**
-    * Converts numeric from radians to degrees.
-    */
-  def degrees(): Expression = unresolvedCall(DEGREES, expr)
-
-  /**
-    * Converts numeric from degrees to radians.
-    */
-  def radians(): Expression = unresolvedCall(RADIANS, expr)
-
-  /**
-    * Calculates the signum of a given number.
-    */
-  def sign(): Expression = unresolvedCall(SIGN, expr)
-
-  /**
-    * Rounds the given number to integer places right to the decimal point.
-    */
-  def round(places: Expression): Expression = unresolvedCall(ROUND, expr, 
places)
-
-  /**
-    * Returns a string representation of an integer numeric value in binary 
format. Returns null if
-    * numeric is null. E.g. "4" leads to "100", "12" leads to "1100".
-    */
-  def bin(): Expression = unresolvedCall(BIN, expr)
-
-  /**
-    * Returns a string representation of an integer numeric value or a string 
in hex format. Returns
-    * null if numeric or string is null.
-    *
-    * E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a 
string "hello,world" leads
-    * to "68656c6c6f2c776f726c64".
-    */
-  def hex(): Expression = unresolvedCall(HEX, expr)
-
-  /**
-    * Returns a number of truncated to n decimal places.
-    * If n is 0,the result has no decimal point or fractional part.
-    * n can be negative to cause n digits left of the decimal point of the 
value to become zero.
-    * E.g. truncate(42.345, 2) to 42.34.
-    */
-  def truncate(n: Expression): Expression = unresolvedCall(TRUNCATE, expr, n)
-
-  /**
-    * Returns a number of truncated to 0 decimal places.
-    * E.g. truncate(42.345) to 42.0.
-    */
-  def truncate(): Expression = unresolvedCall(TRUNCATE, expr)
-
-  // String operations
-
-  /**
-    * Creates a substring of the given string at given index for a given 
length.
-    *
-    * @param beginIndex first character of the substring (starting at 1, 
inclusive)
-    * @param length number of characters of the substring
-    * @return substring
-    */
-  def substring(beginIndex: Expression, length: Expression): Expression =
-    unresolvedCall(SUBSTRING, expr, beginIndex, length)
-
-  /**
-    * Creates a substring of the given string beginning at the given index to 
the end.
-    *
-    * @param beginIndex first character of the substring (starting at 1, 
inclusive)
-    * @return substring
-    */
-  def substring(beginIndex: Expression): Expression =
-    unresolvedCall(SUBSTRING, expr, beginIndex)
-
-  /**
     * Removes leading and/or trailing characters from the given string.
     *
     * @param removeLeading if true, remove leading characters (default: true)
@@ -552,324 +170,14 @@ trait ImplicitExpressionOperations {
       removeTrailing: Boolean = true,
       character: Expression = valueLiteral(" "))
     : Expression = {
-    unresolvedCall(TRIM, valueLiteral(removeLeading), 
valueLiteral(removeTrailing), character, expr)
+    unresolvedCall(
+      TRIM,
+      valueLiteral(removeLeading),
+      valueLiteral(removeTrailing),
+      ApiExpressionUtils.objectToExpression(character),
+      expr)
   }
 
-  /**
-    * Returns a new string which replaces all the occurrences of the search 
target
-    * with the replacement string (non-overlapping).
-    */
-  def replace(search: Expression, replacement: Expression): Expression =
-    unresolvedCall(REPLACE, expr, search, replacement)
-
-  /**
-    * Returns the length of a string.
-    */
-  def charLength(): Expression = unresolvedCall(CHAR_LENGTH, expr)
-
-  /**
-    * Returns all of the characters in a string in upper case using the rules 
of
-    * the default locale.
-    */
-  def upperCase(): Expression = unresolvedCall(UPPER, expr)
-
-  /**
-    * Returns all of the characters in a string in lower case using the rules 
of
-    * the default locale.
-    */
-  def lowerCase(): Expression = unresolvedCall(LOWER, expr)
-
-  /**
-    * Converts the initial letter of each word in a string to uppercase.
-    * Assumes a string containing only [A-Za-z0-9], everything else is treated 
as whitespace.
-    */
-  def initCap(): Expression = unresolvedCall(INIT_CAP, expr)
-
-  /**
-    * Returns true, if a string matches the specified LIKE pattern.
-    *
-    * e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n"
-    */
-  def like(pattern: Expression): Expression = unresolvedCall(LIKE, expr, 
pattern)
-
-  /**
-    * Returns true, if a string matches the specified SQL regex pattern.
-    *
-    * e.g. "A+" matches all strings that consist of at least one A
-    */
-  def similar(pattern: Expression): Expression = unresolvedCall(SIMILAR, expr, 
pattern)
-
-  /**
-    * Returns the position of string in an other string starting at 1.
-    * Returns 0 if string could not be found.
-    *
-    * e.g. "a".position("bbbbba") leads to 6
-    */
-  def position(haystack: Expression): Expression = unresolvedCall(POSITION, 
expr, haystack)
-
-  /**
-    * Returns a string left-padded with the given pad string to a length of 
len characters. If
-    * the string is longer than len, the return value is shortened to len 
characters.
-    *
-    * e.g. "hi".lpad(4, '??') returns "??hi",  "hi".lpad(1, '??') returns "h"
-    */
-  def lpad(len: Expression, pad: Expression): Expression = 
unresolvedCall(LPAD, expr, len, pad)
-
-  /**
-    * Returns a string right-padded with the given pad string to a length of 
len characters. If
-    * the string is longer than len, the return value is shortened to len 
characters.
-    *
-    * e.g. "hi".rpad(4, '??') returns "hi??",  "hi".rpad(1, '??') returns "h"
-    */
-  def rpad(len: Expression, pad: Expression): Expression = 
unresolvedCall(RPAD, expr, len, pad)
-
-  /**
-    * Defines an aggregation to be used for a previously specified over window.
-    *
-    * For example:
-    *
-    * {{{
-    * table
-    *   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows 
following CURRENT_ROW as 'w)
-    *   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
-    * }}}
-    */
-  def over(alias: Expression): Expression = unresolvedCall(OVER, expr, alias)
-
-  /**
-    * Replaces a substring of string with a string starting at a position 
(starting at 1).
-    *
-    * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx"
-    */
-  def overlay(newString: Expression, starting: Expression): Expression =
-    unresolvedCall(OVERLAY, expr, newString, starting)
-
-  /**
-    * Replaces a substring of string with a string starting at a position 
(starting at 1).
-    * The length specifies how many characters should be removed.
-    *
-    * e.g. "xxxxxtest".overlay("xxxx", 6, 2) leads to "xxxxxxxxxst"
-    */
-  def overlay(newString: Expression, starting: Expression, length: 
Expression): Expression =
-    unresolvedCall(OVERLAY, expr, newString, starting, length)
-
-  /**
-    * Returns a string with all substrings that match the regular expression 
consecutively
-    * being replaced.
-    */
-  def regexpReplace(regex: Expression, replacement: Expression): Expression =
-    unresolvedCall(REGEXP_REPLACE, expr, regex, replacement)
-
-  /**
-    * Returns a string extracted with a specified regular expression and a 
regex match group
-    * index.
-    */
-  def regexpExtract(regex: Expression, extractIndex: Expression): Expression =
-    unresolvedCall(REGEXP_EXTRACT, expr, regex, extractIndex)
-
-  /**
-    * Returns a string extracted with a specified regular expression.
-    */
-  def regexpExtract(regex: Expression): Expression =
-    unresolvedCall(REGEXP_EXTRACT, expr, regex)
-
-  /**
-    * Returns the base string decoded with base64.
-    */
-  def fromBase64(): Expression = unresolvedCall(FROM_BASE64, expr)
-
-  /**
-    * Returns the base64-encoded result of the input string.
-    */
-  def toBase64(): Expression = unresolvedCall(TO_BASE64, expr)
-
-  /**
-    * Returns a string that removes the left whitespaces from the given string.
-    */
-  def ltrim(): Expression = unresolvedCall(LTRIM, expr)
-
-  /**
-    * Returns a string that removes the right whitespaces from the given 
string.
-    */
-  def rtrim(): Expression = unresolvedCall(RTRIM, expr)
-
-  /**
-    * Returns a string that repeats the base string n times.
-    */
-  def repeat(n: Expression): Expression = unresolvedCall(REPEAT, expr, n)
-
-  // Temporal operations
-
-  /**
-    * Parses a date string in the form "yyyy-MM-dd" to a SQL Date.
-    */
-  def toDate: Expression =
-    unresolvedCall(CAST, expr, 
typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.DATE)))
-
-  /**
-    * Parses a time string in the form "HH:mm:ss" to a SQL Time.
-    */
-  def toTime: Expression =
-    unresolvedCall(CAST, expr, 
typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIME)))
-
-  /**
-    * Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a 
SQL Timestamp.
-    */
-  def toTimestamp: Expression =
-    unresolvedCall(CAST, expr, 
typeLiteral(fromLegacyInfoToDataType(SqlTimeTypeInfo.TIMESTAMP)))
-
-  /**
-    * Extracts parts of a time point or time interval. Returns the part as a 
long value.
-    *
-    * e.g. "2006-06-05".toDate.extract(DAY) leads to 5
-    */
-  def extract(timeIntervalUnit: TimeIntervalUnit): Expression =
-    unresolvedCall(EXTRACT, valueLiteral(timeIntervalUnit), expr)
-
-  /**
-    * Rounds down a time point to the given unit.
-    *
-    * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00
-    */
-  def floor(timeIntervalUnit: TimeIntervalUnit): Expression =
-    unresolvedCall(FLOOR, valueLiteral(timeIntervalUnit), expr)
-
-  /**
-    * Rounds up a time point to the given unit.
-    *
-    * e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00
-    */
-  def ceil(timeIntervalUnit: TimeIntervalUnit): Expression =
-    unresolvedCall(CEIL, valueLiteral(timeIntervalUnit), expr)
-
-  // Interval types
-
-  /**
-    * Creates an interval of the given number of years.
-    *
-    * @return interval of months
-    */
-  def year: Expression = toMonthInterval(expr, 12)
-
-  /**
-    * Creates an interval of the given number of years.
-    *
-    * @return interval of months
-    */
-  def years: Expression = year
-
-  /**
-    * Creates an interval of the given number of quarters.
-    *
-    * @return interval of months
-    */
-  def quarter: Expression = toMonthInterval(expr, 3)
-
-  /**
-    * Creates an interval of the given number of quarters.
-    *
-    * @return interval of months
-    */
-  def quarters: Expression = quarter
-
-  /**
-    * Creates an interval of the given number of months.
-    *
-    * @return interval of months
-    */
-  def month: Expression = toMonthInterval(expr, 1)
-
-  /**
-    * Creates an interval of the given number of months.
-    *
-    * @return interval of months
-    */
-  def months: Expression = month
-
-  /**
-    * Creates an interval of the given number of weeks.
-    *
-    * @return interval of milliseconds
-    */
-  def week: Expression = toMilliInterval(expr, 7 * MILLIS_PER_DAY)
-
-  /**
-    * Creates an interval of the given number of weeks.
-    *
-    * @return interval of milliseconds
-    */
-  def weeks: Expression = week
-
-  /**
-    * Creates an interval of the given number of days.
-    *
-    * @return interval of milliseconds
-    */
-  def day: Expression = toMilliInterval(expr, MILLIS_PER_DAY)
-
-  /**
-    * Creates an interval of the given number of days.
-    *
-    * @return interval of milliseconds
-    */
-  def days: Expression = day
-
-  /**
-    * Creates an interval of the given number of hours.
-    *
-    * @return interval of milliseconds
-    */
-  def hour: Expression = toMilliInterval(expr, MILLIS_PER_HOUR)
-
-  /**
-    * Creates an interval of the given number of hours.
-    *
-    * @return interval of milliseconds
-    */
-  def hours: Expression = hour
-
-  /**
-    * Creates an interval of the given number of minutes.
-    *
-    * @return interval of milliseconds
-    */
-  def minute: Expression = toMilliInterval(expr, MILLIS_PER_MINUTE)
-
-  /**
-    * Creates an interval of the given number of minutes.
-    *
-    * @return interval of milliseconds
-    */
-  def minutes: Expression = minute
-
-  /**
-    * Creates an interval of the given number of seconds.
-    *
-    * @return interval of milliseconds
-    */
-  def second: Expression = toMilliInterval(expr, MILLIS_PER_SECOND)
-
-  /**
-    * Creates an interval of the given number of seconds.
-    *
-    * @return interval of milliseconds
-    */
-  def seconds: Expression = second
-
-  /**
-    * Creates an interval of the given number of milliseconds.
-    *
-    * @return interval of milliseconds
-    */
-  def milli: Expression = toMilliInterval(expr, 1)
-
-  /**
-    * Creates an interval of the given number of milliseconds.
-    *
-    * @return interval of milliseconds
-    */
-  def millis: Expression = milli
-
   // Row interval type
 
   /**
@@ -879,121 +187,6 @@ trait ImplicitExpressionOperations {
     */
   def rows: Expression = toRowInterval(expr)
 
-  // Advanced type helper functions
-
-  /**
-    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) 
by name and
-    * returns it's value.
-    *
-    * @param name name of the field (similar to Flink's field expressions)
-    * @return value of the field
-    */
-  def get(name: String): Expression = unresolvedCall(GET, expr, 
valueLiteral(name))
-
-  /**
-    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) 
by index and
-    * returns it's value.
-    *
-    * @param index position of the field
-    * @return value of the field
-    */
-  def get(index: Int): Expression = unresolvedCall(GET, expr, 
valueLiteral(index))
-
-  /**
-    * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of 
its direct subtypes
-    * into a flat representation where every subtype is a separate field.
-    */
-  def flatten(): Expression = unresolvedCall(FLATTEN, expr)
-
-  /**
-    * Accesses the element of an array or map based on a key or an index 
(starting at 1).
-    *
-    * @param index key or position of the element (array index starting at 1)
-    * @return value of the element
-    */
-  def at(index: Expression): Expression = unresolvedCall(AT, expr, index)
-
-  /**
-    * Returns the number of elements of an array or number of entries of a map.
-    *
-    * @return number of elements or entries
-    */
-  def cardinality(): Expression = unresolvedCall(CARDINALITY, expr)
-
-  /**
-    * Returns the sole element of an array with a single element. Returns null 
if the array is
-    * empty. Throws an exception if the array has more than one element.
-    *
-    * @return the first and only element of an array with a single element
-    */
-  def element(): Expression = unresolvedCall(ARRAY_ELEMENT, expr)
-
-  // Time definition
-
-  /**
-    * Declares a field as the rowtime attribute for indicating, accessing, and 
working in
-    * Flink's event time.
-    */
-  def rowtime: Expression = unresolvedCall(ROWTIME, expr)
-
-  /**
-    * Declares a field as the proctime attribute for indicating, accessing, 
and working in
-    * Flink's processing time.
-    */
-  def proctime: Expression = unresolvedCall(PROCTIME, expr)
-
-  // Hash functions
-
-  /**
-    * Returns the MD5 hash of the string argument; null if string is null.
-    *
-    * @return string of 32 hexadecimal digits or null
-    */
-  def md5(): Expression = unresolvedCall(MD5, expr)
-
-  /**
-    * Returns the SHA-1 hash of the string argument; null if string is null.
-    *
-    * @return string of 40 hexadecimal digits or null
-    */
-  def sha1(): Expression = unresolvedCall(SHA1, expr)
-
-  /**
-    * Returns the SHA-224 hash of the string argument; null if string is null.
-    *
-    * @return string of 56 hexadecimal digits or null
-    */
-  def sha224(): Expression = unresolvedCall(SHA224, expr)
-
-  /**
-    * Returns the SHA-256 hash of the string argument; null if string is null.
-    *
-    * @return string of 64 hexadecimal digits or null
-    */
-  def sha256(): Expression = unresolvedCall(SHA256, expr)
-
-  /**
-    * Returns the SHA-384 hash of the string argument; null if string is null.
-    *
-    * @return string of 96 hexadecimal digits or null
-    */
-  def sha384(): Expression = unresolvedCall(SHA384, expr)
-
-  /**
-    * Returns the SHA-512 hash of the string argument; null if string is null.
-    *
-    * @return string of 128 hexadecimal digits or null
-    */
-  def sha512(): Expression = unresolvedCall(SHA512, expr)
-
-  /**
-    * Returns the hash for the given string expression using the SHA-2 family 
of hash
-    * functions (SHA-224, SHA-256, SHA-384, or SHA-512).
-    *
-    * @param hashLength bit length of the result (either 224, 256, 384, or 512)
-    * @return string or null if one of the arguments is null.
-    */
-  def sha2(hashLength: Expression): Expression = unresolvedCall(SHA2, expr, 
hashLength)
 }
 
 /**
@@ -1109,7 +302,9 @@ trait ImplicitExpressionConversions {
       * Calls a scalar function for the given parameters.
       */
     def apply(params: Expression*): Expression = {
-      unresolvedCall(new ScalarFunctionDefinition(s.getClass.getName, s), 
params:_*)
+      unresolvedCall(
+        new ScalarFunctionDefinition(s.getClass.getName, s),
+        params.map(ApiExpressionUtils.objectToExpression): _*)
     }
   }
 
@@ -1121,7 +316,9 @@ trait ImplicitExpressionConversions {
     def apply(params: Expression*): Expression = {
       val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper
         .getReturnTypeOfTableFunction(t, implicitly[TypeInformation[T]])
-      unresolvedCall(new TableFunctionDefinition(t.getClass.getName, t, 
resultTypeInfo), params: _*)
+      unresolvedCall(
+        new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo),
+        params.map(ApiExpressionUtils.objectToExpression): _*)
     }
   }
 
@@ -1149,7 +346,9 @@ trait ImplicitExpressionConversions {
       * Calls an aggregate function for the given parameters.
       */
     def apply(params: Expression*): Expression = {
-      unresolvedCall(createFunctionDefinition(), params: _*)
+      unresolvedCall(
+        createFunctionDefinition(),
+        params.map(ApiExpressionUtils.objectToExpression): _*)
     }
 
     /**
@@ -1160,6 +359,25 @@ trait ImplicitExpressionConversions {
     }
   }
 
+
+  /**
+   * Extends Scala's StringContext with a method for creating an unresolved 
reference via
+   * string interpolation.
+   */
+  implicit class FieldExpression(val sc: StringContext) {
+
+    /**
+     * Creates an unresolved reference to a table's field.
+     *
+     * Example:
+     * ```
+     * tab.select($"key", $"value")
+     * ```
+     * </pre>
+     */
+    def $(args: Any*): Expression = unresolvedRef(sc.s(args: _*))
+  }
+
   implicit def tableSymbolToExpression(sym: TableSymbol): Expression =
     valueLiteral(sym)
 
@@ -1169,7 +387,7 @@ trait ImplicitExpressionConversions {
   implicit def scalaRange2RangeExpression(range: Range.Inclusive): Expression 
= {
     val startExpression = valueLiteral(range.start)
     val endExpression = valueLiteral(range.end)
-    startExpression to endExpression
+    unresolvedCall(RANGE_TO, startExpression, endExpression)
   }
 
   implicit def byte2Literal(b: Byte): Expression = valueLiteral(b)
@@ -1284,47 +502,77 @@ trait ImplicitExpressionConversions {
    * @see TableEnvironment#createTemporaryFunction
    * @see TableEnvironment#createTemporarySystemFunction
    */
-  def call(path: String, params: Expression*): Expression = {
-    lookupCall(path, params: _*)
-  }
+  def call(path: String, params: Expression*): Expression = 
Expressions.call(path, params: _*)
 
   // 
----------------------------------------------------------------------------------------------
   // Implicit expressions in prefix notation
   // 
----------------------------------------------------------------------------------------------
 
   /**
+   * Creates a SQL literal.
+   *
+   * The data type is derived from the object's class and its value.
+   *
+   * For example:
+   *
+   *  - `lit(12)`` leads to `INT`
+   *  - `lit("abc")`` leads to `CHAR(3)`
+   *  - `lit(new java.math.BigDecimal("123.45"))` leads to `DECIMAL(5, 2)`
+   *
+   * See [[org.apache.flink.table.types.utils.ValueDataTypeConverter]] for a 
list of supported
+   * literal values.
+   */
+  def lit(v: Any): Expression = Expressions.lit(v)
+
+  /**
+   * Creates a SQL literal of a given [[DataType]].
+   *
+   * The method [[lit(Object)]] is preferred as it extracts the [[DataType]]
+   * automatically. The class of `v` must be supported according to the
+   * 
[[org.apache.flink.table.types.logical.LogicalType#supportsInputConversion(Class)]].
+   */
+  def lit(v: Any, dataType: DataType): Expression = Expressions.lit(v, 
dataType)
+
+  /**
+   * Returns negative numeric.
+   */
+  def negative(v: Expression): Expression = {
+    Expressions.negative(v)
+  }
+
+  /**
     * Returns the current SQL date in UTC time zone.
     */
   def currentDate(): Expression = {
-    unresolvedCall(CURRENT_DATE)
+    Expressions.currentDate()
   }
 
   /**
     * Returns the current SQL time in UTC time zone.
     */
   def currentTime(): Expression = {
-    unresolvedCall(CURRENT_TIME)
+    Expressions.currentTime()
   }
 
   /**
     * Returns the current SQL timestamp in UTC time zone.
     */
   def currentTimestamp(): Expression = {
-    unresolvedCall(CURRENT_TIMESTAMP)
+    Expressions.currentTimestamp()
   }
 
   /**
     * Returns the current SQL time in local time zone.
     */
   def localTime(): Expression = {
-    unresolvedCall(LOCAL_TIME)
+    Expressions.localTime()
   }
 
   /**
     * Returns the current SQL timestamp in local time zone.
     */
   def localTimestamp(): Expression = {
-    unresolvedCall(LOCAL_TIMESTAMP)
+    Expressions.localTimestamp()
   }
 
   /**
@@ -1342,7 +590,7 @@ trait ImplicitExpressionConversions {
       rightTimePoint: Expression,
       rightTemporal: Expression)
     : Expression = {
-    unresolvedCall(TEMPORAL_OVERLAPS, leftTimePoint, leftTemporal, 
rightTimePoint, rightTemporal)
+    Expressions.temporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 
   /**
@@ -1360,7 +608,7 @@ trait ImplicitExpressionConversions {
       timestamp: Expression,
       format: Expression)
     : Expression = {
-    unresolvedCall(DATE_FORMAT, timestamp, format)
+    Expressions.dateFormat(timestamp, format)
   }
 
   /**
@@ -1379,49 +627,49 @@ trait ImplicitExpressionConversions {
       timePoint1: Expression,
       timePoint2: Expression)
     : Expression = {
-    unresolvedCall(TIMESTAMP_DIFF, timePointUnit, timePoint1, timePoint2)
+    Expressions.timestampDiff(timePointUnit, timePoint1, timePoint2)
   }
 
   /**
     * Creates an array of literals.
     */
   def array(head: Expression, tail: Expression*): Expression = {
-    unresolvedCall(ARRAY, head +: tail: _*)
+    Expressions.array(head, tail: _*)
   }
 
   /**
     * Creates a row of expressions.
     */
   def row(head: Expression, tail: Expression*): Expression = {
-    unresolvedCall(ROW, head +: tail: _*)
+    Expressions.row(head, tail: _*)
   }
 
   /**
     * Creates a map of expressions.
     */
   def map(key: Expression, value: Expression, tail: Expression*): Expression = 
{
-    unresolvedCall(MAP, key +: value +: tail: _*)
+    Expressions.map(key, value, tail: _*)
   }
 
   /**
     * Returns a value that is closer than any other value to pi.
     */
   def pi(): Expression = {
-    unresolvedCall(PI)
+    Expressions.pi()
   }
 
   /**
     * Returns a value that is closer than any other value to e.
     */
   def e(): Expression = {
-    unresolvedCall(E)
+    Expressions.e()
   }
 
   /**
     * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 
(exclusive).
     */
   def rand(): Expression = {
-    unresolvedCall(RAND)
+    Expressions.rand()
   }
 
   /**
@@ -1430,7 +678,7 @@ trait ImplicitExpressionConversions {
     * have same initial seed.
     */
   def rand(seed: Expression): Expression = {
-    unresolvedCall(RAND, seed)
+    Expressions.rand(seed)
   }
 
   /**
@@ -1438,7 +686,7 @@ trait ImplicitExpressionConversions {
     * value (exclusive).
     */
   def randInteger(bound: Expression): Expression = {
-    unresolvedCall(RAND_INTEGER, bound)
+    Expressions.randInteger(bound)
   }
 
   /**
@@ -1447,7 +695,7 @@ trait ImplicitExpressionConversions {
     * of numbers if they have same initial seed and same bound.
     */
   def randInteger(seed: Expression, bound: Expression): Expression = {
-    unresolvedCall(RAND_INTEGER, seed, bound)
+    Expressions.randInteger(seed, bound)
   }
 
   /**
@@ -1455,25 +703,38 @@ trait ImplicitExpressionConversions {
     * Returns NULL if any argument is NULL.
     */
   def concat(string: Expression, strings: Expression*): Expression = {
-    unresolvedCall(CONCAT, string +: strings: _*)
+    Expressions.concat(string, strings: _*)
   }
 
   /**
     * Calculates the arc tangent of a given coordinate.
     */
   def atan2(y: Expression, x: Expression): Expression = {
-    unresolvedCall(ATAN2, y, x)
+    Expressions.atan2(y, x)
   }
 
   /**
     * Returns the string that results from concatenating the arguments and 
separator.
     * Returns NULL If the separator is NULL.
     *
-    * Note: this user-defined function does not skip empty strings. However, 
it does skip any NULL
+    * Note: This function does not skip empty strings. However, it does skip 
any NULL
     * values after the separator argument.
+    * @deprecated use [[ImplicitExpressionConversions.concatWs()]]
     **/
+  @deprecated
   def concat_ws(separator: Expression, string: Expression, strings: 
Expression*): Expression = {
-    unresolvedCall(CONCAT_WS, separator +: string +: strings: _*)
+    concatWs(separator, string, strings: _*)
+  }
+
+  /**
+   * Returns the string that results from concatenating the arguments and 
separator.
+   * Returns NULL If the separator is NULL.
+   *
+   * Note: this user-defined function does not skip empty strings. However, it 
does skip any NULL
+   * values after the separator argument.
+   **/
+  def concatWs(separator: Expression, string: Expression, strings: 
Expression*): Expression = {
+    Expressions.concatWs(separator, string, strings: _*)
   }
 
   /**
@@ -1483,7 +744,7 @@ trait ImplicitExpressionConversions {
     * generator.
     */
   def uuid(): Expression = {
-    unresolvedCall(UUID)
+    Expressions.uuid()
   }
 
   /**
@@ -1492,7 +753,7 @@ trait ImplicitExpressionConversions {
     * e.g. nullOf(DataTypes.INT())
     */
   def nullOf(dataType: DataType): Expression = {
-    valueLiteral(null, dataType)
+    Expressions.nullOf(dataType)
   }
 
   /**
@@ -1503,21 +764,21 @@ trait ImplicitExpressionConversions {
     *             documentation for more information.
     */
   def nullOf(typeInfo: TypeInformation[_]): Expression = {
-    nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo))
+    Expressions.nullOf(typeInfo)
   }
 
   /**
     * Calculates the logarithm of the given value.
     */
   def log(value: Expression): Expression = {
-    unresolvedCall(LOG, value)
+    Expressions.log(value)
   }
 
   /**
     * Calculates the logarithm of the given value to the given base.
     */
   def log(base: Expression, value: Expression): Expression = {
-    unresolvedCall(LOG, base, value)
+    Expressions.log(base, value)
   }
 
   /**
@@ -1531,7 +792,7 @@ trait ImplicitExpressionConversions {
     * @param ifFalse expression to be evaluated if condition does not hold
     */
   def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: 
Expression): Expression = {
-    unresolvedCall(IF, condition, ifTrue, ifFalse)
+    Expressions.ifThenElse(condition, ifTrue, ifFalse)
   }
 
   /**
@@ -1544,7 +805,7 @@ trait ImplicitExpressionConversions {
     * e.g. withColumns('b to 'c) or withColumns('*)
     */
   def withColumns(head: Expression, tail: Expression*): Expression = {
-    unresolvedCall(WITH_COLUMNS, head +: tail: _*)
+    Expressions.withColumns(head, tail: _*)
   }
 
   /**
@@ -1558,6 +819,20 @@ trait ImplicitExpressionConversions {
     * e.g. withoutColumns('b to 'c) or withoutColumns('c)
     */
   def withoutColumns(head: Expression, tail: Expression*): Expression = {
-    unresolvedCall(WITHOUT_COLUMNS, head +: tail: _*)
+    Expressions.withoutColumns(head, tail: _*)
+  }
+
+  /**
+   * Boolean AND in three-valued logic.
+   */
+  def and(predicate0: Expression, predicate1: Expression, predicates: 
Expression*): Expression = {
+    Expressions.and(predicate0, predicate1, predicates: _*)
+  }
+
+  /**
+   * Boolean OR in three-valued logic.
+   */
+  def or(predicate0: Expression, predicate1: Expression, predicates: 
Expression*): Expression = {
+    Expressions.or(predicate0, predicate1, predicates: _*)
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
index 06a9b5f..fadf854 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.functions._
 import org.apache.flink.table.planner.expressions.{E => PlannerE, UUID => 
PlannerUUID}
 import 
org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THROW_EXCEPTION
 import 
org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, 
SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE}
+import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, 
SYMBOL}
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._
 
 import _root_.scala.collection.JavaConverters._
@@ -151,12 +151,12 @@ class PlannerExpressionConverter private extends 
ApiExpressionVisitor[PlannerExp
             expr
 
           case AND =>
-            assert(args.size == 2)
-            And(args.head, args.last)
+            assert(args.size >= 2)
+            args.reduceLeft(And)
 
           case OR =>
-            assert(args.size == 2)
-            Or(args.head, args.last)
+            assert(args.size >= 2)
+            args.reduceLeft(Or)
 
           case NOT =>
             assert(args.size == 1)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index cb9f6b1..7b2c999 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -133,12 +133,12 @@ class PlannerExpressionConverter private extends 
ApiExpressionVisitor[PlannerExp
             expr
 
           case AND =>
-            assert(args.size == 2)
-            And(args.head, args.last)
+            assert(args.size >= 2)
+            args.reduceLeft(And)
 
           case OR =>
-            assert(args.size == 2)
-            Or(args.head, args.last)
+            assert(args.size >= 2)
+            args.reduceLeft(Or)
 
           case NOT =>
             assert(args.size == 1)

Reply via email to