This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a27357  [FLINK-11908][table] Port window classes into 
flink-table-api-java
7a27357 is described below

commit 7a273577021bf84927242c9372757215e1ff9fc3
Author: hequn8128 <chenghe...@gmail.com>
AuthorDate: Sun Mar 17 12:57:01 2019 +0800

    [FLINK-11908][table] Port window classes into flink-table-api-java
    
    This closes #7976.
---
 .../org/apache/flink/table/api/GroupWindow.java    |  55 ++
 .../main/java/org/apache/flink/table/api/Over.java | 105 +++
 .../org/apache/flink/table/api/OverWindow.java     |  74 ++
 .../flink/table/api/OverWindowPartitioned.java     |  69 ++
 .../table/api/OverWindowPartitionedOrdered.java    |  89 +++
 .../api/OverWindowPartitionedOrderedPreceding.java |  88 +++
 .../java/org/apache/flink/table/api/Session.java   |  74 ++
 .../org/apache/flink/table/api/SessionWithGap.java |  71 ++
 .../flink/table/api/SessionWithGapOnTime.java      |  62 ++
 .../table/api/SessionWithGapOnTimeWithAlias.java   |  40 ++
 .../java/org/apache/flink/table/api/Slide.java     |  84 +++
 .../org/apache/flink/table/api/SlideWithSize.java  |  71 ++
 .../flink/table/api/SlideWithSizeAndSlide.java     |  73 ++
 .../table/api/SlideWithSizeAndSlideOnTime.java     |  67 ++
 .../api/SlideWithSizeAndSlideOnTimeWithAlias.java  |  50 ++
 .../java/org/apache/flink/table/api/Tumble.java    |  72 ++
 .../org/apache/flink/table/api/TumbleWithSize.java |  71 ++
 .../flink/table/api/TumbleWithSizeOnTime.java      |  62 ++
 .../table/api/TumbleWithSizeOnTimeWithAlias.java   |  40 ++
 .../flink/table/expressions/ExpressionParser.java  |  41 ++
 .../table/expressions/PlannerExpressionParser.java |  68 ++
 .../table/api/WindowCreationValidationTest.java    |  60 ++
 .../table/api/java/BatchTableEnvironment.scala     |   6 +-
 .../table/api/java/StreamTableEnvironment.scala    |   5 +-
 .../org/apache/flink/table/api/java/windows.scala  |  64 --
 .../flink/table/api/scala/expressionDsl.scala      |   2 +-
 .../org/apache/flink/table/api/scala/windows.scala |  68 --
 .../scala/org/apache/flink/table/api/table.scala   |  64 +-
 .../scala/org/apache/flink/table/api/windows.scala | 741 ---------------------
 ...ser.scala => PlannerExpressionParserImpl.scala} |  25 +-
 .../GroupWindowStringExpressionTest.scala          |   2 +-
 .../runtime/stream/TimeAttributesITCase.scala      |   6 +-
 32 files changed, 1530 insertions(+), 939 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
