GitHub user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4710#discussion_r140782081 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala --- @@ -121,19 +161,81 @@ class TableSourceTest extends TableTestBase { ) util.verifyTable(t, expected) } + + @Test + def testProjectableProcTimeTableSource(): Unit = { + // ensures that projection is not pushed into table source with proctime indicators + val util = streamTestUtil() + + val projectableTableSource = new TestProctimeSource("pTime") with ProjectableTableSource[Row] { + override def projectFields(fields: Array[Int]): TableSource[Row] = { + // ensure this method is not called! + Assert.fail() + null.asInstanceOf[TableSource[Row]] + } + } + util.tableEnv.registerTableSource("PTimeTable", projectableTableSource) + + val t = util.tableEnv.scan("PTimeTable") + .select('name, 'val) + .where('val > 10) + + val expected = + unaryNode( + "DataStreamCalc", + "StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, name, pTime])", + term("select", "name", "val"), + term("where", ">(val, 10)") + ) + util.verifyTable(t, expected) + } + + @Test + def testProjectableRowTimeTableSource(): Unit = { --- End diff -- Does filter push-down work as well, with both rowtime and proctime?
---