twalthr closed pull request #7289: [FLINK-11001][table] Fix window rowtime 
attribute can't be renamed bug in Java
URL: https://github.com/apache/flink/pull/7289
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise show it once the pull request is merged):

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f308aca6824..e44df24ad5e 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1728,7 +1728,7 @@ This is the EBNF grammar for expressions:
 
 expressionList = expression , { "," , expression } ;
 
-expression = timeIndicator | overConstant | alias ;
+expression = overConstant | alias ;
 
 alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , 
fieldReference , { "," , fieldReference } , ")" ) ;
 
@@ -1744,7 +1744,7 @@ unary = [ "!" | "-" | "+" ] , composite ;
 
 composite = over | suffixed | nullLiteral | prefixed | atom ;
 
-suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | 
suffixFunctionCall ;
+suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | 
suffixFunctionCall | timeIndicator ;
 
 prefixed = prefixAs | prefixCast | prefixIf | prefixDistinct | 
prefixFunctionCall ;
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 8c6a1e0a04b..4fa501cde4d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -669,12 +669,15 @@ abstract class StreamTableEnvironment(
       case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
         extractRowtime(idx, name, None)
 
-      case (RowtimeAttribute(Alias(UnresolvedFieldReference(origName), name, 
_)), idx) =>
+      case (Alias(RowtimeAttribute(UnresolvedFieldReference(origName)), name, 
_), idx) =>
         extractRowtime(idx, name, Some(origName))
 
       case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
         extractProctime(idx, name)
 
+      case (Alias(ProctimeAttribute(UnresolvedFieldReference(_)), name, _), 
idx) =>
+        extractProctime(idx, name)
+
       case (UnresolvedFieldReference(name), _) => fieldNames = name :: 
fieldNames
 
       case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames = 
name :: fieldNames
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index e28a471681d..ba789638ca3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -1089,7 +1089,7 @@ abstract class TableEnvironment(val config: TableConfig) {
             } else {
               referenceByName(origName, t).map((_, name))
             }
-          case (_: TimeAttribute, _) =>
+          case (_: TimeAttribute, _) | (Alias(_: TimeAttribute, _, _), _) =>
             None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression 
expected.")
@@ -1101,7 +1101,7 @@ abstract class TableEnvironment(val config: TableConfig) {
             referenceByName(name, p).map((_, name))
           case Alias(UnresolvedFieldReference(origName), name: String, _) =>
             referenceByName(origName, p).map((_, name))
-          case _: TimeAttribute =>
+          case _: TimeAttribute | Alias(_: TimeAttribute, _, _) =>
             None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression 
expected.")
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index 7fd9309b5db..d5d64b48d69 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -355,7 +355,9 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
     // expression with distinct suffix modifier
     suffixDistinct |
     // function call must always be at the end
-    suffixFunctionCall | suffixFunctionCallOneArg
+    suffixFunctionCall | suffixFunctionCallOneArg |
+    // rowtime or proctime
+    timeIndicator
 
   // prefix operators
 
@@ -525,15 +527,13 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 
   lazy val timeIndicator: PackratParser[Expression] = proctime | rowtime
 
-  lazy val proctime: PackratParser[Expression] =
-    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ 
PROCTIME ^^ {
-      case f ~ _ ~ _ => ProctimeAttribute(f)
-    }
+  lazy val proctime: PackratParser[Expression] = fieldReference ~ "." ~ 
PROCTIME ^^ {
+    case f ~ _ ~ _ => ProctimeAttribute(f)
+  }
 
-  lazy val rowtime: PackratParser[Expression] =
-    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ 
ROWTIME ^^ {
-      case f ~ _ ~ _ => RowtimeAttribute(f)
-    }
+  lazy val rowtime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME 
^^ {
+    case f ~ _ ~ _ => RowtimeAttribute(f)
+  }
 
   // alias
 
@@ -547,7 +547,7 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
       case e ~ _ ~ name => Alias(e, name.name)
   }
 
-  lazy val expression: PackratParser[Expression] = timeIndicator | 
overConstant | alias |
+  lazy val expression: PackratParser[Expression] = overConstant | alias |
     failure("Invalid expression.")
 
   lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 1c097d3b19c..91077269e83 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -531,17 +531,17 @@ class TableEnvironmentTest extends TableTestBase {
 
     // case class
     util.verifySchema(
-      util.addTable[CClassWithTime]('cf1, ('cf2 as 'new).rowtime, 'cf3),
+      util.addTable[CClassWithTime]('cf1, 'cf2.rowtime as 'new, 'cf3),
       Seq("cf1" -> INT, "new" -> ROWTIME, "cf3" -> STRING))
 
     // row
     util.verifySchema(
-      util.addTable('rf1, ('rf2 as 'new).rowtime, 'rf3)(TEST_ROW_WITH_TIME),
+      util.addTable('rf1, 'rf2.rowtime as 'new, 'rf3)(TEST_ROW_WITH_TIME),
       Seq("rf1" -> INT, "new" -> ROWTIME, "rf3" -> STRING))
 
     // tuple
     util.verifySchema(
-      util.addTable[JTuple3[Int, Long, String]]('f0, ('f1 as 'new).rowtime, 
'f2),
+      util.addTable[JTuple3[Int, Long, String]]('f0, 'f1.rowtime as 'new, 'f2),
       Seq("f0" -> INT, "new" -> ROWTIME, "f2" -> STRING))
   }
 }
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
index bfa7bfa805b..e256ee89b75 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
@@ -38,7 +38,7 @@ class StreamTableEnvironmentValidationTest extends 
TableTestBase {
   def testInvalidRowtimeAliasByPosition(): Unit = {
     val util = streamTestUtil()
     // don't allow aliasing by position
-    util.addTable[(Long, Int, String, Int, Long)](('a as 'b).rowtime, 'b, 'c, 
'd, 'e)
+    util.addTable[(Long, Int, String, Int, Long)]('a.rowtime as 'b, 'b, 'c, 
'd, 'e)
   }
 
   @Test(expected = classOf[TableException])
@@ -178,13 +178,13 @@ class StreamTableEnvironmentValidationTest extends 
TableTestBase {
   def testInvalidAliasWithRowtimeAttribute(): Unit = {
     val util = streamTestUtil()
     // aliased field does not exist
-    util.addTable[(Int, Long, String)]('_1, ('newnew as 'new).rowtime, '_3)
+    util.addTable[(Int, Long, String)]('_1, 'newnew.rowtime as 'new, '_3)
   }
 
   @Test(expected = classOf[TableException])
   def testInvalidAliasWithRowtimeAttribute2(): Unit = {
     val util = streamTestUtil()
     // aliased field has wrong type
-    util.addTable[(Int, Long, String)]('_1, ('_3 as 'new).rowtime, '_2)
+    util.addTable[(Int, Long, String)]('_1, '_3.rowtime as 'new, '_2)
   }
 }
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
index ec57436b420..868e0137213 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.table.stringexpr
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.java.{Tumble => JTumble}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import 
org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, 
WeightedAvgWithMergeAndReset}
 import org.apache.flink.table.utils.TableTestBase
@@ -128,4 +129,50 @@ class AggregateStringExpressionTest extends TableTestBase {
 
     verifyTableEquals(resJava, resScala)
   }
+
+  @Test
+  def testProctimeRename(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 
'p.proctime as 'proctime)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Tumble over 50.milli on 'proctime as 'w1)
+      .groupBy('w1, 'string)
+      .select('w1.proctime as 'proctime, 'string, 'int.count)
+
+    // String / Java API
+    val resJava = t
+      .window(JTumble.over("50.milli").on("proctime").as("w1"))
+      .groupBy("w1, string")
+      .select("w1.proctime as proctime, string, int.count")
+
+    verifyTableEquals(resJava, resScala)
+  }
+
+  @Test
+  def testRowtimeRename(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[TestPojo]('int, 'long.rowtime as 'rowtime, 'string)
+
+    // Expression / Scala API
+    val resScala = t
+      .window(Tumble over 50.milli on 'rowtime as 'w1)
+      .groupBy('w1, 'string)
+      .select('w1.rowtime as 'rowtime, 'string, 'int.count)
+
+    // String / Java API
+    val resJava = t
+      .window(JTumble.over("50.milli").on("rowtime").as("w1"))
+      .groupBy("w1, string")
+      .select("w1.rowtime as rowtime, string, int.count")
+
+    verifyTableEquals(resJava, resScala)
+  }
+}
+
+class TestPojo() {
+  var int: Int = _
+  var long: Long = _
+  var string: String = _
 }
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 1706fc8a112..21680e86aee 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -549,7 +549,7 @@ class TimeAttributesITCase extends AbstractTestBase {
       .fromElements(p1, p2)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermarkPojo)
     // use aliases, swap all attributes, and skip b2
-    val table = stream.toTable(tEnv, ('b as 'b).rowtime, 'c as 'c, 'a as 'a)
+    val table = stream.toTable(tEnv, 'b.rowtime as 'b, 'c as 'c, 'a as 'a)
     // no aliases, no swapping
     val table2 = stream.toTable(tEnv, 'a, 'b.rowtime, 'c)
     // use proctime, no skipping
@@ -560,7 +560,7 @@ class TimeAttributesITCase extends AbstractTestBase {
     // use aliases, swap all attributes, and skip b2
     val table4 = stream.toTable(
       tEnv,
-      ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as 
a"): _*)
+      ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as a"): 
_*)
     // no aliases, no swapping
     val table5 = stream.toTable(
       tEnv,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to