This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 04035ce [FLINK-11234] [table] Fix ExternalTableCatalogBuilder unable to build a batch-only table 04035ce is described below commit 04035ce8b7927136c7f1e43f0514a055ce45e56a Author: EronWright <eronwri...@gmail.com> AuthorDate: Sun Dec 30 19:59:47 2018 -0800 [FLINK-11234] [table] Fix ExternalTableCatalogBuilder unable to build a batch-only table - fix the logic in supportsBatch to properly declare a batch-only table - adjust CommonTestData to provide batch-only or streaming-only tables This closes #7386. --- .../apache/flink/table/catalog/ExternalCatalogTable.scala | 4 ++-- .../apache/flink/table/runtime/utils/CommonTestData.scala | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala index 45414ee..ce57070 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala @@ -270,8 +270,8 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc * Explicitly declares this external table for supporting only batch environments. */ def supportsBatch(): ExternalCatalogTableBuilder = { - isBatch = false - isStreaming = true + isBatch = true + isStreaming = false this } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala index 64fcc8a..1209595 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala @@ -85,7 +85,9 @@ object CommonTestData { .withSchema(schemaDesc1) if (isStreaming) { - externalTableBuilder1.inAppendMode() + externalTableBuilder1.supportsStreaming().inAppendMode() + } else { + externalTableBuilder1.supportsBatch() } val csvRecord2 = Seq( @@ -126,7 +128,9 @@ object CommonTestData { .withSchema(schemaDesc2) if (isStreaming) { - externalTableBuilder2.inAppendMode() + externalTableBuilder2.supportsStreaming().inAppendMode() + } else { + externalTableBuilder2.supportsBatch() } val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp") @@ -145,7 +149,9 @@ object CommonTestData { .withSchema(schemaDesc3) if (isStreaming) { - externalTableBuilder3.inAppendMode() + externalTableBuilder3.supportsStreaming().inAppendMode() + } else { + externalTableBuilder3.supportsBatch() } val catalog = new InMemoryExternalCatalog("test")