[GitHub] [flink] wuchong commented on a change in pull request #11837: [FLINK-16160][table-planner-blink] Fix proctime()/rowtime() doesn't w…

2020-04-23 Thread GitBox


wuchong commented on a change in pull request #11837:
URL: https://github.com/apache/flink/pull/11837#discussion_r414255501



##
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
##
@@ -130,6 +131,60 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan(sqlQuery)
   }
 
+
+  @Test
+  def testLegacyRowTimeTableGroupWindow(): Unit = {
+    util.tableEnv.connect(new ConnectorDescriptor("TestTableSourceWithTime", 1, false) {
+      override protected def toConnectorProperties: JMap[String, String] = {
+        Collections.emptyMap()
+      }

Review comment:
   Introducing a generic connector descriptor is a separate topic, and I'm 
concerned that `CustomConnectorDescriptor` is not easy enough to use. 
   
   Maybe we can introduce a general `TestConnectorDescriptor` in tests.
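
One possible shape for such a test descriptor is sketched below. It is only an illustration: the `ConnectorDescriptor` class here is a simplified stand-in written for this example (not Flink's class), and `TestConnectorDescriptor` with its constructors is a hypothetical name taken from the comment above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Flink's ConnectorDescriptor (assumption: only the
// constructor arguments and the one method used in the test are modelled).
abstract class ConnectorDescriptor {
    private final String type;
    private final int version;
    private final boolean formatNeeded;

    protected ConnectorDescriptor(String type, int version, boolean formatNeeded) {
        this.type = type;
        this.version = version;
        this.formatNeeded = formatNeeded;
    }

    String getType() {
        return type;
    }

    protected abstract Map<String, String> toConnectorProperties();
}

// Hypothetical reusable descriptor for tests: it replaces the anonymous
// subclass by taking the connector properties as a plain map.
final class TestConnectorDescriptor extends ConnectorDescriptor {
    private final Map<String, String> properties;

    TestConnectorDescriptor(
            String type, int version, boolean formatNeeded, Map<String, String> properties) {
        super(type, version, formatNeeded);
        // Defensive copy so later changes to the caller's map do not leak in.
        this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
    }

    // Convenience constructor matching the call in the test: version 1,
    // no format required, no extra properties.
    TestConnectorDescriptor(String type) {
        this(type, 1, false, Collections.emptyMap());
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        return properties;
    }
}
```

With something like this, the test could call `connect(new TestConnectorDescriptor("TestTableSourceWithTime"))` instead of declaring an anonymous subclass inline.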





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on a change in pull request #11837: [FLINK-16160][table-planner-blink] Fix proctime()/rowtime() doesn't w…

2020-04-23 Thread GitBox


wuchong commented on a change in pull request #11837:
URL: https://github.com/apache/flink/pull/11837#discussion_r414254275



##
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
##
@@ -200,6 +199,65 @@ class TestTableSourceWithTime[T](
   }
 }
 
+class TestTableSourceWithTimeFactory[T] extends StreamTableSourceFactory[T] {
+  override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[T] = {
+    val dp = new DescriptorProperties()
+    dp.putProperties(properties)
+
+    val isBounded = dp.getOptionalBoolean("is-bounded").orElse(false)
+    val tableSchema = dp.getTableSchema(Schema.SCHEMA)
+    val serializedData = dp.getOptionalString("data").orElse(null)
+    val data = if (serializedData != null) {
+      EncodingUtils.decodeStringToObject(serializedData, classOf[List[T]])
+    } else {
+      Seq.empty[T]
+    }
+    val rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(dp)
+    val rowtime = if (rowtimeAttributes.isEmpty) {
+      null
+    } else {
+      rowtimeAttributes.head.getAttributeName
+    }
+    val proctimeAttribute = SchemaValidator.deriveProctimeAttribute(dp)
+    val proctime = if (proctimeAttribute.isPresent) {
+      proctimeAttribute.get()
+    } else {
+      null
+    }
+
+    val serializedMapKeys = dp.getOptionalString("map-keys").orElse(null)
+    val serializedMapVals = dp.getOptionalString("map-vals").orElse(null)
+    val mapping = if (serializedMapKeys != null && serializedMapVals != null) {
+      val mapKeys = EncodingUtils.decodeStringToObject(serializedMapKeys, classOf[List[String]])
+      val mapVals = EncodingUtils.decodeStringToObject(serializedMapVals, classOf[List[String]])
+      if (mapKeys.length != mapVals.length) {
+        null
+      } else {
+        mapKeys.zip(mapVals).toMap
+      }
+    } else {
+      null
+    }
+
+    val existingTs = dp.getOptionalString("existingTs").orElse(null)
+
+    new TestTableSourceWithTime[T](
+      isBounded, tableSchema, null, data, rowtime, proctime, mapping, existingTs)

Review comment:
   Then please add IT cases that use these properties. Otherwise, it's 
hard to know whether this code is correct. 









[GitHub] [flink] wuchong commented on a change in pull request #11837: [FLINK-16160][table-planner-blink] Fix proctime()/rowtime() doesn't w…

2020-04-22 Thread GitBox


wuchong commented on a change in pull request #11837:
URL: https://github.com/apache/flink/pull/11837#discussion_r413470860



##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory typeFactory,
}
}
}
+
+   // The following block is a workaround to support tables defined by TableEnvironment.connect() and
+   // the actual table sources implement DefinedProctimeAttribute/DefinedRowtimeAttributes.
+   // It should be removed after we remove DefinedProctimeAttribute/DefinedRowtimeAttributes.
+   Optional sourceOpt = findAndCreateTableSource(new TableConfig().getConfiguration());
+   if (tableSchema.getTableColumns().stream().noneMatch(TableColumn::isGenerated)
+       && tableSchema.getWatermarkSpecs().isEmpty()
+       && sourceOpt.isPresent()) {
+       TableSource source = sourceOpt.get();
+       if ((source instanceof DefinedProctimeAttribute
+               && ((DefinedProctimeAttribute) source).getProctimeAttribute() != null)
+               ||
+               (source instanceof DefinedRowtimeAttributes
+                   && ((DefinedRowtimeAttributes) source).getRowtimeAttributeDescriptors() != null
+                   && !((DefinedRowtimeAttributes) source).getRowtimeAttributeDescriptors().isEmpty())) {

Review comment:
   Add a `hasProctimeAttribute` method to `TableSourceValidation`, and the 
condition can be simplified to 
   
   ```java
   if (hasRowtimeAttribute(source) || hasProctimeAttribute(source))
   ```
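
A minimal sketch of what such helpers could look like follows. The two interfaces are simplified stand-ins written for this example (the rowtime descriptors are reduced to strings), and the class name `TimeAttributeChecks` and the combined `hasTimeAttribute` method are hypothetical. The two checks are joined with `||` here because the condition in the diff above joins them that way.

```java
import java.util.List;

// Simplified stand-ins for Flink's DefinedProctimeAttribute and
// DefinedRowtimeAttributes (assumption: only the getters used in the
// condition are modelled, and descriptors are plain strings).
interface DefinedProctimeAttribute {
    String getProctimeAttribute();
}

interface DefinedRowtimeAttributes {
    List<String> getRowtimeAttributeDescriptors();
}

// Hypothetical helper class; in the review the methods would live in
// TableSourceValidation.
final class TimeAttributeChecks {
    private TimeAttributeChecks() {}

    // True if the source declares a non-null processing-time attribute.
    static boolean hasProctimeAttribute(Object source) {
        return source instanceof DefinedProctimeAttribute
                && ((DefinedProctimeAttribute) source).getProctimeAttribute() != null;
    }

    // True if the source declares at least one rowtime attribute descriptor.
    static boolean hasRowtimeAttribute(Object source) {
        if (!(source instanceof DefinedRowtimeAttributes)) {
            return false;
        }
        List<String> descriptors =
                ((DefinedRowtimeAttributes) source).getRowtimeAttributeDescriptors();
        return descriptors != null && !descriptors.isEmpty();
    }

    // Same truth table as the nested instanceof condition in the diff.
    static boolean hasTimeAttribute(Object source) {
        return hasProctimeAttribute(source) || hasRowtimeAttribute(source);
    }
}
```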

##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory typeFactory,
}
}
}
+
+   // The following block is a workaround to support tables defined by TableEnvironment.connect() and
+   // the actual table sources implement DefinedProctimeAttribute/DefinedRowtimeAttributes.
+   // It should be removed after we remove DefinedProctimeAttribute/DefinedRowtimeAttributes.
+   Optional sourceOpt = findAndCreateTableSource(new TableConfig().getConfiguration());

Review comment:
   ```suggestion
Optional<TableSource<?>> sourceOpt = findAndCreateTableSource(new TableConfig().getConfiguration());
   ```
   
   Add `<?>` to `TableSource` to avoid the IDEA warning.

##
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
##
@@ -130,6 +131,60 @@ class TableSourceTest extends TableTestBase {
     util.verifyPlan(sqlQuery)
   }
 
+
+  @Test
+  def testLegacyRowTimeTableGroupWindow(): Unit = {
+    util.tableEnv.connect(new ConnectorDescriptor("TestTableSourceWithTime", 1, false) {
+      override protected def toConnectorProperties: JMap[String, String] = {
+        Collections.emptyMap()
+      }

Review comment:
   Can we have a dedicated descriptor for `TestTableSourceWithTime`? This 
code looks confusing. 

##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory typeFactory,
}
}
}
+
+   // The following block is a workaround to support tables defined by TableEnvironment.connect() and
+   // the actual table sources implement DefinedProctimeAttribute/DefinedRowtimeAttributes.
+   // It should be removed after we remove DefinedProctimeAttribute/DefinedRowtimeAttributes.
+   Optional sourceOpt = findAndCreateTableSource(new TableConfig().getConfiguration());
+   if (tableSchema.getTableColumns().stream().noneMatch(TableColumn::isGenerated)
+       && tableSchema.getWatermarkSpecs().isEmpty()

Review comment:
   Add `isStreamingMode` to this condition, and call 
`findAndCreateTableSource` only when the condition is satisfied. 
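
The idea of deferring the lookup can be sketched as follows. This is an illustration under assumptions, not the PR's code: the class name `LazySourceLookup`, the method `findSourceIfNeeded`, and the boolean parameters are hypothetical, and the source lookup is passed in as a `Supplier` so that it only runs once the cheap checks have passed.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper showing the evaluation order: cheap schema and mode
// checks first, the potentially expensive source creation last.
final class LazySourceLookup {
    private LazySourceLookup() {}

    static <S> Optional<S> findSourceIfNeeded(
            boolean isStreamingMode,
            boolean hasGeneratedColumns,
            boolean hasWatermarkSpecs,
            Supplier<Optional<S>> findAndCreateTableSource) {
        if (isStreamingMode && !hasGeneratedColumns && !hasWatermarkSpecs) {
            // Only now is the source actually looked up and created.
            return findAndCreateTableSource.get();
        }
        // Batch mode, or a schema that already carries time attributes:
        // the supplier is never invoked.
        return Optional.empty();
    }
}
```

The design choice is that in batch mode, or when the schema already defines computed columns or watermarks, no table source is instantiated at all.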

##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
##
@@ -153,6 +166,27 @@ private static RelDataType getRowType(RelDataTypeFactory typeFactory,
}
}
}
+
+   // The following block is a wor