new file mode 100644
index 0000000..4601e75
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/GroupWindow.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+
+/**
+ * A group window specification.
+ *
+ * <p>Group windows group rows based on time or row-count intervals and is 
therefore essentially a
+ * special type of groupBy. Just like groupBy, group windows allow to compute 
aggregates
+ * on groups of elements.
+ *
+ * <p>Infinite streaming tables can only be grouped into time or row 
intervals. Hence window
+ * grouping is required to apply aggregations on streaming tables.
+ *
+ * <p>For finite batch tables, group windows provide shortcuts for time-based 
groupBy.
+ */
+@PublicEvolving
+public abstract class GroupWindow {
+
+       /** Alias name for the group window. */
+       private final Expression alias;
+       private final Expression timeField;
+
+       GroupWindow(Expression alias, Expression timeField) {
+               this.alias = alias;
+               this.timeField = timeField;
+       }
+
+       public Expression getAlias() {
+               return alias;
+       }
+
+       public Expression getTimeField() {
+               return timeField;
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
new file mode 100644
index 0000000..52acf85
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Over.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+import java.util.Arrays;
+
+/**
+ * Helper class for creating an over window. Similar to SQL, over window 
aggregates compute an
+ * aggregate for each input row over a range of its neighboring rows.
+ *
+ * <p>Java Example:
+ *
+ * <pre>
+ * {@code
+ *    
Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w")
+ * }
+ * </pre>
+ *
+ * <p>Scala Example:
+ *
+ * <pre>
+ * {@code
+ *    Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w
+ * }
+ * </pre>
+ */
+@PublicEvolving
+public final class Over {
+
+       /**
+        * Partitions the elements on some partition keys.
+        *
+        * <p>Each partition is individually sorted and aggregate functions are 
applied to each
+        * partition separately.
+        *
+        * @param partitionBy list of field references
+        * @return an over window with defined partitioning
+        */
+       public static OverWindowPartitioned partitionBy(String partitionBy) {
+               return new 
OverWindowPartitioned(ExpressionParser.parseExpressionList(partitionBy));
+       }
+
+       /**
+        * Partitions the elements on some partition keys.
+        *
+        * <p>Each partition is individually sorted and aggregate functions are 
applied to each
+        * partition separately.
+        *
+        * @param partitionBy list of field references
+        * @return an over window with defined partitioning
+        */
+       public static OverWindowPartitioned partitionBy(Expression... 
partitionBy) {
+               return new OverWindowPartitioned(Arrays.asList(partitionBy));
+       }
+
+       /**
+        * Specifies the time attribute on which rows are ordered.
+        *
+        * <p>For streaming tables, reference a rowtime or proctime time 
attribute here
+        * to specify the time mode.
+        *
+        * <p>For batch tables, refer to a timestamp or long attribute.
+        *
+        * @param orderBy field reference
+        * @return an over window with defined order
+        */
+       public static OverWindowPartitionedOrdered orderBy(String orderBy) {
+               return partitionBy().orderBy(orderBy);
+       }
+
+       /**
+        * Specifies the time attribute on which rows are ordered.
+        *
+        * <p>For streaming tables, reference a rowtime or proctime time 
attribute here
+        * to specify the time mode.
+        *
+        * <p>For batch tables, refer to a timestamp or long attribute.
+        *
+        * @param orderBy field reference
+        * @return an over window with defined order
+        */
+       public static OverWindowPartitionedOrdered orderBy(Expression orderBy) {
+               return partitionBy().orderBy(orderBy);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindow.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindow.java
new file mode 100644
index 0000000..852144d
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindow.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An over window specification.
+ *
+ * <p>Similar to SQL, over window aggregates compute an aggregate for each 
input row over a range
+ * of its neighboring rows.
+ */
+@PublicEvolving
+public final class OverWindow {
+
+       private final Expression alias;
+       private final List<Expression> partitioning;
+       private final Expression order;
+       private final Expression preceding;
+       private final Optional<Expression> following;
+
+       OverWindow(
+               Expression alias,
+               List<Expression> partitionBy,
+               Expression orderBy,
+               Expression preceding,
+               Optional<Expression> following) {
+               this.alias = alias;
+               this.partitioning = partitionBy;
+               this.order = orderBy;
+               this.preceding = preceding;
+               this.following = following;
+       }
+
+       public Expression getAlias() {
+               return alias;
+       }
+
+       public List<Expression> getPartitioning() {
+               return partitioning;
+       }
+
+       public Expression getOrder() {
+               return order;
+       }
+
+       public Expression getPreceding() {
+               return preceding;
+       }
+
+       public Optional<Expression> getFollowing() {
+               return following;
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
new file mode 100644
index 0000000..21f6292
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitioned.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+import java.util.List;
+
+/**
+ * Partially defined over window with partitioning.
+ */
+@PublicEvolving
+public final class OverWindowPartitioned {
+
+       /** Defines a partitioning of the input on one or more attributes. */
+       private final List<Expression> partitionBy;
+
+       OverWindowPartitioned(List<Expression> partitionBy) {
+               this.partitionBy = partitionBy;
+       }
+
+       /**
+        * Specifies the time attribute on which rows are ordered.
+        *
+        * <p>For streaming tables, reference a rowtime or proctime time 
attribute here
+        * to specify the time mode.
+        *
+        * <p>For batch tables, refer to a timestamp or long attribute.
+        *
+        * @param orderBy field reference
+        * @return an over window with defined order
+        */
+       public OverWindowPartitionedOrdered orderBy(String orderBy) {
+               return this.orderBy(ExpressionParser.parseExpression(orderBy));
+       }
+
+       /**
+        * Specifies the time attribute on which rows are ordered.
+        *
+        * <p>For streaming tables, reference a rowtime or proctime time 
attribute here
+        * to specify the time mode.
+        *
+        * <p>For batch tables, refer to a timestamp or long attribute.
+        *
+        * @param orderBy field reference
+        * @return an over window with defined order
+        */
+       public OverWindowPartitionedOrdered orderBy(Expression orderBy) {
+               return new OverWindowPartitionedOrdered(partitionBy, orderBy);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
new file mode 100644
index 0000000..04b3e75
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrdered.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Partially defined over window with (optional) partitioning and order.
+ */
+@PublicEvolving
+public final class OverWindowPartitionedOrdered {
+
+       private final List<Expression> partitionBy;
+       private final Expression orderBy;
+
+       OverWindowPartitionedOrdered(List<Expression> partitionBy, Expression 
orderBy) {
+               this.partitionBy = partitionBy;
+               this.orderBy = orderBy;
+       }
+
+       /**
+        * Set the preceding offset (based on time or row-count intervals) for 
over window.
+        *
+        * @param preceding preceding offset relative to the current row.
+        * @return an over window with defined preceding
+        */
+       public OverWindowPartitionedOrderedPreceding preceding(String 
preceding) {
+               return 
this.preceding(ExpressionParser.parseExpression(preceding));
+       }
+
+       /**
+        * Set the preceding offset (based on time or row-count intervals) for 
over window.
+        *
+        * @param preceding preceding offset relative to the current row.
+        * @return an over window with defined preceding
+        */
+       public OverWindowPartitionedOrderedPreceding preceding(Expression 
preceding) {
+               return new OverWindowPartitionedOrderedPreceding(partitionBy, 
orderBy, preceding);
+       }
+
+       /**
+        * Assigns an alias for this window that the following {@code select()} 
clause can refer to.
+        *
+        * @param alias alias for this over window
+        * @return the fully defined over window
+        */
+       public OverWindow as(String alias) {
+               return as(ExpressionParser.parseExpression(alias));
+       }
+
+       /**
+        * Assigns an alias for this window that the following {@code select()} 
clause can refer to.
+        *
+        * @param alias alias for this over window
+        * @return the fully defined over window
+        */
+       public OverWindow as(Expression alias) {
+               return new OverWindow(
+                       alias,
+                       partitionBy,
+                       orderBy,
+                       new 
CallExpression(BuiltInFunctionDefinitions.UNBOUNDED_RANGE, 
Collections.emptyList()),
+                       Optional.empty());
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
new file mode 100644
index 0000000..f87a210
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/OverWindowPartitionedOrderedPreceding.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Partially defined over window with (optional) partitioning, order, and 
preceding.
+ */
+@PublicEvolving
+public final class OverWindowPartitionedOrderedPreceding {
+
+       private final List<Expression> partitionBy;
+       private final Expression orderBy;
+       private final Expression preceding;
+       private Optional<Expression> optionalFollowing = Optional.empty();
+
+       OverWindowPartitionedOrderedPreceding(
+               List<Expression> partitionBy,
+               Expression orderBy,
+               Expression preceding) {
+               this.partitionBy = partitionBy;
+               this.orderBy = orderBy;
+               this.preceding = preceding;
+       }
+
+       /**
+        * Assigns an alias for this window that the following {@code select()} 
clause can refer to.
+        *
+        * @param alias alias for this over window
+        * @return the fully defined over window
+        */
+       public OverWindow as(String alias) {
+               return as(ExpressionParser.parseExpression(alias));
+       }
+
+       /**
+        * Assigns an alias for this window that the following {@code select()} 
clause can refer to.
+        *
+        * @param alias alias for this over window
+        * @return the fully defined over window
+        */
+       public OverWindow as(Expression alias) {
+               return new OverWindow(alias, partitionBy, orderBy, preceding, 
optionalFollowing);
+       }
+
+       /**
+        * Set the following offset (based on time or row-count intervals) for 
over window.
+        *
+        * @param following following offset that relative to the current row.
+        * @return an over window with defined following
+        */
+       public OverWindowPartitionedOrderedPreceding following(String 
following) {
+               return 
this.following(ExpressionParser.parseExpression(following));
+       }
+
+       /**
+        * Set the following offset (based on time or row-count intervals) for 
over window.
+        *
+        * @param following following offset that relative to the current row.
+        * @return an over window with defined following
+        */
+       public OverWindowPartitionedOrderedPreceding following(Expression 
following) {
+               optionalFollowing = Optional.of(following);
+               return this;
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
new file mode 100644
index 0000000..2d27548
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Session.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+/**
+ * Helper class for creating a session window. The boundary of session windows 
are defined by
+ * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
+ * gap period.
+ *
+ * <p>Java Example:
+ *
+ * <pre>
+ * {@code
+ *    Session.withGap("10.minutes").on("rowtime").as("w")
+ * }
+ * </pre>
+ *
+ * <p>Scala Example:
+ *
+ * <pre>
+ * {@code
+ *    Session withGap 10.minutes on 'rowtime as 'w
+ * }
+ * </pre>
+ */
+@PublicEvolving
+public final class Session {
+
+       /**
+        * Creates a session window. The boundary of session windows are 
defined by
+        * intervals of inactivity, i.e., a session window is closes if no 
event appears for a defined
+        * gap period.
+        *
+        * @param gap specifies how long (as interval of milliseconds) to wait 
for new data before
+        *            closing the session window.
+        * @return a partially defined session window
+        */
+       public static SessionWithGap withGap(String gap) {
+               return withGap(ExpressionParser.parseExpression(gap));
+       }
+
+       /**
+        * Creates a session window. The boundary of session windows are 
defined by
+        * intervals of inactivity, i.e., a session window is closes if no 
event appears for a defined
+        * gap period.
+        *
+        * @param gap specifies how long (as interval of milliseconds) to wait 
for new data before
+        *            closing the session window.
+        * @return a partially defined session window
+        */
+       public static SessionWithGap withGap(Expression gap) {
+               return new SessionWithGap(gap);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
new file mode 100644
index 0000000..6e646f7
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGap.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+/**
+ * Session window.
+ *
+ * <p>For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
+ *
+ * <p>For batch tables you can specify grouping on a timestamp or long 
attribute.
+ */
+@PublicEvolving
+public final class SessionWithGap {
+
+       /** The time interval of inactivity before a window is closed. */
+       private final Expression gap;
+
+       SessionWithGap(Expression gap) {
+               this.gap = gap;
+       }
+
+       /**
+        * Specifies the time attribute on which rows are grouped.
+        *
+        * <p>For streaming tables you can specify grouping by a event-time or 
processing-time
+        * attribute.
+        *
+        * <p>For batch tables you can specify grouping on a timestamp or long 
attribute.
+        *
+        * @param timeField time attribute for streaming and batch tables
+        * @return a tumbling window on event-time
+        */
+       public SessionWithGapOnTime on(String timeField) {
+               return on(ExpressionParser.parseExpression(timeField));
+       }
+
+       /**
+        * Specifies the time attribute on which rows are grouped.
+        *
+        * <p>For streaming tables you can specify grouping by a event-time or 
processing-time
+        * attribute.
+        *
+        * <p>For batch tables you can specify grouping on a timestamp or long 
attribute.
+        *
+        * @param timeField time attribute for streaming and batch tables
+        * @return a tumbling window on event-time
+        */
+       public SessionWithGapOnTime on(Expression timeField) {
+               return new SessionWithGapOnTime(timeField, gap);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
new file mode 100644
index 0000000..53c9c4f
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTime.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+/**
+ * Session window on time.
+ */
+@PublicEvolving
+public final class SessionWithGapOnTime {
+
+       private final Expression timeField;
+       private final Expression gap;
+
+       SessionWithGapOnTime(Expression timeField, Expression gap) {
+               this.timeField = timeField;
+               this.gap = gap;
+       }
+
+       /**
+        * Assigns an alias for this window that the following {@code 
groupBy()} and {@code select()}
+        * clause can refer to. {@code select()} statement can access window 
properties such as window
+        * start or end time.
+        *
+        * @param alias alias for this window
+        * @return this window
+        */
+       public SessionWithGapOnTimeWithAlias as(String alias) {
+               return as(ExpressionParser.parseExpression(alias));
+       }
+
+       /**
+        * Assigns an alias for this window that the following {@code 
groupBy()} and {@code select()}
+        * clause can refer to. {@code select()} statement can access window 
properties such as window
+        * start or end time.
+        *
+        * @param alias alias for this window
+        * @return this window
+        */
+       public SessionWithGapOnTimeWithAlias as(Expression alias) {
+               return new SessionWithGapOnTimeWithAlias(alias, timeField, gap);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
new file mode 100644
index 0000000..891ea7c
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SessionWithGapOnTimeWithAlias.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+
+/**
+ * Session window on time with alias. Fully specifies a window.
+ */
+@PublicEvolving
+public final class SessionWithGapOnTimeWithAlias extends GroupWindow {
+
+       private final Expression gap;
+
+       SessionWithGapOnTimeWithAlias(Expression alias, Expression timeField, 
Expression gap) {
+               super(alias, timeField);
+               this.gap = gap;
+       }
+
+       public Expression getGap() {
+               return gap;
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
new file mode 100644
index 0000000..85f79ce
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Slide.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+/**
+ * Helper class for creating a sliding window. Sliding windows have a fixed 
size and slide by
+ * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
+ * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
+ *
+ * <p>For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups
+ * elements of 15 minutes and evaluates every five minutes. Each element is 
contained in three
+ * consecutive window evaluations.
+ *
+ * <p>Java Example:
+ *
+ * <pre>
+ * {@code
+ *    Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w")
+ * }
+ * </pre>
+ *
+ * <p>Scala Example:
+ *
+ * <pre>
+ * {@code
+ *    Slide over 10.minutes every 5.minutes on 'rowtime as 'w
+ * }
+ * </pre>
+ */
+@PublicEvolving
+public final class Slide {
+
+       /**
+        * Creates a sliding window. Sliding windows have a fixed size and 
slide by
+        * a specified slide interval. If the slide interval is smaller than 
the window size, sliding
+        * windows are overlapping. Thus, an element can be assigned to 
multiple windows.
+        *
+        * <p>For example, a sliding window of size 15 minutes with 5 minutes 
sliding interval groups
+        * elements of 15 minutes and evaluates every five minutes. Each 
element is contained in three
+        * consecutive window evaluations.
+        *
+        * @param size the size of the window as time or row-count interval
+        * @return a partially specified sliding window
+        */
+       public static SlideWithSize over(String size) {
+               return over(ExpressionParser.parseExpression(size));
+       }
+
+       /**
+        * Creates a sliding window. Sliding windows have a fixed size and 
slide by
+        * a specified slide interval. If the slide interval is smaller than 
the window size, sliding
+        * windows are overlapping. Thus, an element can be assigned to 
multiple windows.
+        *
+        * <p>For example, a sliding window of size 15 minutes with 5 minutes 
sliding interval groups
+        * elements of 15 minutes and evaluates every five minutes. Each 
element is contained in three
+        * consecutive
+        *
+        * @param size the size of the window as time or row-count interval
+        * @return a partially specified sliding window
+        */
+       public static SlideWithSize over(Expression size) {
+               return new SlideWithSize(size);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
new file mode 100644
index 0000000..44470f9
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSize.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+/**
+ * Partially specified sliding window. The size of the window either as time 
or row-count interval.
+ */
+@PublicEvolving
+public final class SlideWithSize {
+
+       /** The size of the window either as time or row-count interval. */
+       private final Expression size;
+
+       SlideWithSize(Expression size) {
+               this.size = size;
+       }
+
+       /**
+        * Specifies the window's slide as time or row-count interval.
+        *
+        * <p>The slide determines the interval in which windows are started. 
Hence, sliding windows can
+        * overlap if the slide is smaller than the size of the window.
+        *
+        * <p>For example, you could have windows of size 15 minutes that slide 
by 3 minutes. With this
+        * 15 minutes worth of elements are grouped every 3 minutes and each 
row contributes to 5
+        * windows.
+        *
+        * @param slide the slide of the window either as time or row-count 
interval.
+        * @return a sliding window
+        */
+       public SlideWithSizeAndSlide every(String slide) {
+               return every(ExpressionParser.parseExpression(slide));
+       }
+
+       /**
+        * Specifies the window's slide as time or row-count interval.
+        *
+        * <p>The slide determines the interval in which windows are started. 
Hence, sliding windows can
+        * overlap if the slide is smaller than the size of the window.
+        *
+        * <p>For example, you could have windows of size 15 minutes that slide 
by 3 minutes. With this
+        * 15 minutes worth of elements are grouped every 3 minutes and each 
row contributes to 5
+        * windows.
+        *
+        * @param slide the slide of the window either as time or row-count 
interval.
+        * @return a sliding window
+        */
+       public SlideWithSizeAndSlide every(Expression slide) {
+               return new SlideWithSizeAndSlide(size, slide);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
new file mode 100644
index 0000000..4d509d1
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlide.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+/**
+ * Sliding window. The size of the window either as time or row-count interval.
+ *
+ * <p>For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
+ *
+ * <p>For batch tables you can specify grouping on a timestamp or long 
attribute.
+ */
+@PublicEvolving
+public final class SlideWithSizeAndSlide {
+
+       /** The size of the window either as time or row-count interval. */
+       private final Expression size;
+       private final Expression slide;
+
+       SlideWithSizeAndSlide(Expression size, Expression slide) {
+               this.size = size;
+               this.slide = slide;
+       }
+
+       /**
+        * Specifies the time attribute on which rows are grouped.
+        *
+        * <p>For streaming tables you can specify grouping by a event-time or 
processing-time
+        * attribute.
+        *
+        * <p>For batch tables you can specify grouping on a timestamp or long 
attribute.
+        *
+        * @param timeField time attribute for streaming and batch tables
+        * @return a tumbling window on event-time
+        */
+       public SlideWithSizeAndSlideOnTime on(String timeField) {
+               return on(ExpressionParser.parseExpression(timeField));
+       }
+
+       /**
+        * Specifies the time attribute on which rows are grouped.
+        *
+        * <p>For streaming tables you can specify grouping by a event-time or 
processing-time
+        * attribute.
+        *
+        * <p>For batch tables you can specify grouping on a timestamp or long 
attribute.
+        *
+        * @param timeField time attribute for streaming and batch tables
+        * @return a tumbling window on event-time
+        */
+       public SlideWithSizeAndSlideOnTime on(Expression timeField) {
+               return new SlideWithSizeAndSlideOnTime(timeField, size, slide);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
new file mode 100644
index 0000000..604b9cd
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTime.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+/**
+ * Sliding window on time.
+ */
+@PublicEvolving
+public final class SlideWithSizeAndSlideOnTime {
+
+       private final Expression timeField;
+       private final Expression size;
+       private final Expression slide;
+
+       SlideWithSizeAndSlideOnTime(
+               Expression timeField,
+               Expression size,
+               Expression slide) {
+               this.timeField = timeField;
+               this.size = size;
+               this.slide = slide;
+       }
+
+       /**
+        * Assigns an alias for this window that the following {@code 
groupBy()} and {@code select()}
+        * clause can refer to. {@code select()} statement can access window 
properties such as window
+        * start or end time.
+        *
+        * @param alias alias for this window
+        * @return this window
+        */
+       public SlideWithSizeAndSlideOnTimeWithAlias as(String alias) {
+               return as(ExpressionParser.parseExpression(alias));
+       }
+
+       /**
+        * Assigns an alias for this window that the following {@code 
groupBy()} and {@code select()}
+        * clause can refer to. {@code select()} statement can access window 
properties such as window
+        * start or end time.
+        *
+        * @param alias alias for this window
+        * @return this window
+        */
+       public SlideWithSizeAndSlideOnTimeWithAlias as(Expression alias) {
+               return new SlideWithSizeAndSlideOnTimeWithAlias(alias, 
timeField, size, slide);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
new file mode 100644
index 0000000..50b3692
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/SlideWithSizeAndSlideOnTimeWithAlias.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+
+/**
+ * Sliding window on time with alias. Fully specifies a window.
+ */
+@PublicEvolving
+public final class SlideWithSizeAndSlideOnTimeWithAlias extends GroupWindow {
+
+       private final Expression size;
+       private final Expression slide;
+
+       SlideWithSizeAndSlideOnTimeWithAlias(
+               Expression alias,
+               Expression timeField,
+               Expression size,
+               Expression slide) {
+               super(alias, timeField);
+               this.size = size;
+               this.slide = slide;
+       }
+
+       public Expression getSize() {
+               return size;
+       }
+
+       public Expression getSlide() {
+               return slide;
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
new file mode 100644
index 0000000..0b5d6b5
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Tumble.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+/**
+ * Helper class for creating a tumbling window. Tumbling windows are 
consecutive, non-overlapping
+ * windows of a specified fixed length. For example, a tumbling window of 5 
minutes size groups
+ * elements in 5 minutes intervals.
+ *
+ * <p>Java Example:
+ *
+ * <pre>
+ * {@code
+ *    Tumble.over("10.minutes").on("rowtime").as("w")
+ * }
+ * </pre>
+ *
+ * <p>Scala Example:
+ *
+ * <pre>
+ * {@code
+ *    Tumble over 5.minutes on 'rowtime as 'w
+ * }
+ * </pre>
+ */
+@PublicEvolving
+public final class Tumble {
+
+       /**
+        * Creates a tumbling window. Tumbling windows are fixed-size, 
consecutive, non-overlapping
+        * windows of a specified fixed length. For example, a tumbling window 
of 5 minutes size groups
+        * elements in 5 minutes intervals.
+        *
+        * @param size the size of the window as time or row-count interval.
+        * @return a partially defined tumbling window
+        */
+       public static TumbleWithSize over(String size) {
+               return over(ExpressionParser.parseExpression(size));
+       }
+
+       /**
+        * Creates a tumbling window. Tumbling windows are fixed-size, 
consecutive, non-overlapping
+        * windows of a specified fixed length. For example, a tumbling window 
of 5 minutes size groups
+        * elements in 5 minutes intervals.
+        *
+        * @param size the size of the window as time or row-count interval.
+        * @return a partially defined tumbling window
+        */
+       public static TumbleWithSize over(Expression size) {
+               return new TumbleWithSize(size);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
new file mode 100644
index 0000000..600e3a1
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSize.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+/**
+ * Tumbling window.
+ *
+ * <p>For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
+ *
+ * <p>For batch tables you can specify grouping on a timestamp or long 
attribute.
+ */
+@PublicEvolving
+public final class TumbleWithSize {
+
+       /** The size of the window either as time or row-count interval. */
+       private Expression size;
+
+       TumbleWithSize(Expression size) {
+               this.size = size;
+       }
+
+       /**
+        * Specifies the time attribute on which rows are grouped.
+        *
+        * <p>For streaming tables you can specify grouping by a event-time or 
processing-time
+        * attribute.
+        *
+        * <p>For batch tables you can specify grouping on a timestamp or long 
attribute.
+        *
+        * @param timeField time attribute for streaming and batch tables
+        * @return a tumbling window on event-time
+        */
+       public TumbleWithSizeOnTime on(Expression timeField) {
+               return new TumbleWithSizeOnTime(timeField, size);
+       }
+
+       /**
+        * Specifies the time attribute on which rows are grouped.
+        *
+        * <p>For streaming tables you can specify grouping by a event-time or 
processing-time
+        * attribute.
+        *
+        * <p>For batch tables you can specify grouping on a timestamp or long 
attribute.
+        *
+        * @param timeField time attribute for streaming and batch tables
+        * @return a tumbling window on event-time
+        */
+       public TumbleWithSizeOnTime on(String timeField) {
+               return on(ExpressionParser.parseExpression(timeField));
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
new file mode 100644
index 0000000..f635ba5
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTime.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionParser;
+
+/**
+ * Tumbling window on time.
+ */
+@PublicEvolving
+public final class TumbleWithSizeOnTime {
+
+       private final Expression time;
+       private final Expression size;
+
+       TumbleWithSizeOnTime(Expression time, Expression size) {
+               this.time = time;
+               this.size = size;
+       }
+
+       /**
+        * Assigns an alias for this window that the following {@code 
groupBy()} and {@code select()}
+        * clause can refer to. {@code select()} statement can access window 
properties such as window
+        * start or end time.
+        *
+        * @param alias alias for this window
+        * @return this window
+        */
+       public TumbleWithSizeOnTimeWithAlias as(Expression alias) {
+               return new TumbleWithSizeOnTimeWithAlias(alias, time, size);
+       }
+
+       /**
+        * Assigns an alias for this window that the following {@code 
groupBy()} and {@code select()}
+        * clause can refer to. {@code select()} statement can access window 
properties such as window
+        * start or end time.
+        *
+        * @param alias alias for this window
+        * @return this window
+        */
+       public TumbleWithSizeOnTimeWithAlias as(String alias) {
+               return as(ExpressionParser.parseExpression(alias));
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
new file mode 100644
index 0000000..5180d33
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TumbleWithSizeOnTimeWithAlias.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.expressions.Expression;
+
+/**
+ * Tumbling window on time with alias. Fully specifies a window.
+ */
+@PublicEvolving
+public final class TumbleWithSizeOnTimeWithAlias extends GroupWindow {
+
+       private final Expression size;
+
+       TumbleWithSizeOnTimeWithAlias(Expression alias, Expression timeField, 
Expression size) {
+               super(alias, timeField);
+               this.size = size;
+       }
+
+       public Expression getSize() {
+               return size;
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
new file mode 100644
index 0000000..d576cf5
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ExpressionParser.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.List;
+
+/**
+ * Parser for expressions inside a String. This parses exactly the same 
expressions that
+ * would be accepted by the Scala Expression DSL.
+ *
+ * <p>{@link ExpressionParser} use {@link PlannerExpressionParser} to parse 
expressions.
+ */
+@Internal
+public final class ExpressionParser {
+
+       public static Expression parseExpression(String exprString) {
+               return 
PlannerExpressionParser.create().parseExpression(exprString);
+       }
+
+       public static List<Expression> parseExpressionList(String expression) {
+               return 
PlannerExpressionParser.create().parseExpressionList(expression);
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/PlannerExpressionParser.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/PlannerExpressionParser.java
new file mode 100644
index 0000000..c347025
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/PlannerExpressionParser.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+/**
+ * Parser for expressions inside a String. This parses exactly the same 
expressions that
+ * would be accepted by the Scala Expression DSL.
+ *
+ * <p>{@link PlannerExpressionParser} is used by {@link ExpressionParser} to 
parse expressions.
+ */
+@Internal
+public interface PlannerExpressionParser {
+
+       static PlannerExpressionParser create() {
+               return SingletonPlannerExpressionParser.getExpressionParser();
+       }
+
+       Expression parseExpression(String exprString);
+
+       List<Expression> parseExpressionList(String expression);
+
+       /**
+        * Util class to create {@link PlannerExpressionParser} instance. Use 
singleton pattern to avoid
+        * creating many {@link PlannerExpressionParser}.
+        */
+       class SingletonPlannerExpressionParser {
+
+               private static volatile PlannerExpressionParser 
expressionParser;
+
+               private SingletonPlannerExpressionParser() {}
+
+               public static PlannerExpressionParser getExpressionParser() {
+
+                       if (expressionParser == null) {
+                               try {
+                                       Class<?> clazz = 
Class.forName("org.apache.flink.table.expressions.PlannerExpressionParserImpl");
+                                       Constructor<?> con = 
clazz.getConstructor();
+                                       expressionParser = 
(PlannerExpressionParser) con.newInstance();
+                               } catch (Throwable t) {
+                                       throw new TableException("Construction 
of PlannerExpressionParserImpl class failed.", t);
+                               }
+                       }
+                       return expressionParser;
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java
new file mode 100644
index 0000000..e29d813
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/WindowCreationValidationTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Test failures for the creation of window.
+ */
+public class WindowCreationValidationTest {
+
+       @Rule
+       public final ExpectedException exception = ExpectedException.none();
+
+       @Test
+       public void testTumbleOverForString() {
+               exception.expect(TableException.class);
+               exception.expectMessage("Construction of 
PlannerExpressionParserImpl class failed.");
+               Tumble.over("4.hours");
+       }
+
+       @Test
+       public void testSlideOverForString() {
+               exception.expect(TableException.class);
+               exception.expectMessage("Construction of 
PlannerExpressionParserImpl class failed.");
+               Slide.over("4.hours");
+       }
+
+       @Test
+       public void testSessionWithGapForString() {
+               exception.expect(TableException.class);
+               exception.expectMessage("Construction of 
PlannerExpressionParserImpl class failed.");
+               Session.withGap("4.hours");
+       }
+
+       @Test
+       public void testOverWithPartitionByForString() {
+               exception.expect(TableException.class);
+               exception.expectMessage("Construction of 
PlannerExpressionParserImpl class failed.");
+               Over.partitionBy("a");
+       }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
index e1d02b1..8c06a37 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala
@@ -24,6 +24,8 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
 
+import _root_.scala.collection.JavaConverters._
+
 /**
   * The [[TableEnvironment]] for a Java batch [[ExecutionEnvironment]] that 
works
   * with [[DataSet]]s.
@@ -80,7 +82,7 @@ class BatchTableEnvironment @Deprecated() (
     */
   def fromDataSet[T](dataSet: DataSet[T], fields: String): Table = {
     val exprs = ExpressionParser
-      .parseExpressionList(fields)
+      .parseExpressionList(fields).asScala
       .toArray
 
     val name = createUniqueTableName()
@@ -124,7 +126,7 @@ class BatchTableEnvironment @Deprecated() (
     */
   def registerDataSet[T](name: String, dataSet: DataSet[T], fields: String): 
Unit = {
     val exprs = ExpressionParser
-      .parseExpressionList(fields)
+      .parseExpressionList(fields).asScala
       .toArray
 
     checkValidTableName(name)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index 59f5d2a..f14863f 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import _root_.java.lang.{Boolean => JBool}
+import _root_.scala.collection.JavaConverters._
 
 /**
   * The [[TableEnvironment]] for a Java [[StreamExecutionEnvironment]] that 
works with
@@ -84,7 +85,7 @@ class StreamTableEnvironment @Deprecated() (
     */
   def fromDataStream[T](dataStream: DataStream[T], fields: String): Table = {
     val exprs = ExpressionParser
-      .parseExpressionList(fields)
+      .parseExpressionList(fields).asScala
       .toArray
 
     val name = createUniqueTableName()
@@ -129,7 +130,7 @@ class StreamTableEnvironment @Deprecated() (
     */
   def registerDataStream[T](name: String, dataStream: DataStream[T], fields: 
String): Unit = {
     val exprs = ExpressionParser
-      .parseExpressionList(fields)
+      .parseExpressionList(fields).asScala
       .toArray
 
     checkValidTableName(name)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala
deleted file mode 100644
index 24f39a1..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/windows.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java
-
-import org.apache.flink.table.api._
-
-/**
-  * Helper class for creating a tumbling window. Tumbling windows are 
consecutive, non-overlapping
-  * windows of a specified fixed length. For example, a tumbling window of 5 
minutes size groups
-  * elements in 5 minutes intervals.
-  *
-  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Tumble]].
-  */
-@Deprecated
-object Tumble extends TumbleBase
-
-/**
-  * Helper class for creating a sliding window. Sliding windows have a fixed 
size and slide by
-  * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
-  * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
-  *
-  * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups elements
-  * of 15 minutes and evaluates every five minutes. Each element is contained 
in three consecutive
-  * window evaluations.
-  *
-  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Slide]].
-  */
-@Deprecated
-object Slide extends SlideBase
-
-/**
-  * Helper class for creating a session window. The boundary of session 
windows are defined by
-  * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
-  * gap period.
-  *
-  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Session]].
-  */
-@Deprecated
-object Session extends SessionBase
-
-/**
-  * Helper class for creating an over window. Similar to SQL, over window 
aggregates compute an
-  * aggregate for each input row over a range of its neighboring rows.
-  *
-  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Over]].
-  */
-@Deprecated
-object Over extends OverBase
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index edeaf1d..c252fe5 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -22,7 +22,7 @@ import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.api.{Over, Table, ValidationException}
 import org.apache.flink.table.expressions.ApiExpressionUtils._
 import org.apache.flink.table.expressions.BuiltInFunctionDefinitions.{E => 
FDE, UUID => FDUUID, _}
 import org.apache.flink.table.expressions._
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala
deleted file mode 100644
index 4c2a5dd..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/windows.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala
-
-import org.apache.flink.table.api._
-
-/**
-  * Helper object for creating a tumbling window. Tumbling windows are 
consecutive, non-overlapping
-  * windows of a specified fixed length. For example, a tumbling window of 5 
minutes size groups
-  * elements in 5 minutes intervals.
-  *
-  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Tumble]].
-  */
-@deprecated(
-  "This class will be replaced by org.apache.flink.table.api.Tumble.", "1.8")
-object Tumble extends TumbleBase
-
-/**
-  * Helper object for creating a sliding window. Sliding windows have a fixed 
size and slide by
-  * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
-  * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
-  *
-  * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups elements
-  * of 15 minutes and evaluates every five minutes. Each element is contained 
in three consecutive
-  * window evaluations.
-  *
-  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Slide]].
-  */
-@deprecated(
-  "This class will be replaced by org.apache.flink.table.api.Slide.", "1.8")
-object Slide extends SlideBase
-
-/**
-  * Helper object for creating a session window. The boundary of session 
windows are defined by
-  * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
-  * gap period.
-  *
-  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Session]].
-  */
-@deprecated(
-  "This class will be replaced by org.apache.flink.table.api.Session.", "1.8")
-object Session extends SessionBase
-
-/**
-  * Helper class for creating an over window. Similar to SQL, over window 
aggregates compute an
-  * aggregate for each input row over a range of its neighboring rows.
-  *
-  * @deprecated This class will be replaced by 
[[org.apache.flink.table.api.Over]].
-  */
-@deprecated(
-  "This class will be replaced by org.apache.flink.table.api.Over.", "1.8")
-object Over extends OverBase
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
index 2c3738a..e66d14b 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/table.scala
@@ -27,9 +27,11 @@ import 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.plan.ProjectionTranslator._
 import org.apache.flink.table.plan.logical.{Minus, _}
 import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.util.JavaScalaConversionUtil
 
 import _root_.scala.annotation.varargs
 import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.JavaConversions._
 
 /**
   * A Table is the core component of the Table API.
@@ -1089,35 +1091,7 @@ class Table(
     * If the `groupBy(...)` only references a window alias, the streamed table 
will be processed
     * by a single task, i.e., with parallelism 1.
     *
-    * @param window window that specifies how elements are grouped.
-    * @return A windowed table.
-    *
-    * @deprecated Will be removed in a future release. Please use 
Table.window(window: GroupWindow)
-    *             instead.
-    */
-  @deprecated(
-    "This method will be removed. Please use Table.window(window: GroupWindow) 
instead.",
-    "1.8")
-  @Deprecated
-  def window(window: Window): WindowedTable = {
-    new WindowedTable(this, window)
-  }
-
-  /**
-    * Groups the records of a table by assigning them to windows defined by a 
time or row interval.
-    *
-    * For streaming tables of infinite size, grouping into windows is required 
to define finite
-    * groups on which group-based aggregates can be computed.
-    *
-    * For batch tables of finite size, windowing essentially provides 
shortcuts for time-based
-    * groupBy.
-    *
-    * __Note__: Computing windowed aggregates on a streaming table is only a 
parallel operation
-    * if additional grouping attributes are added to the `groupBy(...)` clause.
-    * If the `groupBy(...)` only references a window alias, the streamed table 
will be processed
-    * by a single task, i.e., with parallelism 1.
-    *
-    * @param window group window that specifies how elements are grouped.
+    * @param window groupWindow that specifies how elements are grouped.
     * @return A group windowed table.
     */
   def window(window: GroupWindow): GroupWindowedTable = {
@@ -1231,16 +1205,11 @@ class GroupedTable(
 }
 
 /**
-  * A table that has been windowed for grouping [[Window]]s.
-  *
-  * @deprecated Will be replaced by [[GroupWindowedTable]].
+  * A table that has been windowed for [[GroupWindow]]s.
   */
-@Deprecated
-@deprecated(
-  "This class will be replaced by GroupWindowedTable.", "1.8")
-class WindowedTable(
+class GroupWindowedTable(
     private[flink] val table: Table,
-    private[flink] val window: Window) {
+    private[flink] val window: GroupWindow) {
 
   /**
     * Groups the elements by a mandatory window and one or more optional 
grouping attributes.
@@ -1289,27 +1258,12 @@ class WindowedTable(
 }
 
 /**
-  * A table that has been windowed for [[GroupWindow]]s.
-  */
-class GroupWindowedTable(
-    override private[flink] val table: Table,
-    override private[flink] val window: GroupWindow)
-  extends WindowedTable(table, window)
-
-/**
   * A table that has been windowed and grouped for [[GroupWindow]]s.
-  *
-  * @deprecated The constructor contains [[Window]] parameter will be removed. 
Use constructor
-  *             with [[GroupWindow]] instead.
   */
-class WindowGroupedTable @Deprecated() (
+class WindowGroupedTable(
     private[flink] val table: Table,
     private[flink] val groupKeys: Seq[Expression],
-    private[flink] val window: Window) {
-
-  def this(table: Table, groupKeys: Seq[Expression], window: GroupWindow) {
-    this(table, groupKeys, window.asInstanceOf[Window])
-  }
+    private[flink] val window: GroupWindow) {
 
   /**
     * Performs a selection operation on a window grouped table. Similar to an 
SQL SELECT statement.
@@ -1466,7 +1420,7 @@ class OverWindowedTable(
       overWindow.getPartitioning.map(table.expressionBridge.bridge),
       table.expressionBridge.bridge(overWindow.getOrder),
       table.expressionBridge.bridge(overWindow.getPreceding),
-      overWindow.getFollowing.map(table.expressionBridge.bridge)
+      
JavaScalaConversionUtil.toScala(overWindow.getFollowing).map(table.expressionBridge.bridge)
     )
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
deleted file mode 100644
index 948a620..0000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/windows.scala
+++ /dev/null
@@ -1,741 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api
-
-import org.apache.flink.table.expressions._
-
-/**
-  * An over window specification.
-  *
-  * Similar to SQL, over window aggregates compute an aggregate for each input 
row over a range
-  * of its neighboring rows.
-  */
-class OverWindow(
-    alias: Expression,
-    partitionBy: Seq[Expression],
-    orderBy: Expression,
-    preceding: Expression,
-    following: Option[Expression]) {
-
-  def getAlias: Expression = alias
-
-  def getPartitioning: Seq[Expression] = partitionBy
-
-  def getOrder: Expression = orderBy
-
-  def getPreceding: Expression = preceding
-
-  def getFollowing: Option[Expression] = following
-}
-
-// 
------------------------------------------------------------------------------------------------
-// Over windows
-// 
------------------------------------------------------------------------------------------------
-
-/**
-  * Partially defined over window with partitioning.
-  */
-class OverWindowPartitioned(partitionBy: Seq[Expression]) {
-
-  /**
-    * Specifies the time attribute on which rows are ordered.
-    *
-    * For streaming tables, reference a rowtime or proctime time attribute here
-    * to specify the time mode.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param orderBy field reference
-    * @return an over window with defined order
-    */
-  def orderBy(orderBy: String): OverWindowPartitionedOrdered = {
-    this.orderBy(ExpressionParser.parseExpression(orderBy))
-  }
-
-  /**
-    * Specifies the time attribute on which rows are ordered.
-    *
-    * For streaming tables, reference a rowtime or proctime time attribute here
-    * to specify the time mode.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param orderBy field reference
-    * @return an over window with defined order
-    */
-  def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = {
-    new OverWindowPartitionedOrdered(partitionBy, orderBy)
-  }
-}
-
-/**
-  * Partially defined over window with (optional) partitioning and order.
-  */
-class OverWindowPartitionedOrdered(partitionBy: Seq[Expression], orderBy: 
Expression) {
-
-  /**
-    * Set the preceding offset (based on time or row-count intervals) for over 
window.
-    *
-    * @param preceding preceding offset relative to the current row.
-    * @return an over window with defined preceding
-    */
-  def preceding(preceding: String): OverWindowPartitionedOrderedPreceding = {
-    this.preceding(ExpressionParser.parseExpression(preceding))
-  }
-
-  /**
-    * Set the preceding offset (based on time or row-count intervals) for over 
window.
-    *
-    * @param preceding preceding offset relative to the current row.
-    * @return an over window with defined preceding
-    */
-  def preceding(preceding: Expression): OverWindowPartitionedOrderedPreceding 
= {
-    new OverWindowPartitionedOrderedPreceding(partitionBy, orderBy, preceding)
-  }
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause 
can refer to.
-    *
-    * @param alias alias for this over window
-    * @return the fully defined over window
-    */
-  def as(alias: String): OverWindow = 
as(ExpressionParser.parseExpression(alias))
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause 
can refer to.
-    *
-    * @param alias alias for this over window
-    * @return the fully defined over window
-    */
-  def as(alias: Expression): OverWindow = {
-    new OverWindow(alias, partitionBy, orderBy, UnboundedRange(), None)
-  }
-}
-
-/**
-  * Partially defined over window with (optional) partitioning, order, and 
preceding.
-  */
-class OverWindowPartitionedOrderedPreceding(
-    private val partitionBy: Seq[Expression],
-    private val orderBy: Expression,
-    private val preceding: Expression) {
-
-  private var optionalFollowing: Option[Expression] = None
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause 
can refer to.
-    *
-    * @param alias alias for this over window
-    * @return the fully defined over window
-    */
-  def as(alias: String): OverWindow = 
as(ExpressionParser.parseExpression(alias))
-
-  /**
-    * Assigns an alias for this window that the following `select()` clause 
can refer to.
-    *
-    * @param alias alias for this over window
-    * @return the fully defined over window
-    */
-  def as(alias: Expression): OverWindow = {
-    new OverWindow(alias, partitionBy, orderBy, preceding, optionalFollowing)
-  }
-
-  /**
-    * Set the following offset (based on time or row-count intervals) for over 
window.
-    *
-    * @param following following offset that relative to the current row.
-    * @return an over window with defined following
-    */
-  def following(following: String): OverWindowPartitionedOrderedPreceding = {
-    this.following(ExpressionParser.parseExpression(following))
-  }
-
-  /**
-    * Set the following offset (based on time or row-count intervals) for over 
window.
-    *
-    * @param following following offset that relative to the current row.
-    * @return an over window with defined following
-    */
-  def following(following: Expression): OverWindowPartitionedOrderedPreceding 
= {
-    optionalFollowing = Some(following)
-    this
-  }
-}
-
-// 
------------------------------------------------------------------------------------------------
-// Group windows
-// 
------------------------------------------------------------------------------------------------
-
-/**
-  * A group window specification.
-  *
-  * Group windows group rows based on time or row-count intervals and is 
therefore essentially a
-  * special type of groupBy. Just like groupBy, group windows allow to compute 
aggregates
-  * on groups of elements.
-  *
-  * Infinite streaming tables can only be grouped into time or row intervals. 
Hence window grouping
-  * is required to apply aggregations on streaming tables.
-  *
-  * For finite batch tables, group windows provide shortcuts for time-based 
groupBy.
-  *
-  * @deprecated Will be replaced by [[GroupWindow]]
-  */
-@Deprecated
-@deprecated(
-  "This class will be replaced by GroupWindow.", "1.8")
-abstract class Window(alias: Expression, timeField: Expression) {
-
-  def getAlias: Expression = {
-    alias
-  }
-
-  def getTimeField: Expression = {
-    timeField
-  }
-}
-
-/**
-  * A group window specification.
-  *
-  * Group windows group rows based on time or row-count intervals and is 
therefore essentially a
-  * special type of groupBy. Just like groupBy, group windows allow to compute 
aggregates
-  * on groups of elements.
-  *
-  * Infinite streaming tables can only be grouped into time or row intervals. 
Hence window grouping
-  * is required to apply aggregations on streaming tables.
-  *
-  * For finite batch tables, group windows provide shortcuts for time-based 
groupBy.
-  */
-abstract class GroupWindow(alias: Expression, timeField: Expression)
-  extends Window(alias, timeField)
-
-// 
------------------------------------------------------------------------------------------------
-// Tumbling windows
-// 
------------------------------------------------------------------------------------------------
-
-/**
-  * Tumbling window.
-  *
-  * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-  *
-  * For batch tables you can specify grouping on a timestamp or long attribute.
-  *
-  * @param size the size of the window either as time or row-count interval.
-  */
-class TumbleWithSize(size: Expression) {
-
-  /**
-    * Tumbling window.
-    *
-    * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-    *
-    * For batch tables you can specify grouping on a timestamp or long 
attribute.
-    *
-    * @param size the size of the window either as time or row-count interval.
-    */
-  def this(size: String) = this(ExpressionParser.parseExpression(size))
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-    *
-    * For batch tables you can specify grouping on a timestamp or long 
attribute.
-    *
-    * @param timeField time attribute for streaming and batch tables
-    * @return a tumbling window on event-time
-    */
-  def on(timeField: Expression): TumbleWithSizeOnTime =
-    new TumbleWithSizeOnTime(timeField, size)
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-    *
-    * For batch tables you can specify grouping on a timestamp or long 
attribute.
-    *
-    * @param timeField time attribute for streaming and batch tables
-    * @return a tumbling window on event-time
-    */
-  def on(timeField: String): TumbleWithSizeOnTime =
-    on(ExpressionParser.parseExpression(timeField))
-}
-
-/**
-  * Tumbling window on time.
-  */
-class TumbleWithSizeOnTime(time: Expression, size: Expression) {
-
-  /**
-    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
-    * refer to. `select()` statement can access window properties such as 
window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): TumbleWithSizeOnTimeWithAlias = {
-    new TumbleWithSizeOnTimeWithAlias(alias, time, size)
-  }
-
-  /**
-    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
-    * refer to. `select()` statement can access window properties such as 
window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): TumbleWithSizeOnTimeWithAlias = {
-    as(ExpressionParser.parseExpression(alias))
-  }
-}
-
-/**
-  * Tumbling window on time with alias. Fully specifies a window.
-  */
-class TumbleWithSizeOnTimeWithAlias(
-    alias: Expression,
-    timeField: Expression,
-    size: Expression)
-  extends GroupWindow(
-    alias,
-    timeField) {
-
-  def getSize: Expression = {
-    size
-  }
-}
-
-// 
------------------------------------------------------------------------------------------------
-// Sliding windows
-// 
------------------------------------------------------------------------------------------------
-
-/**
-  * Partially specified sliding window.
-  *
-  * @param size the size of the window either as time or row-count interval.
-  */
-class SlideWithSize(size: Expression) {
-
-  /**
-    * Partially specified sliding window.
-    *
-    * @param size the size of the window either as time or row-count interval.
-    */
-  def this(size: String) = this(ExpressionParser.parseExpression(size))
-
-  /**
-    * Specifies the window's slide as time or row-count interval.
-    *
-    * The slide determines the interval in which windows are started. Hence, 
sliding windows can
-    * overlap if the slide is smaller than the size of the window.
-    *
-    * For example, you could have windows of size 15 minutes that slide by 3 
minutes. With this
-    * 15 minutes worth of elements are grouped every 3 minutes and each row 
contributes to 5
-    * windows.
-    *
-    * @param slide the slide of the window either as time or row-count 
interval.
-    * @return a sliding window
-    */
-  def every(slide: Expression): SlideWithSizeAndSlide = new 
SlideWithSizeAndSlide(size, slide)
-
-  /**
-    * Specifies the window's slide as time or row-count interval.
-    *
-    * The slide determines the interval in which windows are started. Hence, 
sliding windows can
-    * overlap if the slide is smaller than the size of the window.
-    *
-    * For example, you could have windows of size 15 minutes that slide by 3 
minutes. With this
-    * 15 minutes worth of elements are grouped every 3 minutes and each row 
contributes to 5
-    * windows.
-    *
-    * @param slide the slide of the window either as time or row-count 
interval.
-    * @return a sliding window
-    */
-  def every(slide: String): SlideWithSizeAndSlide = 
every(ExpressionParser.parseExpression(slide))
-}
-
-/**
-  * Sliding window.
-  *
-  * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-  *
-  * For batch tables you can specify grouping on a timestamp or long attribute.
-  *
-  * @param size the size of the window either as time or row-count interval.
-  */
-class SlideWithSizeAndSlide(size: Expression, slide: Expression) {
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-    *
-    * For batch tables you can specify grouping on a timestamp or long 
attribute.
-    *
-    * @param timeField time attribute for streaming and batch tables
-    * @return a tumbling window on event-time
-    */
-  def on(timeField: Expression): SlideWithSizeAndSlideOnTime =
-    new SlideWithSizeAndSlideOnTime(timeField, size, slide)
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-    *
-    * For batch tables you can specify grouping on a timestamp or long 
attribute.
-    *
-    * @param timeField time attribute for streaming and batch tables
-    * @return a tumbling window on event-time
-    */
-  def on(timeField: String): SlideWithSizeAndSlideOnTime =
-    on(ExpressionParser.parseExpression(timeField))
-}
-
-/**
-  * Sliding window on time.
-  */
-class SlideWithSizeAndSlideOnTime(timeField: Expression, size: Expression, 
slide: Expression) {
-
-  /**
-    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
-    * refer to. `select()` statement can access window properties such as 
window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): SlideWithSizeAndSlideOnTimeWithAlias = {
-    new SlideWithSizeAndSlideOnTimeWithAlias(alias, timeField, size, slide)
-  }
-
-  /**
-    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
-    * refer to. `select()` statement can access window properties such as 
window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): SlideWithSizeAndSlideOnTimeWithAlias = {
-    as(ExpressionParser.parseExpression(alias))
-  }
-}
-
-/**
-  * Sliding window on time with alias. Fully specifies a window.
-  */
-class SlideWithSizeAndSlideOnTimeWithAlias(
-    alias: Expression,
-    timeField: Expression,
-    size: Expression,
-    slide: Expression)
-  extends GroupWindow(
-    alias,
-    timeField) {
-
-  def getSize: Expression = {
-    size
-  }
-
-  def getSlide: Expression = {
-    slide
-  }
-}
-
-// 
------------------------------------------------------------------------------------------------
-// Session windows
-// 
------------------------------------------------------------------------------------------------
-
-/**
-  * Session window.
-  *
-  * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-  *
-  * For batch tables you can specify grouping on a timestamp or long attribute.
-  *
-  * @param gap the time interval of inactivity before a window is closed.
-  */
-class SessionWithGap(gap: Expression) {
-
-  /**
-    * Session window.
-    *
-    * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-    *
-    * For batch tables you can specify grouping on a timestamp or long 
attribute.
-    *
-    * @param gap the time interval of inactivity before a window is closed.
-    */
-  def this(gap: String) = this(ExpressionParser.parseExpression(gap))
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-    *
-    * For batch tables you can specify grouping on a timestamp or long 
attribute.
-    *
-    * @param timeField time attribute for streaming and batch tables
-    * @return a tumbling window on event-time
-    */
-  def on(timeField: Expression): SessionWithGapOnTime =
-    new SessionWithGapOnTime(timeField, gap)
-
-  /**
-    * Specifies the time attribute on which rows are grouped.
-    *
-    * For streaming tables you can specify grouping by a event-time or 
processing-time attribute.
-    *
-    * For batch tables you can specify grouping on a timestamp or long 
attribute.
-    *
-    * @param timeField time attribute for streaming and batch tables
-    * @return a tumbling window on event-time
-    */
-  def on(timeField: String): SessionWithGapOnTime =
-    on(ExpressionParser.parseExpression(timeField))
-}
-
-/**
-  * Session window on time.
-  */
-class SessionWithGapOnTime(timeField: Expression, gap: Expression) {
-
-  /**
-    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
-    * refer to. `select()` statement can access window properties such as 
window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: Expression): SessionWithGapOnTimeWithAlias = {
-    new SessionWithGapOnTimeWithAlias(alias, timeField, gap)
-  }
-
-  /**
-    * Assigns an alias for this window that the following `groupBy()` and 
`select()` clause can
-    * refer to. `select()` statement can access window properties such as 
window start or end time.
-    *
-    * @param alias alias for this window
-    * @return this window
-    */
-  def as(alias: String): SessionWithGapOnTimeWithAlias = {
-    as(ExpressionParser.parseExpression(alias))
-  }
-}
-
-/**
-  * Session window on time with alias. Fully specifies a window.
-  */
-class SessionWithGapOnTimeWithAlias(
-    alias: Expression,
-    timeField: Expression,
-    gap: Expression)
-  extends GroupWindow(
-    alias,
-    timeField) {
-
-  def getGap: Expression = {
-    gap
-  }
-}
-
-/**
-  * Base class for Tumble Window Helper classes. This class contains help 
methods to create Tumble
-  * Windows.
-  */
-class TumbleBase {
-
-  /**
-    * Creates a tumbling window. Tumbling windows are consecutive, 
non-overlapping
-    * windows of a specified fixed length. For example, a tumbling window of 5 
minutes size groups
-    * elements in 5 minutes intervals.
-    *
-    * @param size the size of the window as time or row-count interval.
-    * @return a partially defined tumbling window
-    */
-  def over(size: String): TumbleWithSize = new TumbleWithSize(size)
-
-  /**
-    * Creates a tumbling window. Tumbling windows are fixed-size, consecutive, 
non-overlapping
-    * windows. For example, a tumbling window of 5 minutes size groups
-    * elements in 5 minutes intervals.
-    *
-    * @param size the size of the window as time or row-count interval.
-    * @return a partially defined tumbling window
-    */
-  def over(size: Expression): TumbleWithSize = new TumbleWithSize(size)
-}
-
-/**
-  * Base class for Slide Window Helper classes. This class contains help 
methods to create Slide
-  * Windows.
-  */
-class SlideBase {
-
-  /**
-    * Creates a sliding window. Sliding windows have a fixed size and slide by
-    * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
-    * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
-    *
-    * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups
-    * elements of 15 minutes and evaluates every five minutes. Each element is 
contained in three
-    * consecutive window evaluations.
-    *
-    * @param size the size of the window as time or row-count interval
-    * @return a partially specified sliding window
-    */
-  def over(size: String): SlideWithSize = new SlideWithSize(size)
-
-  /**
-    * Creates a sliding window. Sliding windows have a fixed size and slide by
-    * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
-    * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
-    *
-    * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups
-    * elements of 15 minutes and evaluates every five minutes. Each element is 
contained in three
-    * consecutive
-    *
-    * @param size the size of the window as time or row-count interval
-    * @return a partially specified sliding window
-    */
-  def over(size: Expression): SlideWithSize = new SlideWithSize(size)
-}
-
-/**
-  * Base class for Session Window Helper classes. This class contains help 
methods to create Session
-  * Windows.
-  */
-class SessionBase {
-
-  /**
-    * Creates a session window. The boundary of session windows are defined by
-    * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
-    * gap period.
-    *
-    * @param gap specifies how long (as interval of milliseconds) to wait for 
new data before
-    *            closing the session window.
-    * @return a partially defined session window
-    */
-  def withGap(gap: String): SessionWithGap = new SessionWithGap(gap)
-
-  /**
-    * Creates a session window. The boundary of session windows are defined by
-    * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
-    * gap period.
-    *
-    * @param gap specifies how long (as interval of milliseconds) to wait for 
new data before
-    *            closing the session window.
-    * @return a partially defined session window
-    */
-  def withGap(gap: Expression): SessionWithGap = new SessionWithGap(gap)
-}
-
-/**
-  * Base class for Over Window Helper classes. This class contains help 
methods to create Over
-  * Windows.
-  */
-class OverBase {
-
-  /**
-    * Specifies the time attribute on which rows are ordered.
-    *
-    * For streaming tables, reference a rowtime or proctime time attribute here
-    * to specify the time mode.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param orderBy field reference
-    * @return an over window with defined order
-    */
-  def orderBy(orderBy: String): OverWindowPartitionedOrdered = {
-    new OverWindowPartitionedOrdered(Seq(), 
ExpressionParser.parseExpression(orderBy))
-  }
-
-  /**
-    * Specifies the time attribute on which rows are ordered.
-    *
-    * For streaming tables, reference a rowtime or proctime time attribute here
-    * to specify the time mode.
-    *
-    * For batch tables, refer to a timestamp or long attribute.
-    *
-    * @param orderBy field reference
-    * @return an over window with defined order
-    */
-  def orderBy(orderBy: Expression): OverWindowPartitionedOrdered = {
-    new OverWindowPartitionedOrdered(Seq(), orderBy)
-  }
-
-  /**
-    * Partitions the elements on some partition keys.
-    *
-    * Each partition is individually sorted and aggregate functions are 
applied to each
-    * partition separately.
-    *
-    * @param partitionBy list of field references
-    * @return an over window with defined partitioning
-    */
-  def partitionBy(partitionBy: String): OverWindowPartitioned = {
-    new 
OverWindowPartitioned(ExpressionParser.parseExpressionList(partitionBy))
-  }
-
-  /**
-    * Partitions the elements on some partition keys.
-    *
-    * Each partition is individually sorted and aggregate functions are 
applied to each
-    * partition separately.
-    *
-    * @param partitionBy list of field references
-    * @return an over window with defined partitioning
-    */
-  def partitionBy(partitionBy: Expression*): OverWindowPartitioned = {
-    new OverWindowPartitioned(partitionBy)
-  }
-}
-
-/**
-  * Helper class for creating a tumbling window. Tumbling windows are 
consecutive, non-overlapping
-  * windows of a specified fixed length. For example, a tumbling window of 5 
minutes size groups
-  * elements in 5 minutes intervals.
-  */
-object Tumble extends TumbleBase
-
-/**
-  * Helper class for creating a sliding window. Sliding windows have a fixed 
size and slide by
-  * a specified slide interval. If the slide interval is smaller than the 
window size, sliding
-  * windows are overlapping. Thus, an element can be assigned to multiple 
windows.
-  *
-  * For example, a sliding window of size 15 minutes with 5 minutes sliding 
interval groups elements
-  * of 15 minutes and evaluates every five minutes. Each element is contained 
in three consecutive
-  * window evaluations.
-  */
-object Slide extends SlideBase
-
-/**
-  * Helper class for creating a session window. The boundary of session 
windows are defined by
-  * intervals of inactivity, i.e., a session window is closes if no event 
appears for a defined
-  * gap period.
-  */
-object Session extends SessionBase
-
-/**
-  * Helper class for creating an over window. Similar to SQL, over window 
aggregates compute an
-  * aggregate for each input row over a range of its neighboring rows.
-  */
-object Over extends OverBase
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
similarity index 96%
rename from 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
rename to 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
index 72fba67..044e0d2 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
@@ -21,8 +21,24 @@ import 
org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.api._
 import org.apache.flink.table.expressions.ApiExpressionUtils._
 
+import _root_.java.util.{List => JList}
 import _root_.scala.language.implicitConversions
 import _root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+import _root_.scala.collection.JavaConversions._
+
+/**
+  * The implementation of a [[PlannerExpressionParser]] which parsers 
expressions inside a String.
+  */
+class PlannerExpressionParserImpl extends PlannerExpressionParser {
+
+  def parseExpression(exprString: String): Expression = {
+    PlannerExpressionParserImpl.parseExpression(exprString)
+  }
+
+  override def parseExpressionList(expression: String): JList[Expression] = {
+    PlannerExpressionParserImpl.parseExpressionList(expression)
+  }
+}
 
 /**
  * Parser for expressions inside a String. This parses exactly the same 
expressions that
@@ -33,7 +49,10 @@ import 
_root_.scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
  * available in the Scala Expression DSL. This parser must be kept in sync 
with the Scala DSL
  * lazy valined in the above files.
  */
-object ExpressionParser extends JavaTokenParsers with PackratParsers {
+object PlannerExpressionParserImpl extends JavaTokenParsers
+  with PackratParsers
+  with PlannerExpressionParser {
+
   case class Keyword(key: String)
 
   // Convert the keyword into an case insensitive Parser
@@ -113,7 +132,7 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   lazy val TRIM_MODE_TRAILING: Keyword = Keyword("TRAILING")
   lazy val TRIM_MODE_BOTH: Keyword = Keyword("BOTH")
 
-  def functionIdent: ExpressionParser.Parser[String] = super.ident
+  def functionIdent: PlannerExpressionParserImpl.Parser[String] = super.ident
 
   // symbols
 
@@ -625,7 +644,7 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 
   lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
 
-  def parseExpressionList(expression: String): List[Expression] = {
+  def parseExpressionList(expression: String): JList[Expression] = {
     parseAll(expressionList, expression) match {
       case Success(lst, _) => lst
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
index 2e05139..b1e5860 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.table.api.stream.table.stringexpr
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{Session, Slide, Tumble}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Session, Slide, Tumble}
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.utils.TableTestBase
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index a81b3ed..01a55f1 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -43,6 +43,8 @@ import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
+import scala.collection.JavaConverters._
+
 
 /**
   * Tests for access and materialization of time attributes.
@@ -560,11 +562,11 @@ class TimeAttributesITCase extends AbstractTestBase {
     // use aliases, swap all attributes, and skip b2
     val table4 = stream.toTable(
       tEnv,
-      ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as a"): 
_*)
+      ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as 
a").asScala: _*)
     // no aliases, no swapping
     val table5 = stream.toTable(
       tEnv,
-      ExpressionParser.parseExpressionList("a, b.rowtime, c"): _*)
+      ExpressionParser.parseExpressionList("a, b.rowtime, c").asScala: _*)
 
     val t = table.select('b, 'c , 'a)
       .unionAll(table2.select('b, 'c, 'a))

Reply via email to