asfgit closed pull request #6949: [FLINK-10676][table] Add 'as' method for 
OverWindowWithOrderBy
URL: https://github.com/apache/flink/pull/6949
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the diff is reproduced below:

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 76bd5b28ef2..ed3a97d8def 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1571,13 +1571,15 @@ The `OverWindow` defines a range of rows over which 
aggregates are computed. `Ov
     </tr>
     <tr>
       <td><code>preceding</code></td>
-      <td>Required</td>
+      <td>Optional</td>
       <td>
         <p>Defines the interval of rows that are included in the window and 
precede the current row. The interval can either be specified as time or 
row-count interval.</p>
 
         <p><a href="tableApi.html#bounded-over-windows">Bounded over 
windows</a> are specified with the size of the interval, e.g., 
<code>10.minutes</code> for a time interval or <code>10.rows</code> for a 
row-count interval.</p>
 
         <p><a href="tableApi.html#unbounded-over-windows">Unbounded over 
windows</a> are specified using a constant, i.e., <code>UNBOUNDED_RANGE</code> 
for a time interval or <code>UNBOUNDED_ROW</code> for a row-count interval. 
Unbounded over windows start with the first row of a partition.</p>
+        
+        <p>If the <code>preceding</code> clause is omitted, the window 
defaults to <code>UNBOUNDED_RANGE</code> as <code>preceding</code> and 
<code>CURRENT_RANGE</code> as <code>following</code>.</p>
       </td>
     </tr>
     <tr>
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
index f326f6f6a7c..121aab8f776 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.api.java
 
-import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, 
SlideWithSize, SessionWithGap}
+import org.apache.flink.table.api.scala.{CURRENT_RANGE, UNBOUNDED_RANGE}
+import org.apache.flink.table.api.{OverWindow, TumbleWithSize, 
OverWindowWithPreceding, SlideWithSize, SessionWithGap}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 
 /**
@@ -144,4 +145,21 @@ class OverWindowWithOrderBy(
     new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
   }
 
+  /**
+    * Assigns an alias for this window that the following `select()` clause 
can refer to.
+    *
+    * @param alias alias for this over window
+    * @return over window
+    */
+  def as(alias: String): OverWindow = 
as(ExpressionParser.parseExpression(alias))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause 
can refer to.
+    *
+    * @param alias alias for this over window
+    * @return over window
+    */
+  def as(alias: Expression): OverWindow = {
+    OverWindow(alias, partitionByExpr, orderByExpr, UNBOUNDED_RANGE, 
CURRENT_RANGE)
+  }
 }
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
index 91bf1a6c739..2f88248a7f1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.api.scala
 
-import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, 
SlideWithSize, SessionWithGap}
-import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.api.{OverWindow, TumbleWithSize, 
OverWindowWithPreceding, SlideWithSize, SessionWithGap}
+import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 
 /**
   * Helper object for creating a tumbling window. Tumbling windows are 
consecutive, non-overlapping
@@ -127,7 +127,6 @@ case class PartitionedOver(partitionBy: Array[Expression]) {
 
 case class OverWindowWithOrderBy(partitionBy: Seq[Expression], orderBy: 
Expression) {
 
-
   /**
     * Set the preceding offset (based on time or row-count intervals) for over 
window.
     *
@@ -138,4 +137,21 @@ case class OverWindowWithOrderBy(partitionBy: 
Seq[Expression], orderBy: Expressi
     new OverWindowWithPreceding(partitionBy, orderBy, preceding)
   }
 
+  /**
+    * Assigns an alias for this window that the following `select()` clause 
can refer to.
+    *
+    * @param alias alias for this over window
+    * @return over window
+    */
+  def as(alias: String): OverWindow = 
as(ExpressionParser.parseExpression(alias))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause 
can refer to.
+    *
+    * @param alias alias for this over window
+    * @return over window
+    */
+  def as(alias: Expression): OverWindow = {
+    OverWindow(alias, partitionBy, orderBy, UNBOUNDED_RANGE, CURRENT_RANGE)
+  }
 }
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
index 1be5810c36c..eca53c055f7 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
@@ -287,7 +287,15 @@ class OverWindowTest extends TableTestBase {
       "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED 
preceding) as cnt2 " +
       "FROM MyTable " +
       "WINDOW w AS (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED 
preceding)"
+
+    val sql3 = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY proctime) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY proctime) as cnt2 " +
+      "from MyTable"
+
     streamUtil.verifySqlPlansIdentical(sql, sql2)
+    streamUtil.verifySqlPlansIdentical(sql, sql3)
 
     val expected =
       unaryNode(
@@ -523,6 +531,13 @@ class OverWindowTest extends TableTestBase {
       "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) 
as cnt2 " +
       "from MyTable"
 
+    val sql1 = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY rowtime) as cnt1, " +
+      "sum(a) OVER (PARTITION BY c ORDER BY rowtime) as cnt2 " +
+      "from MyTable"
+    streamUtil.verifySqlPlansIdentical(sql, sql1)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
index 55e3ecbac0d..eeb7d5f3d68 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/OverWindowTest.scala
@@ -194,10 +194,15 @@ class OverWindowTest extends TableTestBase {
     val weightedAvg = new WeightedAvgWithRetract
 
     val result = table
-      .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE 
following
-         CURRENT_RANGE as 'w)
+      .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_RANGE 
as 'w)
+      .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w)
+
+    val result2 = table
+      .window(Over partitionBy 'c orderBy 'proctime as 'w)
       .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w)
 
+    streamUtil.verify2Tables(result, result2)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
@@ -459,6 +464,12 @@ class OverWindowTest extends TableTestBase {
          CURRENT_RANGE as 'w)
       .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)
 
+    val result2 = table
+      .window(Over partitionBy 'c orderBy 'rowtime as 'w)
+      .select('a, 'c, 'a.count over 'w, weightedAvg('c, 'a) over 'w as 'wAvg)
+
+    streamUtil.verify2Tables(result, result2)
+
     val expected =
       unaryNode(
         "DataStreamCalc",
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala
index 3e757dabbba..99114b88318 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/OverWindowStringExpressionTest.scala
@@ -120,7 +120,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
   }
 
   @Test
-  def testUnboundedOverRange(): Unit = {
+  def testRowTimeUnboundedOverRange(): Unit = {
     val util = streamTestUtil()
     val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 
'rowtime.rowtime)
 
@@ -134,8 +134,37 @@ class OverWindowStringExpressionTest extends TableTestBase 
{
       .window(
         
JOver.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w"))
       .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
+    val resJava2 = t
+      .window(
+        JOver.orderBy("rowtime").as("w"))
+      .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
+
+    verifyTableEquals(resScala, resJava)
+    verifyTableEquals(resScala, resJava2)
+  }
+
+  @Test
+  def testProcTimeUnboundedOverRange(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 
'proctime.proctime)
+
+    val weightAvgFun = new WeightedAvg
+    util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+    val resScala = t
+      .window(SOver orderBy 'proctime preceding UNBOUNDED_RANGE following 
CURRENT_RANGE as 'w)
+      .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
+    val resJava = t
+      .window(
+        
JOver.orderBy("proctime").preceding("unbounded_range").following("current_range").as("w"))
+      .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
+    val resJava2 = t
+      .window(
+        JOver.orderBy("proctime").as("w"))
+      .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
 
     verifyTableEquals(resScala, resJava)
+    verifyTableEquals(resScala, resJava2)
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to