twalthr closed pull request #7289: [FLINK-11001][table] Fix window rowtime attribute can't be renamed bug in Java URL: https://github.com/apache/flink/pull/7289
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index f308aca6824..e44df24ad5e 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -1728,7 +1728,7 @@ This is the EBNF grammar for expressions: expressionList = expression , { "," , expression } ; -expression = timeIndicator | overConstant | alias ; +expression = overConstant | alias ; alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ; @@ -1744,7 +1744,7 @@ unary = [ "!" | "-" | "+" ] , composite ; composite = over | suffixed | nullLiteral | prefixed | atom ; -suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall ; +suffixed = interval | suffixAs | suffixCast | suffixIf | suffixDistinct | suffixFunctionCall | timeIndicator ; prefixed = prefixAs | prefixCast | prefixIf | prefixDistinct | prefixFunctionCall ; diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 8c6a1e0a04b..4fa501cde4d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -669,12 +669,15 @@ abstract class StreamTableEnvironment( case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) => extractRowtime(idx, name, None) - case (RowtimeAttribute(Alias(UnresolvedFieldReference(origName), name, _)), idx) => + case (Alias(RowtimeAttribute(UnresolvedFieldReference(origName)), name, _), idx) => extractRowtime(idx, name, Some(origName)) case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) => extractProctime(idx, name) + case (Alias(ProctimeAttribute(UnresolvedFieldReference(_)), name, _), idx) => + extractProctime(idx, name) + case (UnresolvedFieldReference(name), _) => fieldNames = name :: fieldNames case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames = name :: fieldNames diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index e28a471681d..ba789638ca3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -1089,7 +1089,7 @@ abstract class TableEnvironment(val config: TableConfig) { } else { referenceByName(origName, t).map((_, name)) } - case (_: TimeAttribute, _) => + case (_: TimeAttribute, _) | (Alias(_: TimeAttribute, _, _), _) => None case _ => throw new TableException( "Field reference expression or alias on field expression expected.") @@ -1101,7 +1101,7 @@ abstract class TableEnvironment(val config: TableConfig) { referenceByName(name, p).map((_, name)) case Alias(UnresolvedFieldReference(origName), name: String, _) => referenceByName(origName, p).map((_, name)) - case _: TimeAttribute => + case _: TimeAttribute | Alias(_: TimeAttribute, _, _) => None case _ => throw new TableException( "Field reference expression or alias on field expression expected.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala index 7fd9309b5db..d5d64b48d69 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala @@ -355,7 +355,9 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { // expression with distinct suffix modifier suffixDistinct | // function call must always be at the end - suffixFunctionCall | suffixFunctionCallOneArg + suffixFunctionCall | suffixFunctionCallOneArg | + // rowtime or proctime + timeIndicator // prefix operators @@ -525,15 +527,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val timeIndicator: PackratParser[Expression] = proctime | rowtime - lazy val proctime: PackratParser[Expression] = - (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ PROCTIME ^^ { - case f ~ _ ~ _ => ProctimeAttribute(f) - } + lazy val proctime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ { + case f ~ _ ~ _ => ProctimeAttribute(f) + } - lazy val rowtime: PackratParser[Expression] = - (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ ROWTIME ^^ { - case f ~ _ ~ _ => RowtimeAttribute(f) - } + lazy val rowtime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ { + case f ~ _ ~ _ => RowtimeAttribute(f) + } // alias @@ -547,7 +547,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { case e ~ _ ~ name => Alias(e, name.name) } - lazy val expression: PackratParser[Expression] = timeIndicator | overConstant | alias | + lazy val expression: PackratParser[Expression] = overConstant | alias | failure("Invalid expression.") lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",") diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 1c097d3b19c..91077269e83 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -531,17 +531,17 @@ class TableEnvironmentTest extends TableTestBase { // case class util.verifySchema( - util.addTable[CClassWithTime]('cf1, ('cf2 as 'new).rowtime, 'cf3), + util.addTable[CClassWithTime]('cf1, 'cf2.rowtime as 'new, 'cf3), Seq("cf1" -> INT, "new" -> ROWTIME, "cf3" -> STRING)) // row util.verifySchema( - util.addTable('rf1, ('rf2 as 'new).rowtime, 'rf3)(TEST_ROW_WITH_TIME), + util.addTable('rf1, 'rf2.rowtime as 'new, 'rf3)(TEST_ROW_WITH_TIME), Seq("rf1" -> INT, "new" -> ROWTIME, "rf3" -> STRING)) // tuple util.verifySchema( - util.addTable[JTuple3[Int, Long, String]]('f0, ('f1 as 'new).rowtime, 'f2), + util.addTable[JTuple3[Int, Long, String]]('f0, 'f1.rowtime as 'new, 'f2), Seq("f0" -> INT, "new" -> ROWTIME, "f2" -> STRING)) } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala index bfa7bfa805b..e256ee89b75 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala @@ -38,7 +38,7 @@ class StreamTableEnvironmentValidationTest extends TableTestBase { def testInvalidRowtimeAliasByPosition(): Unit = { val util = streamTestUtil() // don't allow aliasing by position - util.addTable[(Long, Int, String, Int, Long)](('a as 'b).rowtime, 'b, 'c, 'd, 'e) + util.addTable[(Long, Int, String, Int, Long)]('a.rowtime as 'b, 'b, 'c, 'd, 'e) } @Test(expected = classOf[TableException]) @@ -178,13 +178,13 @@ class StreamTableEnvironmentValidationTest extends TableTestBase { def testInvalidAliasWithRowtimeAttribute(): Unit = { val util = streamTestUtil() // aliased field does not exist - util.addTable[(Int, Long, String)]('_1, ('newnew as 'new).rowtime, '_3) + util.addTable[(Int, Long, String)]('_1, 'newnew.rowtime as 'new, '_3) } @Test(expected = classOf[TableException]) def testInvalidAliasWithRowtimeAttribute2(): Unit = { val util = streamTestUtil() // aliased field has wrong type - util.addTable[(Int, Long, String)]('_1, ('_3 as 'new).rowtime, '_2) + util.addTable[(Int, Long, String)]('_1, '_3.rowtime as 'new, '_2) } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala index ec57436b420..868e0137213 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/AggregateStringExpressionTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.table.stringexpr import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.java.{Tumble => JTumble} import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMergeAndReset} import org.apache.flink.table.utils.TableTestBase @@ -128,4 +129,50 @@ class AggregateStringExpressionTest extends TableTestBase { verifyTableEquals(resJava, resScala) } + + @Test + def testProctimeRename(): Unit = { + val util = streamTestUtil() + val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'p.proctime as 'proctime) + + // Expression / Scala API + val resScala = t + .window(Tumble over 50.milli on 'proctime as 'w1) + .groupBy('w1, 'string) + .select('w1.proctime as 'proctime, 'string, 'int.count) + + // String / Java API + val resJava = t + .window(JTumble.over("50.milli").on("proctime").as("w1")) + .groupBy("w1, string") + .select("w1.proctime as proctime, string, int.count") + + verifyTableEquals(resJava, resScala) + } + + @Test + def testRowtimeRename(): Unit = { + val util = streamTestUtil() + val t = util.addTable[TestPojo]('int, 'long.rowtime as 'rowtime, 'string) + + // Expression / Scala API + val resScala = t + .window(Tumble over 50.milli on 'rowtime as 'w1) + .groupBy('w1, 'string) + .select('w1.rowtime as 'rowtime, 'string, 'int.count) + + // String / Java API + val resJava = t + .window(JTumble.over("50.milli").on("rowtime").as("w1")) + .groupBy("w1, string") + .select("w1.rowtime as rowtime, string, int.count") + + verifyTableEquals(resJava, resScala) + } +} + +class TestPojo() { + var int: Int = _ + var long: Long = _ + var string: String = _ } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index 1706fc8a112..21680e86aee 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -549,7 +549,7 @@ class TimeAttributesITCase extends AbstractTestBase { .fromElements(p1, p2) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermarkPojo) // use aliases, swap all attributes, and skip b2 - val table = stream.toTable(tEnv, ('b as 'b).rowtime, 'c as 'c, 'a as 'a) + val table = stream.toTable(tEnv, 'b.rowtime as 'b, 'c as 'c, 'a as 'a) // no aliases, no swapping val table2 = stream.toTable(tEnv, 'a, 'b.rowtime, 'c) // use proctime, no skipping @@ -560,7 +560,7 @@ class TimeAttributesITCase extends AbstractTestBase { // use aliases, swap all attributes, and skip b2 val table4 = stream.toTable( tEnv, - ExpressionParser.parseExpressionList("(b as b).rowtime, c as c, a as a"): _*) + ExpressionParser.parseExpressionList("b.rowtime as b, c as c, a as a"): _*) // no aliases, no swapping val table5 = stream.toTable( tEnv, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services