wuchong commented on a change in pull request #11837:
URL: https://github.com/apache/flink/pull/11837#discussion_r413470860
##
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory
typeFactory,
}
}
}
+
+ // The following block is a workaround to support tables
defined by TableEnvironment.connect() and
+ // the actual table sources implement
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+ // It should be removed after we remove
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+ Optional<TableSource> sourceOpt = findAndCreateTableSource(new
TableConfig().getConfiguration());
+ if
(tableSchema.getTableColumns().stream().noneMatch(TableColumn::isGenerated)
+ && tableSchema.getWatermarkSpecs().isEmpty()
+ && sourceOpt.isPresent()) {
+ TableSource source = sourceOpt.get();
+ if ((source instanceof DefinedProctimeAttribute
+ && ((DefinedProctimeAttribute)
source).getProctimeAttribute() != null)
+ ||
+ (source instanceof
DefinedRowtimeAttributes
+ &&
((DefinedRowtimeAttributes) source).getRowtimeAttributeDescriptors() != null
+ &&
!((DefinedRowtimeAttributes)
source).getRowtimeAttributeDescriptors().isEmpty())) {
Review comment:
Add a `hasProctimeAttribute` method to `TableSourceValidation`; the
condition can then be simplified to
```java
if (hasProctimeAttribute(source) || hasRowtimeAttribute(source))
```
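A minimal sketch of what such helpers could look like, written against
stand-in interfaces so that it compiles without Flink on the classpath. The
interface shapes follow the method names used in the diff above; the class
name `TimeAttributeChecks`, the `String` descriptor element type, and the
`DemoSource` fixture are assumptions for illustration, not the actual
`TableSourceValidation` code.

```java
import java.util.Arrays;
import java.util.List;

public class TimeAttributeChecks {

    // Stand-ins for the Flink interfaces named in the diff (simplified, hypothetical).
    interface TableSource<T> {}

    interface DefinedProctimeAttribute {
        String getProctimeAttribute();
    }

    interface DefinedRowtimeAttributes {
        // Flink returns a list of descriptor objects; String is a placeholder type here.
        List<String> getRowtimeAttributeDescriptors();
    }

    /** True if the source declares a non-null processing-time attribute. */
    static boolean hasProctimeAttribute(TableSource<?> source) {
        return source instanceof DefinedProctimeAttribute
                && ((DefinedProctimeAttribute) source).getProctimeAttribute() != null;
    }

    /** True if the source declares at least one rowtime attribute descriptor. */
    static boolean hasRowtimeAttribute(TableSource<?> source) {
        if (!(source instanceof DefinedRowtimeAttributes)) {
            return false;
        }
        List<String> descriptors =
                ((DefinedRowtimeAttributes) source).getRowtimeAttributeDescriptors();
        return descriptors != null && !descriptors.isEmpty();
    }

    // Hypothetical fixture: a source that may declare either kind of time attribute.
    static final class DemoSource
            implements TableSource<Object>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
        private final String proctime;
        private final List<String> rowtimes;

        DemoSource(String proctime, List<String> rowtimes) {
            this.proctime = proctime;
            this.rowtimes = rowtimes;
        }

        @Override
        public String getProctimeAttribute() {
            return proctime;
        }

        @Override
        public List<String> getRowtimeAttributeDescriptors() {
            return rowtimes;
        }
    }

    public static void main(String[] args) {
        TableSource<?> source = new DemoSource(null, Arrays.asList("rowtime"));
        // Same meaning as the long instanceof chain in the diff.
        if (hasProctimeAttribute(source) || hasRowtimeAttribute(source)) {
            System.out.println("source defines a time attribute");
        }
    }
}
```

Keeping the null and empty-list checks inside the helpers means the call
site no longer needs any casts.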
##
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory
typeFactory,
}
}
}
+
+ // The following block is a workaround to support tables
defined by TableEnvironment.connect() and
+ // the actual table sources implement
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+ // It should be removed after we remove
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+ Optional<TableSource> sourceOpt = findAndCreateTableSource(new
TableConfig().getConfiguration());
Review comment:
```suggestion
Optional<TableSource<?>> sourceOpt =
findAndCreateTableSource(new TableConfig().getConfiguration());
```
Add `<?>` to `TableSource` to avoid the IDEA raw-type warning.
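For illustration, a small self-contained sketch of the wildcard form. The
`TableSource` interface and the `findSource` factory below are simplified
stand-ins (assumptions, not Flink's classes); the point is only that
`Optional<TableSource<?>>` keeps the declaration fully parameterized while
still accepting a source of any row type.

```java
import java.util.Optional;

public class WildcardOptionalDemo {

    // Simplified stand-in for Flink's generic TableSource<T> (hypothetical).
    interface TableSource<T> {
        String explainSource();
    }

    // Hypothetical factory standing in for findAndCreateTableSource(...).
    static Optional<TableSource<?>> findSource(boolean available) {
        if (!available) {
            return Optional.empty();
        }
        TableSource<String> source = () -> "demo source";
        // A TableSource<String> is accepted where TableSource<?> is expected.
        return Optional.of(source);
    }

    public static void main(String[] args) {
        Optional<TableSource<?>> sourceOpt = findSource(true);
        if (sourceOpt.isPresent()) {
            // No raw type and no unchecked cast at the use site.
            TableSource<?> source = sourceOpt.get();
            System.out.println(source.explainSource());
        }
    }
}
```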
##
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
##
@@ -130,6 +131,60 @@ class TableSourceTest extends TableTestBase {
util.verifyPlan(sqlQuery)
}
+
+ @Test
+ def testLegacyRowTimeTableGroupWindow(): Unit = {
+util.tableEnv.connect(new ConnectorDescriptor("TestTableSourceWithTime",
1, false) {
+ override protected def toConnectorProperties: JMap[String, String] = {
+Collections.emptyMap()
+ }
Review comment:
Can we have a dedicated descriptor class for `TestTableSourceWithTime`? The
inline anonymous `ConnectorDescriptor` is confusing to read.
##
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory
typeFactory,
}
}
}
+
+ // The following block is a workaround to support tables
defined by TableEnvironment.connect() and
+ // the actual table sources implement
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+ // It should be removed after we remove
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+ Optional<TableSource> sourceOpt = findAndCreateTableSource(new
TableConfig().getConfiguration());
+ if
(tableSchema.getTableColumns().stream().noneMatch(TableColumn::isGenerated)
+ && tableSchema.getWatermarkSpecs().isEmpty()
Review comment:
Add `isStreamingMode` to this condition, and call
`findAndCreateTableSource` only when the condition is satisfied.
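A rough sketch of the intended ordering: evaluate the cheap mode and schema
checks first, and defer the source lookup until they pass. All names other
than `isStreamingMode` and `findAndCreateTableSource` are hypothetical, and
the lookup here returns a dummy value rather than a real table source.

```java
import java.util.Optional;
import java.util.function.Supplier;

public class LazySourceLookup {

    // Counts lookups so the deferred call is observable in this demo.
    static int lookups = 0;

    // Hypothetical stand-in for findAndCreateTableSource(...); returns a dummy value.
    static Optional<String> findAndCreateTableSource() {
        lookups++;
        return Optional.of("legacy source");
    }

    static boolean shouldApplyWorkaround(
            boolean isStreamingMode,
            boolean hasGeneratedColumns,
            boolean hasWatermarkSpecs,
            Supplier<Optional<String>> sourceLookup) {
        // Cheap mode and schema checks come first.
        if (!isStreamingMode || hasGeneratedColumns || hasWatermarkSpecs) {
            return false;
        }
        // Only now pay for creating the table source.
        return sourceLookup.get().isPresent();
    }

    public static void main(String[] args) {
        boolean batch = shouldApplyWorkaround(
                false, false, false, LazySourceLookup::findAndCreateTableSource);
        System.out.println("batch: " + batch + ", lookups = " + lookups);

        boolean streaming = shouldApplyWorkaround(
                true, false, false, LazySourceLookup::findAndCreateTableSource);
        System.out.println("streaming: " + streaming + ", lookups = " + lookups);
    }
}
```

With this ordering, batch jobs and tables that already declare computed
columns or watermarks never trigger the source lookup.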
##
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory
typeFactory,
}
}
}
+
+ // The following block is a wor