This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new ae83244  [FLINK-11234] [table] Fix ExternalTableCatalogBuilder unable to build a batch-only table
ae83244 is described below

commit ae832444320d546275ab1214df7255324d3b8fa9
Author: EronWright <eronwri...@gmail.com>
AuthorDate: Sun Dec 30 19:59:47 2018 -0800

    [FLINK-11234] [table] Fix ExternalTableCatalogBuilder unable to build a batch-only table
    
    - fix the logic in supportsBatch to properly declare a batch-only table
    - adjust CommonTestData to provide batch-only or streaming-only tables
    
    This closes #7386.
---
 .../apache/flink/table/catalog/ExternalCatalogTable.scala    |  4 ++--
 .../apache/flink/table/runtime/utils/CommonTestData.scala    | 12 +++++++++---
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 45414ee..ce57070 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -270,8 +270,8 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc
     * Explicitly declares this external table for supporting only batch environments.
     */
   def supportsBatch(): ExternalCatalogTableBuilder = {
-    isBatch = false
-    isStreaming = true
+    isBatch = true
+    isStreaming = false
     this
   }
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index 64fcc8a..1209595 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -85,7 +85,9 @@ object CommonTestData {
       .withSchema(schemaDesc1)
 
     if (isStreaming) {
-      externalTableBuilder1.inAppendMode()
+      externalTableBuilder1.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder1.supportsBatch()
     }
 
     val csvRecord2 = Seq(
@@ -126,7 +128,9 @@ object CommonTestData {
       .withSchema(schemaDesc2)
 
     if (isStreaming) {
-      externalTableBuilder2.inAppendMode()
+      externalTableBuilder2.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder2.supportsBatch()
     }
 
     val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
@@ -145,7 +149,9 @@ object CommonTestData {
       .withSchema(schemaDesc3)
 
     if (isStreaming) {
-      externalTableBuilder3.inAppendMode()
+      externalTableBuilder3.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder3.supportsBatch()
     }
 
     val catalog = new InMemoryExternalCatalog("test")

Reply via email to