This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 247ab12  [FLINK-16160][table] Fix proctime()/rowtime() doesn't work 
for TableEnvironment.connect().createTemporaryTable()
247ab12 is described below

commit 247ab12599e101af4404d87f7f5abb59262483cf
Author: Zhenghua Gao <[email protected]>
AuthorDate: Tue May 19 14:16:57 2020 +0800

    [FLINK-16160][table] Fix proctime()/rowtime() doesn't work for 
TableEnvironment.connect().createTemporaryTable()
    
    This closes #12218
---
 .../flink/table/catalog/ConnectorCatalogTable.java |  2 +-
 .../table/descriptors/ConnectTableDescriptor.java  |  7 ++-
 .../flink/table/sources/TableSourceValidation.java |  9 +++
 .../table/planner/catalog/CatalogSchemaTable.java  | 50 ++++++++++++++++-
 .../planner/plan/schema/CatalogSourceTable.scala   | 42 +++++++++-----
 .../org.apache.flink.table.factories.TableFactory  |  1 +
 .../planner/plan/stream/sql/TableSourceTest.xml    | 58 ++++++++++++++++++++
 .../planner/plan/stream/sql/TableSourceTest.scala  | 61 ++++++++++++++++++++-
 .../table/planner/utils/testTableSources.scala     | 64 +++++++++++++++++++++-
 9 files changed, 275 insertions(+), 19 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index 2113a66..f6100a3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -117,7 +117,7 @@ public class ConnectorCatalogTable<T1, T2> extends 
AbstractCatalogTable {
                return Optional.empty();
        }
 
