This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 247ab12 [FLINK-16160][table] Fix proctime()/rowtime() doesn't work
for TableEnvironment.connect().createTemporaryTable()
247ab12 is described below
commit 247ab12599e101af4404d87f7f5abb59262483cf
Author: Zhenghua Gao <[email protected]>
AuthorDate: Tue May 19 14:16:57 2020 +0800
[FLINK-16160][table] Fix proctime()/rowtime() doesn't work for
TableEnvironment.connect().createTemporaryTable()
This closes #12218
---
.../flink/table/catalog/ConnectorCatalogTable.java | 2 +-
.../table/descriptors/ConnectTableDescriptor.java | 7 ++-
.../flink/table/sources/TableSourceValidation.java | 9 +++
.../table/planner/catalog/CatalogSchemaTable.java | 50 ++++++++++++++++-
.../planner/plan/schema/CatalogSourceTable.scala | 42 +++++++++-----
.../org.apache.flink.table.factories.TableFactory | 1 +
.../planner/plan/stream/sql/TableSourceTest.xml | 58 ++++++++++++++++++++
.../planner/plan/stream/sql/TableSourceTest.scala | 61 ++++++++++++++++++++-
.../table/planner/utils/testTableSources.scala | 64 +++++++++++++++++++++-
9 files changed, 275 insertions(+), 19 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index 2113a66..f6100a3 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -117,7 +117,7 @@ public class ConnectorCatalogTable<T1, T2> extends
AbstractCatalogTable {
return Optional.empty();
}
- private static <T1> TableSchema calculateSourceSchema(TableSource<T1>
source, boolean isBatch) {
+ public static <T1> TableSchema calculateSourceSchema(TableSource<T1>
source, boolean isBatch) {
TableSchema tableSchema = source.getTableSchema();
if (isBatch) {
return tableSchema;
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
index 4ec3f96..01431f3 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
@@ -140,7 +140,12 @@ public abstract class ConnectTableDescriptor
TableSchema tableSchema = getTableSchema(schemaProperties);
Map<String, String> properties = new HashMap<>(toProperties());
- schemaProperties.keySet().forEach(properties::remove);
+
+ // DescriptorProperties#putTableSchema and getTableSchema are
not symmetrical
+ // We should retain time attribute properties when remove table
schema properties
+ DescriptorProperties descriptorProperties = new
DescriptorProperties();
+ descriptorProperties.putTableSchema(Schema.SCHEMA, tableSchema);
+
descriptorProperties.asMap().keySet().forEach(properties::remove);
CatalogTableImpl catalogTable = new CatalogTableImpl(
tableSchema,
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
index 6bc84d2..a471cf7 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
@@ -75,6 +75,15 @@ public class TableSourceValidation {
return !getRowtimeAttributes(tableSource).isEmpty();
}
+ /**
+ * Checks if the given {@link TableSource} defines a proctime attribute.
+ * @param tableSource The table source to check.
+ * @return true if the given table source defines proctime attribute.
+ */
+ public static boolean hasProctimeAttribute(TableSource<?> tableSource) {
+ return getProctimeAttribute(tableSource).isPresent();
+ }
+
private static void
validateSingleRowtimeAttribute(List<RowtimeAttributeDescriptor>
rowtimeAttributes) {
if (rowtimeAttributes.size() > 1) {
throw new ValidationException("Currently, only a single
rowtime attribute is supported. " +
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
index 942a456..238e84b 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/CatalogSchemaTable.java
@@ -19,14 +19,22 @@
package org.apache.flink.table.planner.catalog;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.TableFactory;
+import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
import org.apache.flink.table.planner.sources.TableSourceUtil;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampKind;
@@ -124,7 +132,7 @@ public class CatalogSchemaTable extends AbstractTable
implements TemporalTable {
return statistic;
}
- private static RelDataType getRowType(RelDataTypeFactory typeFactory,
+ private RelDataType getRowType(RelDataTypeFactory typeFactory,
CatalogBaseTable catalogBaseTable,
boolean isStreamingMode) {
final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory)
typeFactory;
@@ -153,6 +161,24 @@ public class CatalogSchemaTable extends AbstractTable
implements TemporalTable {
}
}
}
+
+ // The following block is a workaround to support tables
defined by TableEnvironment.connect() and
+ // the actual table sources implement
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+ // It should be removed after we remove
DefinedProctimeAttribute/DefinedRowtimeAttributes.
+ Optional<TableSource<?>> sourceOpt = findAndCreateTableSource();
+ if (isStreamingMode
+ &&
tableSchema.getTableColumns().stream().noneMatch(TableColumn::isGenerated)
+ && tableSchema.getWatermarkSpecs().isEmpty()
+ && sourceOpt.isPresent()) {
+ TableSource<?> source = sourceOpt.get();
+ if (TableSourceValidation.hasProctimeAttribute(source)
+ ||
TableSourceValidation.hasRowtimeAttribute(source)) {
+ // If the table is defined by
TableEnvironment.connect(), and use the legacy proctime and rowtime
+ // descriptors, the TableSchema should fallback
to ConnectorCatalogTable#calculateSourceSchema
+ tableSchema =
ConnectorCatalogTable.calculateSourceSchema(source, false);
+ }
+ }
+
return TableSourceUtil.getSourceRowType(flinkTypeFactory,
tableSchema,
scala.Option.empty(),
@@ -168,4 +194,26 @@ public class CatalogSchemaTable extends AbstractTable
implements TemporalTable {
public String getSysEndFieldName() {
return "sys_end";
}
+
+ private Optional<TableSource<?>> findAndCreateTableSource() {
+ Optional<TableSource<?>> tableSource = Optional.empty();
+ try {
+ if (catalogBaseTable instanceof CatalogTableImpl) {
+ TableSource<?> source =
TableFactoryUtil.findAndCreateTableSource((CatalogTable) catalogBaseTable);
+ if (source instanceof StreamTableSource) {
+ if (!isStreamingMode &&
!((StreamTableSource) source).isBounded()) {
+ throw new
ValidationException("Cannot query on an unbounded source in batch mode, but " +
+
tableIdentifier.asSummaryString() + " is unbounded.");
+ }
+ tableSource = Optional.of(source);
+ } else {
+ throw new ValidationException("Catalog
tables only support " +
+ "StreamTableSource and
InputFormatTableSource.");
+ }
+ }
+ } catch (Exception e) {
+ tableSource = Optional.empty();
+ }
+ return tableSource;
+ }
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 05b2d81..ce23e8a 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -72,7 +72,7 @@ class CatalogSourceTable[T](
val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
// erase time indicator types in the rowType
- val erasedRowType = eraseTimeIndicator(rowType, typeFactory)
+ val erasedRowType = eraseTimeIndicator(rowType, typeFactory, tableSource)
val tableSourceTable = new TableSourceTable[T](
relOptSchema,
@@ -192,20 +192,34 @@ class CatalogSourceTable[T](
*/
private def eraseTimeIndicator(
relDataType: RelDataType,
- factory: FlinkTypeFactory): RelDataType = {
- val logicalRowType = FlinkTypeFactory.toLogicalRowType(relDataType)
- val fieldNames = logicalRowType.getFieldNames
- val fieldTypes = logicalRowType.getFields.map { f =>
- if (FlinkTypeFactory.isTimeIndicatorType(f.getType)) {
- val timeIndicatorType = f.getType.asInstanceOf[TimestampType]
- new TimestampType(
- timeIndicatorType.isNullable,
- TimestampKind.REGULAR,
- timeIndicatorType.getPrecision)
- } else {
- f.getType
+ factory: FlinkTypeFactory,
+ tableSource: TableSource[_]): RelDataType = {
+
+ val hasLegacyTimeAttributes =
+ TableSourceValidation.hasRowtimeAttribute(tableSource) ||
+ TableSourceValidation.hasProctimeAttribute(tableSource)
+
+ // If the table source is defined by TableEnvironment.connect() and the
time attributes are
+ // defined by legacy proctime and rowtime descriptors, we should not erase
time indicator types
+ if (columnExprs.isEmpty
+ && catalogTable.getSchema.getWatermarkSpecs.isEmpty
+ && hasLegacyTimeAttributes) {
+ relDataType
+ } else {
+ val logicalRowType = FlinkTypeFactory.toLogicalRowType(relDataType)
+ val fieldNames = logicalRowType.getFieldNames
+ val fieldTypes = logicalRowType.getFields.map { f =>
+ if (FlinkTypeFactory.isTimeIndicatorType(f.getType)) {
+ val timeIndicatorType = f.getType.asInstanceOf[TimestampType]
+ new TimestampType(
+ timeIndicatorType.isNullable,
+ TimestampKind.REGULAR,
+ timeIndicatorType.getPrecision)
+ } else {
+ f.getType
+ }
}
+ factory.buildRelNodeRowType(fieldNames, fieldTypes)
}
- factory.buildRelNodeRowType(fieldNames, fieldTypes)
}
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index 9b16bc6..5e7b23f 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++
b/flink-table/flink-table-planner-blink/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -18,3 +18,4 @@
org.apache.flink.table.planner.runtime.batch.sql.TestPartitionableSinkFactory
org.apache.flink.table.planner.utils.TestPartitionableSourceFactory
org.apache.flink.table.planner.utils.TestFilterableTableSourceFactory
org.apache.flink.table.planner.utils.TestProjectableTableSourceFactory
+org.apache.flink.table.planner.utils.TestTableSourceWithTimeFactory
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index 837fa08..b0a7f33 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -132,6 +132,64 @@ Calc(select=[name, w$end AS EXPR$1, EXPR$2])
]]>
</Resource>
</TestCase>
+ <TestCase name="testLegacyRowTimeTableGroupWindow">
+ <Resource name="sql">
+ <![CDATA[
+SELECT name,
+ TUMBLE_END(rowtime, INTERVAL '10' MINUTE),
+ AVG(val)
+FROM rowTimeT WHERE val > 100
+ GROUP BY name, TUMBLE(rowtime, INTERVAL '10' MINUTE)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)], EXPR$2=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$2=[AVG($2)])
+ +- LogicalProject(name=[$2], $f1=[TUMBLE($3, 600000:INTERVAL MINUTE)],
val=[$1])
+ +- LogicalFilter(condition=[>($1, 100)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
rowTimeT]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[name, w$end AS EXPR$1, EXPR$2])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w$,
rowtime, 600000)], properties=[w$start, w$end, w$rowtime, w$proctime],
select=[name, AVG(val) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+ +- Exchange(distribution=[hash[name]])
+ +- Calc(select=[name, rowtime, val], where=[>(val, 100)])
+ +- TableSourceScan(table=[[default_catalog, default_database,
rowTimeT]], fields=[id, val, name, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLegacyProcTimeTableGroupWindow">
+ <Resource name="sql">
+ <![CDATA[
+SELECT name,
+ TUMBLE_END(proctime, INTERVAL '10' MINUTE),
+ AVG(val)
+FROM procTimeT WHERE val > 100
+ GROUP BY name, TUMBLE(proctime, INTERVAL '10' MINUTE)
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)], EXPR$2=[$2])
++- LogicalAggregate(group=[{0, 1}], EXPR$2=[AVG($2)])
+ +- LogicalProject(name=[$2], $f1=[TUMBLE($3, 600000:INTERVAL MINUTE)],
val=[$1])
+ +- LogicalFilter(condition=[>($1, 100)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
procTimeT]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[name, w$end AS EXPR$1, EXPR$2])
++- GroupWindowAggregate(groupBy=[name], window=[TumblingGroupWindow('w$,
proctime, 600000)], properties=[w$start, w$end, w$proctime], select=[name,
AVG(val) AS EXPR$2, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS
w$proctime])
+ +- Exchange(distribution=[hash[name]])
+ +- Calc(select=[name, proctime, val], where=[>(val, 100)])
+ +- TableSourceScan(table=[[default_catalog, default_database,
procTimeT]], fields=[id, val, name, proctime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testFilterFullyPushDown">
<Resource name="sql">
<![CDATA[SELECT * FROM FilterableTable WHERE amount > 2 AND amount <
10]]>
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
index 8a108c9..b3b5efd 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
@@ -21,14 +21,16 @@ package org.apache.flink.table.planner.plan.stream.sql
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo,
TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{DataTypes, TableSchema, Types,
ValidationException}
+import org.apache.flink.table.descriptors.{ConnectorDescriptor, Rowtime,
Schema}
import org.apache.flink.table.planner.expressions.utils.Func1
import org.apache.flink.table.planner.utils.{DateTimeTestUtil, TableTestBase,
TestFilterableTableSource, TestNestedProjectableTableSource,
TestPartitionableSourceFactory, TestProjectableTableSource, TestTableSource,
TestTableSourceWithTime}
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
-
import org.junit.{Before, Test}
+import _root_.java.util.{Collections, Map => JMap}
+
class TableSourceTest extends TableTestBase {
private val util = streamTestUtil()
@@ -131,6 +133,63 @@ class TableSourceTest extends TableTestBase {
}
@Test
+ def testLegacyRowTimeTableGroupWindow(): Unit = {
+ util.tableEnv.connect(
+ new ConnectorDescriptor("TestTableSourceWithTime", 1, false) {
+ override protected def toConnectorProperties: JMap[String, String] = {
+ Collections.emptyMap()
+ }
+ }
+ ).withSchema(
+ new Schema()
+ .field("id", DataTypes.INT())
+ .field("val", DataTypes.BIGINT())
+ .field("name", DataTypes.STRING())
+ .field("rowtime", DataTypes.TIMESTAMP(3))
+ .rowtime(new
Rowtime().timestampsFromField("rowtime").watermarksPeriodicBounded(1000))
+ ).createTemporaryTable("rowTimeT")
+
+ val sql =
+ """
+ |SELECT name,
+ | TUMBLE_END(rowtime, INTERVAL '10' MINUTE),
+ | AVG(val)
+ |FROM rowTimeT WHERE val > 100
+ | GROUP BY name, TUMBLE(rowtime, INTERVAL '10' MINUTE)
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
+ def testLegacyProcTimeTableGroupWindow(): Unit = {
+ util.tableEnv.connect(
+ new ConnectorDescriptor("TestTableSourceWithTime", 1, false) {
+ override protected def toConnectorProperties: JMap[String, String] = {
+ Collections.emptyMap()
+ }
+ }
+ ).withSchema(
+ new Schema()
+ .field("id", DataTypes.INT())
+ .field("val", DataTypes.BIGINT())
+ .field("name", DataTypes.STRING())
+ .field("proctime", DataTypes.TIMESTAMP(3)).proctime()
+ ).createTemporaryTable("procTimeT")
+
+ val sql =
+ """
+ |SELECT name,
+ | TUMBLE_END(proctime, INTERVAL '10' MINUTE),
+ | AVG(val)
+ |FROM procTimeT WHERE val > 100
+ | GROUP BY name, TUMBLE(proctime, INTERVAL '10' MINUTE)
+ """.stripMargin
+
+ util.verifyPlan(sql)
+ }
+
+ @Test
def testProcTimeTableSourceSimple(): Unit = {
val tableSchema = new TableSchema(
Array("id", "pTime", "val", "name"),
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
index 19870a8..0144117 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSources.scala
@@ -30,7 +30,7 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, TableEnvironment, TableSchema,
Types}
import org.apache.flink.table.catalog.{CatalogPartitionImpl,
CatalogPartitionSpec, CatalogTableImpl, ObjectPath}
import
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR,
CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
+import org.apache.flink.table.descriptors.{DescriptorProperties, Schema,
SchemaValidator}
import
org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall
import org.apache.flink.table.expressions.{CallExpression, Expression,
FieldReferenceExpression, ValueLiteralExpression}
import org.apache.flink.table.factories.{StreamTableSourceFactory,
TableSourceFactory}
@@ -44,6 +44,7 @@ import
org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps,
PreserveWatermarks}
import org.apache.flink.table.types.DataType
import
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.utils.EncodingUtils
import org.apache.flink.types.Row
import java.io.{File, FileOutputStream, OutputStreamWriter}
@@ -193,6 +194,67 @@ class TestTableSourceWithTime[T](
}
}
+class TestTableSourceWithTimeFactory[T] extends StreamTableSourceFactory[T] {
+ override def createStreamTableSource(properties: JMap[String, String]):
StreamTableSource[T] = {
+ val dp = new DescriptorProperties()
+ dp.putProperties(properties)
+
+ val isBounded = dp.getOptionalBoolean("is-bounded").orElse(false)
+ val tableSchema = dp.getTableSchema(Schema.SCHEMA)
+ val serializedData = dp.getOptionalString("data").orElse(null)
+ val data = if (serializedData != null) {
+ EncodingUtils.decodeStringToObject(serializedData, classOf[List[T]])
+ } else {
+ Seq.empty[T]
+ }
+ val rowtimeAttributes = SchemaValidator.deriveRowtimeAttributes(dp)
+ val rowtime = if (rowtimeAttributes.isEmpty) {
+ null
+ } else {
+ rowtimeAttributes.head.getAttributeName
+ }
+ val proctimeAttribute = SchemaValidator.deriveProctimeAttribute(dp)
+ val proctime = if (proctimeAttribute.isPresent) {
+ proctimeAttribute.get()
+ } else {
+ null
+ }
+
+ val serializedMapKeys = dp.getOptionalString("map-keys").orElse(null)
+ val serializedMapVals = dp.getOptionalString("map-vals").orElse(null)
+ val mapping = if (serializedMapKeys != null && serializedMapVals != null) {
+ val mapKeys = EncodingUtils.decodeStringToObject(serializedMapKeys,
classOf[List[String]])
+ val mapVals = EncodingUtils.decodeStringToObject(serializedMapVals,
classOf[List[String]])
+ if (mapKeys.length != mapVals.length) {
+ null
+ } else {
+ mapKeys.zip(mapVals).toMap
+ }
+ } else {
+ null
+ }
+
+ val existingTs = dp.getOptionalString("existingTs").orElse(null)
+
+ val returnType = tableSchema.toRowType.asInstanceOf[TypeInformation[T]]
+
+ new TestTableSourceWithTime[T](
+ isBounded, tableSchema, returnType, data, rowtime, proctime, mapping,
existingTs)
+ }
+
+ override def requiredContext(): JMap[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR_TYPE, "TestTableSourceWithTime")
+ context
+ }
+
+ override def supportedProperties(): JList[String] = {
+ val properties = new util.LinkedList[String]()
+ properties.add("*")
+ properties
+ }
+}
+
class TestPreserveWMTableSource[T](
tableSchema: TableSchema,
returnType: TypeInformation[T],