-       private static <T1> TableSchema calculateSourceSchema(TableSource<T1> 
source, boolean isBatch) {
+       public static <T1> TableSchema calculateSourceSchema(TableSource<T1> 
source, boolean isBatch) {
                TableSchema tableSchema = source.getTableSchema();
                if (isBatch) {
                        return tableSchema;
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
index 4ec3f96..01431f3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
@@ -140,7 +140,12 @@ public abstract class ConnectTableDescriptor
                TableSchema tableSchema = getTableSchema(schemaProperties);
 
                Map<String, String> properties = new HashMap<>(toProperties());
-               schemaProperties.keySet().forEach(properties::remove);
+
+               // DescriptorProperties#putTableSchema and getTableSchema are 
not symmetrical
+               // We should retain time attribute properties when remove table 
schema properties
+               DescriptorProperties descriptorProperties = new 
DescriptorProperties();
+               descriptorProperties.putTableSchema(Schema.SCHEMA, tableSchema);
+               
descriptorProperties.asMap().keySet().forEach(properties::remove);
 
                CatalogTableImpl catalogTable = new CatalogTableImpl(
                        tableSchema,
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
index 6bc84d2..a471cf7 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
@@ -75,6 +75,15 @@ public class TableSourceValidation {
                return !getRowtimeAttributes(tableSource).isEmpty();
        }
 
+       /**
+        * Checks if the given {@link TableSource} defines a proctime attribute.
+        * @param tableSource The table source to check.
+        * @return true if the given table source defines proctime attribute.
+        */
+       public static boolean hasProctimeAttribute(TableSource<?> tableSource) {
+               return getProctimeAttribute(tableSource).isPresent();
+       }
+
        private static void 
validateSingleRowtimeAttribute(List<RowtimeAttributeDescriptor> 
rowtimeAttributes) {
                if (rowtimeAttributes.size() > 1) {
                        throw new ValidationException("Currently, only a single 
rowtime attribute is supported. " +
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
index 942a456..238e84b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
@@ -19,14 +19,22 @@
 package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 import org.apache.flink.table.planner.sources.TableSourceUtil;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceValidation;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.TimestampKind;
@@ -124,7 +132,7 @@ public class CatalogSchemaTable extends AbstractTable 
implements TemporalTable {
                return statistic;
        }
 
-       private static RelDataType getRowType(RelDataTypeFactory typeFactory,
+       private RelDataType getRowType(RelDataTypeFactory typeFactory,
                        CatalogBaseTable catalogBaseTable,
                        boolean isStreamingMode) {
                final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) 
typeFactory;
@@ -153,6 +161,24 @@ public class CatalogSchemaTable extends AbstractTable 
implements TemporalTable {
                                }
                        }
                }
+
+               // The following block is a workaround to support tables 
defined by TableEnvironment.connect() and
+               // the actual table sources implement 
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+               // It should be removed after we remove 
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+               Optional<TableSource<?>> sourceOpt = findAndCreateTableSource();
+               if (isStreamingMode
+                               && 
tableSchema.getTableColumns().stream().noneMatch(TableColumn::isGenerated)
+                               && tableSchema.getWatermarkSpecs().isEmpty()
+                               && sourceOpt.isPresent()) {
+                       TableSource<?> source = sourceOpt.get();
+                       if (TableSourceValidation.hasProctimeAttribute(source)
+                                       || 
TableSourceValidation.hasRowtimeAttribute(source)) {
+                               // If the table is defined by 
TableEnvironment.connect(), and use the legacy proctime and rowtime
+                               // descriptors, the TableSchema should fallback 
to ConnectorCatalogTable#calculateSourceSchema
+                               tableSchema = 
ConnectorCatalogTable.calculateSourceSchema(source, false);
+                       }
+               }
+
                return TableSourceUtil.getSourceRowType(flinkTypeFactory,
                        tableSchema,
                        scala.Option.empty(),
@@ -168,4 +194,26 @@ public class CatalogSchemaTable extends AbstractTable 
implements TemporalTable {
        public String getSysEndFieldName() {
                return "sys_end";
        }
+
+       private Optional<TableSource<?>> findAndCreateTableSource() {
+               Optional<TableSource<?>> tableSource = Optional.empty();
+               try {
+                       if (catalogBaseTable instanceof CatalogTableImpl) {
+                               TableSource<?> source = 
TableFactoryUtil.findAndCreateTableSource((CatalogTable) catalogBaseTable);
+                               if (source instanceof StreamTableSource) {
+                                       if (!isStreamingMode && 
!((StreamTableSource) source).isBounded()) {
+                                               throw new 
ValidationException("Cannot query on an unbounded source in batch mode, but " +
+                                                               
tableIdentifier.asSummaryString() + " is unbounded.");
+                                       }
+                                       tableSource = Optional.of(source);
+                               } else {
+                                       throw new ValidationException("Catalog 
tables only support " +
+                                                       "StreamTableSource and 
InputFormatTableSource.");
+                               }
+                       }
+               } catch (Exception e) {
+                       tableSource = Optional.empty();
+               }
+               return tableSource;
+       }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 05b2d81..ce23e8a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -72,7 +72,7 @@ class CatalogSourceTable[T](
     val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
 
     // erase time indicator types in the rowType
-    val erasedRowType = eraseTimeIndicator(rowType, typeFactory)
+    val erasedRowType = eraseTimeIndicator(rowType, typeFactory, tableSource)
 
     val tableSourceTable = new TableSourceTable[T](
       relOptSchema,
@@ -192,20 +192,34 @@ class CatalogSourceTable[T](
    */
   private def eraseTimeIndicator(
       relDataType: RelDataType,
-      factory: FlinkTypeFactory): RelDataType = {
-    val logicalRowType = FlinkTypeFactory.toLogicalRowType(relDataType)
-    val fieldNames = logicalRowType.getFieldNames
-    val fieldTypes = logicalRowType.getFields.map { f =>
-      if (FlinkTypeFactory.isTimeIndicatorType(f.getType)) {
-        val timeIndicatorType = f.getType.asInstanceOf[TimestampType]
-        new TimestampType(
-          timeIndicatorType.isNullable,
-          TimestampKind.REGULAR,
-          timeIndicatorType.getPrecision)
-      } else {
-        f.getType
+      factory: FlinkTypeFactory,
+      tableSource: TableSource[_]): RelDataType = {
+
+    val hasLegacyTimeAttributes =
+      TableSourceValidation.hasRowtimeAttribute(tableSource) ||
+        TableSourceValidation.hasProctimeAttribute(tableSource)
+
+    // If the table source is defined by TableEnvironment.connect() and the 
time attributes are
+    // defined by legacy proctime and rowtime descriptors, we should not erase 
time indicator types
+    if (columnExprs.isEmpty
+      && catalogTable.getSchema.getWatermarkSpecs.isEmpty
+      && hasLegacyTimeAttributes) {
+      relDataType
+    } else {
+      val logicalRowType = FlinkTypeFactory.toLogicalRowType(relDataType)
+      val fieldNames = logicalRowType.getFieldNames
+      val fieldTypes = logicalRowType.getFields.map { f =>
+        if (FlinkTypeFactory.isTimeIndicatorType(f.getType)) {
+          val timeIndicatorType = f.getType.asInstanceOf[TimestampType]
+          new TimestampType(
+            timeIndicatorType.isNullable,
+            TimestampKind.REGULAR,
+            timeIndicatorType.getPrecision)
+        } else {
+          f.getType
+        }
       }
+      factory.buildRelNodeRowType(fieldNames, fieldTypes)
     }
-    factory.buildRelNodeRowType(fieldNames, fieldTypes)
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 9b16bc6..5e7b23f 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -18,3 +18,4 @@ 
org.apache.flink.table.planner.runtime.batch.sql.TestPartitionableSinkFactory
 org.apache.flink.table.planner.utils.TestPartitionableSourceFactory
 org.apache.flink.table.planner.utils.TestFilterableTableSourceFactory
 org.apache.flink.table.planner.utils.TestProjectableTableSourceFactory
+org.apache.flink.table.planner.utils.TestTableSourceWithTimeFactory
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index 837fa08..b0a7f33 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -132,6 +132,64 @@ Calc(select=[name, w$end AS EXPR$1, EXPR$2])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testLegacyRowTimeTableGroupWindow">
+    <Resource name="sql">
+      <![CDATA[
+SELECT name,
+    TUMBLE_END(rowtime, INTERVAL '10' MINUTE),
+    AVG(val)
+FROM rowTimeT WHERE val > 100
+    GROUP BY name, TUMBLE(rowtime, INTERVAL '10' MINUTE)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)], EXPR$2=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$2=[AVG($2)])
+   +- LogicalProject(name=[$2], $f1=[TUMBLE($3, 600000:INTERVAL MINUTE)], 
val=[$1])
+      +- LogicalFilter(condition=[>($1, 100)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
rowTimeT]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[name, w$end AS EXPR$1, EXPR$2])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w$, 
rowtime, 600000)], properties=[w$start, w$end, w$rowtime, w$proctime], 
select=[name, AVG(val) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+   +- Exchange(distribution=[hash[name]])
+      +- Calc(select=[name, rowtime, val], where=[>(val, 100)])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
rowTimeT]], fields=[id, val, name, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLegacyProcTimeTableGroupWindow">
+    <Resource name="sql">
+      <![CDATA[
+SELECT name,
+    TUMBLE_END(proctime, INTERVAL '10' MINUTE),
+    AVG(val)
+FROM procTimeT WHERE val > 100
+    GROUP BY name, TUMBLE(proctime, INTERVAL '10' MINUTE)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)], EXPR$2=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$2=[AVG($2)])
+   +- LogicalProject(name=[$2], $f1=[TUMBLE($3, 600000:INTERVAL MINUTE)], 
val=[$1])
+      +- LogicalFilter(condition=[>($1, 100)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
procTimeT]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[name, w$end AS EXPR$1, EXPR$2])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w$, 
proctime, 600000)], properties=[w$start, w$end, w$proctime], select=[name, 
AVG(val) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS 
w$proctime])
+   +- Exchange(distribution=[hash[name]])
+      +- Calc(select=[name, proctime, val], where=[>(val, 100)])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
procTimeT]], fields=[id, val, name, proctime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testFilterFullyPushDown">
     <Resource name="sql">
       <![CDATA[SELECT * FROM FilterableTable WHERE amount > 2 AND amount < 
10]]>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
index 8a108c9..b3b5efd 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
@@ -21,14 +21,16 @@ package org.apache.flink.table.planner.plan.stream.sql
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, 
TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.{DataTypes, TableSchema, Types, 
ValidationException}
+import org.apache.flink.table.descriptors.{ConnectorDescriptor, Rowtime, 
Schema}
 import org.apache.flink.table.planner.expressions.utils.Func1
 import org.apache.flink.table.planner.utils.{DateTimeTestUtil, TableTestBase, 
TestFilterableTableSource, TestNestedProjectableTableSource, 
TestPartitionableSourceFactory, TestProjectableTableSource, TestTableSource, 
TestTableSourceWithTime}
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
-
 import org.junit.{Before, Test}
 
+import _root_.java.util.{Collections, Map => JMap}
+
 class TableSourceTest extends TableTestBase {
 
   private val util = streamTestUtil()
@@ -131,6 +133,63 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
+  def testLegacyRowTimeTableGroupWindow(): Unit = {
+    util.tableEnv.connect(
+      new ConnectorDescriptor("TestTableSourceWithTime", 1, false) {
+        override protected def toConnectorProperties: JMap[String, String] = {
+          Collections.emptyMap()
+        }
+      }
+    ).withSchema(
+      new Schema()
+        .field("id", DataTypes.INT())
+        .field("val", DataTypes.BIGINT())
+        .field("name", DataTypes.STRING())
+        .field("rowtime", DataTypes.TIMESTAMP(3))
+        .rowtime(new 
Rowtime().timestampsFromField("rowtime").watermarksPeriodicBounded(1000))
+    ).createTemporaryTable("rowTimeT")
+
+    val sql =
+      """
+        |SELECT name,
+        |    TUMBLE_END(rowtime, INTERVAL '10' MINUTE),
+        |    AVG(val)
+        |FROM rowTimeT WHERE val > 100
+        |    GROUP BY name, TUMBLE(rowtime, INTERVAL '10' MINUTE)
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
+  def testLegacyProcTimeTableGroupWindow(): Unit = {
+    util.tableEnv.connect(
+      new ConnectorDescriptor("TestTableSourceWithTime", 1, false) {
+        override protected def toConnectorProperties: JMap[String, String] = {
+          Collections.emptyMap()
+        }
+      }
+    ).withSchema(
+      new Schema()
+        .field("id", DataTypes.INT())
+        .field("val", DataTypes.BIGINT())
+        .field("name", DataTypes.STRING())
+        .field("proctime", DataTypes.TIMESTAMP(3)).proctime()
+    ).createTemporaryTable("procTimeT")
+
+    val sql =
+      """
+        |SELECT name,
+        |    TUMBLE_END(proctime, INTERVAL '10' MINUTE),
+        |    AVG(val)
+        |FROM procTimeT WHERE val > 100
+        |    GROUP BY name, TUMBLE(proctime, INTERVAL '10' MINUTE)
+      """.stripMargin
+
+    util.verifyPlan(sql)
+  }
+
+  @Test
   def testProcTimeTableSourceSimple(): Unit = {
     val tableSchema = new TableSchema(
       Array("id", "pTime", "val", "name"),
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
index 19870a8..0144117 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
@@ -30,7 +30,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{DataTypes, TableEnvironment, TableSchema, 
Types}
 import org.apache.flink.table.catalog.{CatalogPartitionImpl, 
CatalogPartitionSpec, CatalogTableImpl, ObjectPath}
 import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR, 
CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
+import org.apache.flink.table.descriptors.{DescriptorProperties, Schema, 
SchemaValidator}
 import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall
 import org.apache.flink.table.expressions.{CallExpression, Expression, 
FieldReferenceExpression, ValueLiteralExpression}
 import org.apache.flink.table.factories.{StreamTableSourceFactory, 
TableSourceFactory}
@@ -44,6 +44,7 @@ import 
org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
PreserveWatermarks}
 import org.apache.flink.table.types.DataType
 import 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.utils.EncodingUtils
 import org.apache.flink.types.Row
 
 import java.io.{File, FileOutputStream, OutputStreamWriter}
@@ -193,6 +194,67 @@ class TestTableSourceWithTime[T](
   }
 }
 
+class TestTableSourceWithTimeFactory[T] extends StreamTableSourceFactory[T] {
+  override def createStreamTableSource(properties: JMap[String, String]): 
StreamTableSource[T] = {
+    val dp = new DescriptorProperties()
+    dp.putProperties(properties)
+
+    val isBounded = dp.getOptionalBoolean("is-bounded").orElse(false)
+    val tableSchema = dp.getTableSchema(Schema.SCHEMA)
+    val serializedData = dp.getOptionalString("data").orElse(null)
+    val data = if (serializedData != null) {
+      EncodingUtils.decodeStringToObject(serializedData, classOf[List[T]])
+    } else {
+      Seq.empty[T]
+    }
+    val rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(dp)
+    val rowtime = if (rowtimeAttributes.isEmpty) {
+      null
+    } else {
+      rowtimeAttributes.head.getAttributeName
+    }
+    val proctimeAttribute = SchemaValidator.deriveProctimeAttribute(dp)
+    val proctime = if (proctimeAttribute.isPresent) {
+      proctimeAttribute.get()
+    } else {
+      null
+    }
+
+    val serializedMapKeys = dp.getOptionalString("map-keys").orElse(null)
+    val serializedMapVals = dp.getOptionalString("map-vals").orElse(null)
+    val mapping = if (serializedMapKeys != null && serializedMapVals != null) {
+      val mapKeys = EncodingUtils.decodeStringToObject(serializedMapKeys, 
classOf[List[String]])
+      val mapVals = EncodingUtils.decodeStringToObject(serializedMapVals, 
classOf[List[String]])
+      if (mapKeys.length != mapVals.length) {
+        null
+      } else {
+        mapKeys.zip(mapVals).toMap
+      }
+    } else {
+      null
+    }
+
+    val existingTs = dp.getOptionalString("existingTs").orElse(null)
+
+    val returnType = tableSchema.toRowType.asInstanceOf[TypeInformation[T]]
+
+    new TestTableSourceWithTime[T](
+      isBounded, tableSchema, returnType, data, rowtime, proctime, mapping, 
existingTs)
+  }
+
+  override def requiredContext(): JMap[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, "TestTableSourceWithTime")
+    context
+  }
+
+  override def supportedProperties(): JList[String] = {
+    val properties = new util.LinkedList[String]()
+    properties.add("*")
+    properties
+  }
+}
+
 class TestPreserveWMTableSource[T](
     tableSchema: TableSchema,
     returnType: TypeInformation[T],

Reply via email to