This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 88458eb  [FLINK-13600][table] Rework TableEnvironment.connect() class 
hierarchy
88458eb is described below

commit 88458ebe41d170a43e5621f4af007dc8e25b7115
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue Aug 6 15:04:51 2019 +0200

    [FLINK-13600][table] Rework TableEnvironment.connect() class hierarchy
    
    This closes #9382.
---
 .../table/catalog/hive/HiveCatalogITCase.java      |   4 +-
 .../catalog/hive/HiveCatalogUseBlinkITCase.java    |   4 +-
 .../apache/flink/table/api/TableEnvironment.java   |   4 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   4 +-
 .../flink/table/catalog/CatalogTableBuilder.java   |  92 +++-----------
 .../table/descriptors/BatchTableDescriptor.java    |  16 ++-
 .../table/descriptors/ConnectTableDescriptor.java  |  78 ++++--------
 .../table/descriptors/StreamTableDescriptor.java   |  85 ++-----------
 .../flink/table/api/TableEnvironmentTest.java      |  74 +++++++++++
 .../table/utils/ConnectorDescriptorMock.java}      |  31 +++--
 .../apache/flink/table/utils/ExecutorMock.java}    |  26 ++--
 .../flink/table/utils/FormatDescriptorMock.java}   |  31 +++--
 .../org/apache/flink/table/utils/PlannerMock.java} |  37 ++++--
 .../flink/table/utils/TableEnvironmentMock.java    |  99 +++++++++++++++
 .../flink/table/utils/TableSourceFactoryMock.java  |  75 +++++++++++
 .../apache/flink/table/utils/TableSourceMock.java} |  34 +++--
 .../org.apache.flink.table.factories.TableFactory  |   1 +
 .../flink/table/catalog/ExternalCatalogTable.java  |   4 +-
 .../table/descriptors/RegistrableDescriptor.java   |  52 --------
 .../table/descriptors/SchematicDescriptor.java     |   9 +-
 .../table/descriptors/StreamableDescriptor.java    |  73 -----------
 .../flink/table/descriptors/TableDescriptor.java   | 139 ++++++++++++++++++++-
 .../table/descriptors/ConnectTableDescriptor.java  | 131 -------------------
 .../table/descriptors/StreamTableDescriptor.java   | 112 -----------------
 .../catalog/ExternalCatalogTableBuilder.scala      |  77 +-----------
 .../table/descriptors/TableDescriptorTest.scala    |   2 +-
 .../table/descriptors/TestTableDescriptor.scala    |  44 ++-----
 .../flink/table/utils/MockTableEnvironment.scala   |   6 +-
 28 files changed, 580 insertions(+), 764 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index df702ad..0898ac3 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -109,7 +109,7 @@ public class HiveCatalogITCase {
                                schema)
                        .withFormat(format)
                        .inAppendMode()
-                       .withComment(null)
+                       .withComment("Comment.")
                        .build();
 
                Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), 
"test.csv");
@@ -120,7 +120,7 @@ public class HiveCatalogITCase {
                                schema)
                                .withFormat(format)
                                .inAppendMode()
-                               .withComment(null)
+                               .withComment("Comment.")
                                .build();
 
                hiveCatalog.createTable(
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
index 6f669d0..8308efb 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
@@ -115,7 +115,7 @@ public class HiveCatalogUseBlinkITCase {
                                                schema)
                                                .withFormat(format)
                                                .inAppendMode()
-                                               .withComment(null)
+                                               .withComment("Comment.")
                                                .build();
 
                Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), 
"test.csv");
@@ -138,7 +138,7 @@ public class HiveCatalogUseBlinkITCase {
                                                sinkSchema)
                                                .withFormat(sinkFormat)
                                                .inAppendMode()
-                                               .withComment(null)
+                                               .withComment("Comment.")
                                                .build();
 
                hiveCatalog.createTable(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 8f8364f..eff0c2b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.ExternalCatalog;
+import org.apache.flink.table.descriptors.ConnectTableDescriptor;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.descriptors.TableDescriptor;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
@@ -254,7 +254,7 @@ public interface TableEnvironment {
         *
         * @param connectorDescriptor connector descriptor describing the 
external system
         */
-       TableDescriptor connect(ConnectorDescriptor connectorDescriptor);
+       ConnectTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
 
        /**
         * Gets the names of all catalogs registered in this environment.
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 0b5d5fe..2979296 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -45,9 +45,9 @@ import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.descriptors.ConnectTableDescriptor;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
-import org.apache.flink.table.descriptors.TableDescriptor;
 import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
@@ -234,7 +234,7 @@ public class TableEnvironmentImpl implements 
TableEnvironment {
        }
 
        @Override
-       public TableDescriptor connect(ConnectorDescriptor connectorDescriptor) 
{
+       public ConnectTableDescriptor connect(ConnectorDescriptor 
connectorDescriptor) {
                return new StreamTableDescriptor(this, connectorDescriptor);
        }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
index 01f46bd..1760e19 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableBuilder.java
@@ -22,24 +22,13 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.descriptors.ConnectorFormatDescriptor;
 import org.apache.flink.table.descriptors.Descriptor;
 import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.FormatDescriptor;
-import org.apache.flink.table.descriptors.Metadata;
-import org.apache.flink.table.descriptors.Statistics;
-import org.apache.flink.table.descriptors.StreamableDescriptor;
 import org.apache.flink.table.descriptors.TableDescriptor;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.Optional;
-
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT;
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A builder for creating a {@link CatalogTable}.
@@ -58,8 +47,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *       new ExternalSystemXYZ()
  *         .version("0.11"),
  *       new TableSchema.Builder()
- *                .fields(names, dataTypes)
- *                .build())
+ *         .fields(names, dataTypes)
+ *         .build())
  *   .withFormat(
  *     new Json()
  *       .jsonSchema("{...}")
@@ -69,73 +58,36 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * </code>
  */
 @PublicEvolving
-public class CatalogTableBuilder
-               extends TableDescriptor
-               implements ConnectorFormatDescriptor<CatalogTableBuilder>, 
StreamableDescriptor<CatalogTableBuilder> {
+public final class CatalogTableBuilder extends 
TableDescriptor<CatalogTableBuilder> {
 
-       private final ConnectorDescriptor connectorDescriptor;
        private final TableSchema tableSchema;
-       private final boolean isGeneric;
 
        private String comment;
 
-       private Optional<FormatDescriptor> formatDescriptor = Optional.empty();
-       private Optional<Statistics> statisticsDescriptor = Optional.empty();
-       private Optional<Metadata> metadataDescriptor = Optional.empty();
-       private Optional<String> updateMode = Optional.empty();
+       private final boolean isGeneric;
+
        private Map<String, String> properties = Collections.emptyMap();
 
-       /**
-        *
-        * @param connectorDescriptor descriptor of the connector
-        * @param tableSchema schema of the table
-        */
        public CatalogTableBuilder(ConnectorDescriptor connectorDescriptor, 
TableSchema tableSchema) {
-               this.connectorDescriptor = checkNotNull(connectorDescriptor);
-               this.tableSchema = checkNotNull(tableSchema);
+               super(connectorDescriptor);
+               this.tableSchema = Preconditions.checkNotNull(tableSchema);
 
-               // We don't support non generic table currently
+               // We don't support non-generic table currently
                this.isGeneric = true;
        }
 
-       @Override
-       public CatalogTableBuilder withFormat(FormatDescriptor format) {
-               this.formatDescriptor = Optional.of(checkNotNull(format));
-               return this;
-       }
-
-       @Override
-       public CatalogTableBuilder inAppendMode() {
-               updateMode = Optional.of(UPDATE_MODE_VALUE_APPEND);
-               return this;
-       }
-
-       @Override
-       public CatalogTableBuilder inRetractMode() {
-               updateMode = Optional.of(UPDATE_MODE_VALUE_RETRACT);
-               return this;
-       }
-
-       @Override
-       public CatalogTableBuilder inUpsertMode() {
-               updateMode = Optional.of(UPDATE_MODE_VALUE_UPSERT);
-               return this;
-       }
-
        public CatalogTableBuilder withComment(String comment) {
-               this.comment = comment;
+               this.comment = Preconditions.checkNotNull(comment, "Comment 
must not be null.");
                return this;
        }
 
        public CatalogTableBuilder withProperties(Map<String, String> 
properties) {
-               this.properties = checkNotNull(properties);
+               this.properties = Preconditions.checkNotNull(properties, 
"Properties must not be null.");
                return this;
        }
 
        /**
-        * Build a {@link CatalogTable}.
-        *
-        * @return catalog table
+        * Builds a {@link CatalogTable}.
         */
        public CatalogTable build() {
                return new CatalogTableImpl(
@@ -145,28 +97,12 @@ public class CatalogTableBuilder
        }
 
        @Override
-       public Map<String, String> toProperties() {
+       protected Map<String, String> additionalProperties() {
                DescriptorProperties descriptorProperties = new 
DescriptorProperties();
-               
descriptorProperties.putProperties(connectorDescriptor.toProperties());
-
-               if (formatDescriptor.isPresent()) {
-                       
descriptorProperties.putProperties(formatDescriptor.get().toProperties());
-               }
-
-               if (statisticsDescriptor.isPresent()) {
-                       
descriptorProperties.putProperties(statisticsDescriptor.get().toProperties());
-               }
-
-               if (metadataDescriptor.isPresent()) {
-                       
descriptorProperties.putProperties(metadataDescriptor.get().toProperties());
-               }
 
-               if (updateMode.isPresent()) {
-                       descriptorProperties.putString(UPDATE_MODE, 
updateMode.get());
-               }
+               descriptorProperties.putBoolean(CatalogConfig.IS_GENERIC, 
isGeneric);
 
                descriptorProperties.putProperties(this.properties);
-               descriptorProperties.putString(CatalogConfig.IS_GENERIC, 
String.valueOf(isGeneric));
 
                return descriptorProperties.asMap();
        }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
index cd0b0e2..cd01801 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
@@ -22,14 +22,20 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.TableEnvironment;
 
 /**
- * Descriptor for specifying a table source and/or sink in a batch environment.
+ * Describes a table connected from a batch environment.
+ *
+ * <p>This class just exists for backwards compatibility. Use {@link 
ConnectTableDescriptor} for
+ * declarations.
  */
 @PublicEvolving
-public class BatchTableDescriptor extends 
ConnectTableDescriptor<BatchTableDescriptor> {
+public final class BatchTableDescriptor extends ConnectTableDescriptor {
 
-       public BatchTableDescriptor(
-               TableEnvironment tableEnv,
-               ConnectorDescriptor connectorDescriptor) {
+       public BatchTableDescriptor(TableEnvironment tableEnv, 
ConnectorDescriptor connectorDescriptor) {
                super(tableEnv, connectorDescriptor);
        }
+
+       @Override
+       public BatchTableDescriptor withSchema(Schema schema) {
+               return (BatchTableDescriptor) super.withSchema(schema);
+       }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
index a046a06..ffc56ab 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
@@ -20,32 +20,42 @@ package org.apache.flink.table.descriptors;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+import java.util.Collections;
 import java.util.Map;
-import java.util.Optional;
 
 /**
- * Common class for table's created with {@link 
TableEnvironment#connect(ConnectorDescriptor connectorDescriptor)}.
+ * Describes a table connected from {@link 
TableEnvironment#connect(ConnectorDescriptor)}.
+ *
+ * <p>It can access {@link TableEnvironment} for fluently registering the 
table.
  */
 @PublicEvolving
-public abstract class ConnectTableDescriptor<D extends 
ConnectTableDescriptor<D>>
-       extends TableDescriptor
-       implements SchematicDescriptor<D>, RegistrableDescriptor {
-
-       private Optional<FormatDescriptor> formatDescriptor = Optional.empty();
-       private Optional<Schema> schemaDescriptor = Optional.empty();
+public abstract class ConnectTableDescriptor
+       extends TableDescriptor<ConnectTableDescriptor>
+       implements SchematicDescriptor<ConnectTableDescriptor> {
 
        private final TableEnvironment tableEnv;
-       private final ConnectorDescriptor connectorDescriptor;
+
+       private @Nullable Schema schemaDescriptor;
 
        public ConnectTableDescriptor(TableEnvironment tableEnv, 
ConnectorDescriptor connectorDescriptor) {
+               super(connectorDescriptor);
                this.tableEnv = tableEnv;
-               this.connectorDescriptor = connectorDescriptor;
+       }
+
+       /**
+        * Specifies the resulting table schema.
+        */
+       @Override
+       public ConnectTableDescriptor withSchema(Schema schema) {
+               schemaDescriptor = Preconditions.checkNotNull(schema, "Schema 
must not be null.");
+               return this;
        }
 
        /**
@@ -54,7 +64,6 @@ public abstract class ConnectTableDescriptor<D extends 
ConnectTableDescriptor<D>
         *
         * @param name table name to be registered in the table environment
         */
-       @Override
        public void registerTableSource(String name) {
                Preconditions.checkNotNull(name);
                TableSource<?> tableSource = 
TableFactoryUtil.findAndCreateTableSource(this);
@@ -67,7 +76,6 @@ public abstract class ConnectTableDescriptor<D extends 
ConnectTableDescriptor<D>
         *
         * @param name table name to be registered in the table environment
         */
-       @Override
        public void registerTableSink(String name) {
                Preconditions.checkNotNull(name);
                TableSink<?> tableSink = 
TableFactoryUtil.findAndCreateTableSink(this);
@@ -80,52 +88,16 @@ public abstract class ConnectTableDescriptor<D extends 
ConnectTableDescriptor<D>
         *
         * @param name table name to be registered in the table environment
         */
-       @Override
        public void registerTableSourceAndSink(String name) {
                registerTableSource(name);
                registerTableSink(name);
        }
 
-       /**
-        * Specifies the format that defines how to read data from a connector.
-        */
-       @Override
-       public D withFormat(FormatDescriptor format) {
-               formatDescriptor = 
Optional.of(Preconditions.checkNotNull(format));
-               return (D) this;
-       }
-
-       /**
-        * Specifies the resulting table schema.
-        */
        @Override
-       public D withSchema(Schema schema) {
-               schemaDescriptor = 
Optional.of(Preconditions.checkNotNull(schema));
-               return (D) this;
-       }
-
-       /**
-        * Converts this descriptor into a set of properties.
-        */
-       @Override
-       public Map<String, String> toProperties() {
-               DescriptorProperties properties = new DescriptorProperties();
-
-               // this performs only basic validation
-               // more validation can only happen within a factory
-               if (connectorDescriptor.isFormatNeeded() && 
!formatDescriptor.isPresent()) {
-                       throw new ValidationException(String.format("The 
connector %s requires a format description.", connectorDescriptor.toString()));
-               } else if (!connectorDescriptor.isFormatNeeded() && 
formatDescriptor.isPresent()) {
-                       throw new ValidationException(
-                               String.format("The connector %s does not 
require a format description " +
-                               "but %s found.", 
connectorDescriptor.toString(), formatDescriptor.get().toString()));
+       protected Map<String, String> additionalProperties() {
+               if (schemaDescriptor != null) {
+                       return schemaDescriptor.toProperties();
                }
-
-               properties.putProperties(connectorDescriptor.toProperties());
-
-               formatDescriptor.ifPresent(s -> 
properties.putProperties(s.toProperties()));
-               schemaDescriptor.ifPresent(s -> 
properties.putProperties(s.toProperties()));
-
-               return properties.asMap();
+               return Collections.emptyMap();
        }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java
index 456bbff..8e67959 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java
@@ -21,92 +21,21 @@ package org.apache.flink.table.descriptors;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.TableEnvironment;
 
-import java.util.Map;
-import java.util.Optional;
-
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT;
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT;
-
 /**
- * Descriptor for specifying a table source and/or sink in a streaming 
environment.
+ * Describes a table connected from a streaming environment.
+ *
+ * <p>This class just exists for backwards compatibility. Use {@link 
ConnectTableDescriptor} for
+ * declarations.
  */
 @PublicEvolving
-public class StreamTableDescriptor
-       extends ConnectTableDescriptor<StreamTableDescriptor>
-       implements StreamableDescriptor<StreamTableDescriptor> {
-
-       private Optional<String> updateMode = Optional.empty();
+public final class StreamTableDescriptor extends ConnectTableDescriptor {
 
        public StreamTableDescriptor(TableEnvironment tableEnv, 
ConnectorDescriptor connectorDescriptor) {
                super(tableEnv, connectorDescriptor);
        }
 
-       /**
-        * Declares how to perform the conversion between a dynamic table and 
an external connector.
-        *
-        * <p>In append mode, a dynamic table and an external connector only 
exchange INSERT messages.
-        *
-        * @see #inRetractMode()
-        * @see #inUpsertMode()
-        */
        @Override
-       public StreamTableDescriptor inAppendMode() {
-               updateMode = Optional.of(UPDATE_MODE_VALUE_APPEND);
-               return this;
-       }
-
-       /**
-        * Declares how to perform the conversion between a dynamic table and 
an external connector.
-        *
-        * <p>In retract mode, a dynamic table and an external connector 
exchange ADD and RETRACT messages.
-        *
-        * <p>An INSERT change is encoded as an ADD message, a DELETE change as 
a RETRACT message, and an
-        * UPDATE change as a RETRACT message for the updated (previous) row 
and an ADD message for
-        * the updating (new) row.
-        *
-        * <p>In this mode, a key must not be defined as opposed to upsert 
mode. However, every update
-        * consists of two messages which is less efficient.
-        *
-        * @see #inAppendMode()
-        * @see #inUpsertMode()
-        */
-       @Override
-       public StreamTableDescriptor inRetractMode() {
-               updateMode = Optional.of(UPDATE_MODE_VALUE_RETRACT);
-               return this;
-       }
-
-       /**
-        * Declares how to perform the conversion between a dynamic table and 
an external connector.
-        *
-        * <p>In upsert mode, a dynamic table and an external connector 
exchange UPSERT and DELETE messages.
-        *
-        * <p>This mode requires a (possibly composite) unique key by which 
updates can be propagated. The
-        * external connector needs to be aware of the unique key attribute in 
order to apply messages
-        * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. 
DELETE changes as
-        * DELETE messages.
-        *
-        * <p>The main difference to a retract stream is that UPDATE changes 
are encoded with a single
-        * message and are therefore more efficient.
-        *
-        * @see #inAppendMode()
-        * @see #inRetractMode()
-        */
-       @Override
-       public StreamTableDescriptor inUpsertMode() {
-               updateMode = Optional.of(UPDATE_MODE_VALUE_UPSERT);
-               return this;
-       }
-
-       @Override
-       public Map<String, String> toProperties() {
-               DescriptorProperties properties = new DescriptorProperties();
-
-               properties.putProperties(super.toProperties());
-               updateMode.ifPresent(mode -> properties.putString(UPDATE_MODE, 
mode));
-
-               return properties.asMap();
+       public StreamTableDescriptor withSchema(Schema schema) {
+               return (StreamTableDescriptor) super.withSchema(schema);
        }
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
new file mode 100644
index 0000000..2c98c21
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ConnectorCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.utils.ConnectorDescriptorMock;
+import org.apache.flink.table.utils.FormatDescriptorMock;
+import org.apache.flink.table.utils.TableEnvironmentMock;
+import org.apache.flink.table.utils.TableSourceFactoryMock;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TableEnvironment}.
+ */
+public class TableEnvironmentTest {
+
+       @Test
+       public void testConnect() throws Exception {
+               final TableEnvironmentMock tableEnv = 
TableEnvironmentMock.getStreamingInstance();
+
+               tableEnv
+                       .connect(new 
ConnectorDescriptorMock(TableSourceFactoryMock.CONNECTOR_TYPE_VALUE, 1, true))
+                       .withFormat(new FormatDescriptorMock("my_format", 1))
+                       .withSchema(new Schema()
+                               .field("my_field_0", "INT")
+                               .field("my_field_1", "BOOLEAN"))
+                       .inAppendMode()
+                       .registerTableSource("my_table");
+
+               final Catalog catalog = 
tableEnv.getCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG)
+                       .orElseThrow(AssertionError::new);
+
+               final CatalogBaseTable table = catalog
+                       .getTable(new 
ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "my_table"));
+
+               assertThat(
+                       table.getSchema(),
+                       equalTo(
+                               TableSchema.builder()
+                                       .field("my_field_0", DataTypes.INT())
+                                       .field("my_field_1", 
DataTypes.BOOLEAN())
+                                       .build()));
+
+               final ConnectorCatalogTable<?, ?> connectorCatalogTable = 
(ConnectorCatalogTable<?, ?>) table;
+
+               assertThat(
+                       connectorCatalogTable.getTableSource().isPresent(),
+                       equalTo(true));
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ConnectorDescriptorMock.java
similarity index 54%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
copy to 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ConnectorDescriptorMock.java
index cd0b0e2..1eecccc 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ConnectorDescriptorMock.java
@@ -16,20 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors;
+package org.apache.flink.table.utils;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.descriptors.ConnectorDescriptor;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
- * Descriptor for specifying a table source and/or sink in a batch environment.
+ * Mocking {@link ConnectorDescriptor} for tests.
  */
-@PublicEvolving
-public class BatchTableDescriptor extends 
ConnectTableDescriptor<BatchTableDescriptor> {
+public class ConnectorDescriptorMock extends ConnectorDescriptor {
+
+       private Map<String, String> connectorProperties = new HashMap<>();
+
+       public ConnectorDescriptorMock(String type, int version, boolean 
formatNeeded) {
+               super(type, version, formatNeeded);
+       }
+
+       public ConnectorDescriptorMock property(String key, String value) {
+               connectorProperties.put(key, value);
+               return this;
+       }
 
-       public BatchTableDescriptor(
-               TableEnvironment tableEnv,
-               ConnectorDescriptor connectorDescriptor) {
-               super(tableEnv, connectorDescriptor);
+       @Override
+       protected Map<String, String> toConnectorProperties() {
+               return connectorProperties;
        }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExecutorMock.java
similarity index 62%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
rename to 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExecutorMock.java
index cd0b0e2..bcd7b7a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/ExecutorMock.java
@@ -16,20 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors;
+package org.apache.flink.table.utils;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.delegation.Executor;
+
+import java.util.List;
 
 /**
- * Descriptor for specifying a table source and/or sink in a batch environment.
+ * Mocking {@link Executor} for tests.
  */
-@PublicEvolving
-public class BatchTableDescriptor extends 
ConnectTableDescriptor<BatchTableDescriptor> {
+public class ExecutorMock implements Executor {
+
+       @Override
+       public void apply(List<Transformation<?>> transformations) {
+               // nothing to do
+       }
 
-       public BatchTableDescriptor(
-               TableEnvironment tableEnv,
-               ConnectorDescriptor connectorDescriptor) {
-               super(tableEnv, connectorDescriptor);
+       @Override
+       public JobExecutionResult execute(String jobName) throws Exception {
+               return null;
        }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FormatDescriptorMock.java
similarity index 56%
rename from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
rename to 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FormatDescriptorMock.java
index 7397d39..44d455b 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorFormatDescriptor.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/FormatDescriptorMock.java
@@ -16,18 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors;
+package org.apache.flink.table.utils;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
- * An interface for descriptors that allow to define a format.
+ * Mocking {@link FormatDescriptor} for tests.
  */
-@PublicEvolving
-public interface ConnectorFormatDescriptor<D extends 
ConnectorFormatDescriptor<D>> extends Descriptor {
+public class FormatDescriptorMock extends FormatDescriptor {
+
+       private Map<String, String> formatProperties = new HashMap<>();
+
+       public FormatDescriptorMock(String type, int version) {
+               super(type, version);
+       }
+
+       public FormatDescriptorMock property(String key, String value) {
+               formatProperties.put(key, value);
+               return this;
+       }
 
-       /**
-        * Specifies the format that defines how to read data from a connector.
-        */
-       D withFormat(FormatDescriptor format);
+       @Override
+       protected Map<String, String> toFormatProperties() {
+               return formatProperties;
+       }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
similarity index 52%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
copy to 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
index cd0b0e2..8dee10c 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
@@ -16,20 +16,37 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors;
+package org.apache.flink.table.utils;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.List;
 
 /**
- * Descriptor for specifying a table source and/or sink in a batch environment.
+ * Mocking {@link Planner} for tests.
  */
-@PublicEvolving
-public class BatchTableDescriptor extends 
ConnectTableDescriptor<BatchTableDescriptor> {
+public class PlannerMock implements Planner {
+
+       @Override
+       public List<Operation> parse(String statement) {
+               return null;
+       }
+
+       @Override
+       public List<Transformation<?>> translate(List<ModifyOperation> 
modifyOperations) {
+               return null;
+       }
+
+       @Override
+       public String explain(List<Operation> operations, boolean extended) {
+               return null;
+       }
 
-       public BatchTableDescriptor(
-               TableEnvironment tableEnv,
-               ConnectorDescriptor connectorDescriptor) {
-               super(tableEnv, connectorDescriptor);
+       @Override
+       public String[] getCompletionHints(String statement, int position) {
+               return new String[0];
        }
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
new file mode 100644
index 0000000..dc8b992
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+
+/**
+ * Mocking {@link TableEnvironment} for tests.
+ */
+public class TableEnvironmentMock extends TableEnvironmentImpl {
+
+       public final CatalogManager catalogManager;
+
+       public final ExecutorMock executor;
+
+       public final FunctionCatalog functionCatalog;
+
+       public final PlannerMock planner;
+
+       protected TableEnvironmentMock(
+                       CatalogManager catalogManager,
+                       TableConfig tableConfig,
+                       ExecutorMock executor,
+                       FunctionCatalog functionCatalog,
+                       PlannerMock planner,
+                       boolean isStreamingMode) {
+               super(catalogManager, tableConfig, executor, functionCatalog, 
planner, isStreamingMode);
+
+               this.catalogManager = catalogManager;
+               this.executor = executor;
+               this.functionCatalog = functionCatalog;
+               this.planner = planner;
+       }
+
+       public static TableEnvironmentMock getStreamingInstance() {
+               return getInstance(true);
+       }
+
+       public static TableEnvironmentMock getBatchInstance() {
+               return getInstance(false);
+       }
+
+       private static TableEnvironmentMock getInstance(boolean 
isStreamingMode) {
+               final CatalogManager catalogManager = createCatalogManager();
+               return new TableEnvironmentMock(
+                       catalogManager,
+                       createTableConfig(),
+                       createExecutor(),
+                       createFunctionCatalog(catalogManager),
+                       createPlanner(),
+                       isStreamingMode);
+       }
+
+       private static CatalogManager createCatalogManager() {
+               return new CatalogManager(
+                       EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
+                       new GenericInMemoryCatalog(
+                               EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
+                               EnvironmentSettings.DEFAULT_BUILTIN_DATABASE));
+       }
+
+       private static TableConfig createTableConfig() {
+               return TableConfig.getDefault();
+       }
+
+       private static ExecutorMock createExecutor() {
+               return new ExecutorMock();
+       }
+
+       private static FunctionCatalog createFunctionCatalog(CatalogManager 
catalogManager) {
+               return new FunctionCatalog(catalogManager);
+       }
+
+       private static PlannerMock createPlanner() {
+               return new PlannerMock();
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java
new file mode 100644
index 0000000..bf93c33
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.StreamTableDescriptorValidator;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Mocking {@link TableSourceFactory} for tests.
+ */
+public class TableSourceFactoryMock implements TableSourceFactory<Row> {
+
+       public static final String CONNECTOR_TYPE_VALUE = 
"table-source-factory-mock";
+
+       @Override
+       public TableSource<Row> createTableSource(Map<String, String> 
properties) {
+               final DescriptorProperties descriptorProperties = new 
DescriptorProperties();
+               descriptorProperties.putProperties(properties);
+               final TableSchema schema = 
descriptorProperties.getTableSchema(Schema.SCHEMA);
+               return new TableSourceMock(schema.toRowDataType(), schema);
+       }
+
+       @Override
+       public TableSource<Row> createTableSource(ObjectPath tablePath, 
CatalogTable table) {
+               return new TableSourceMock(table.getSchema().toRowDataType(), 
table.getSchema());
+       }
+
+       @Override
+       public Map<String, String> requiredContext() {
+               final Map<String, String> context = new HashMap<>();
+               context.put(ConnectorDescriptorValidator.CONNECTOR_TYPE, 
CONNECTOR_TYPE_VALUE);
+               return context;
+       }
+
+       @Override
+       public List<String> supportedProperties() {
+               final List<String> supportedProperties = new ArrayList<>();
+               
supportedProperties.add(StreamTableDescriptorValidator.UPDATE_MODE);
+               
supportedProperties.add(ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION);
+               supportedProperties.add(FormatDescriptorValidator.FORMAT + 
".*");
+               supportedProperties.add(Schema.SCHEMA + ".#." + 
Schema.SCHEMA_NAME);
+               supportedProperties.add(Schema.SCHEMA + ".#." + 
Schema.SCHEMA_TYPE);
+               return supportedProperties;
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceMock.java
similarity index 53%
copy from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
copy to 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceMock.java
index cd0b0e2..25b2d36 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceMock.java
@@ -16,20 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors;
+package org.apache.flink.table.utils;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
 
 /**
- * Descriptor for specifying a table source and/or sink in a batch environment.
+ * Mocking {@link TableSource} for tests.
  */
-@PublicEvolving
-public class BatchTableDescriptor extends 
ConnectTableDescriptor<BatchTableDescriptor> {
+public class TableSourceMock implements TableSource<Row> {
 
-       public BatchTableDescriptor(
-               TableEnvironment tableEnv,
-               ConnectorDescriptor connectorDescriptor) {
-               super(tableEnv, connectorDescriptor);
+       private final DataType producedDataType;
+
+       private final TableSchema tableSchema;
+
+       public TableSourceMock(DataType producedDataType, TableSchema 
tableSchema) {
+               this.producedDataType = producedDataType;
+               this.tableSchema = tableSchema;
+       }
+
+       @Override
+       public DataType getProducedDataType() {
+               return producedDataType;
+       }
+
+       @Override
+       public TableSchema getTableSchema() {
+               return tableSchema;
        }
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
 
b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index e2a6a48..1e92d60 100644
--- 
a/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ 
b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -15,3 +15,4 @@
 
 org.apache.flink.table.factories.utils.TestPlannerFactory
 org.apache.flink.table.factories.utils.OtherTestPlannerFactory
+org.apache.flink.table.utils.TableSourceFactoryMock
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ExternalCatalogTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ExternalCatalogTable.java
index be24af4..41f8e3f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ExternalCatalogTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ExternalCatalogTable.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.table.descriptors.Descriptor;
 import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.TableDescriptor;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.table.plan.stats.ColumnStats;
 import org.apache.flink.table.plan.stats.TableStats;
@@ -42,7 +42,7 @@ import static 
org.apache.flink.table.descriptors.StatisticsValidator.readColumnS
  * @deprecated use {@link CatalogTable} instead.
  */
 @Deprecated
-public class ExternalCatalogTable extends TableDescriptor {
+public class ExternalCatalogTable implements Descriptor {
 
        /**
         * Flag whether this external table is intended for batch environments.
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/RegistrableDescriptor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/RegistrableDescriptor.java
deleted file mode 100644
index f0b8242..0000000
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/RegistrableDescriptor.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * An interface for descriptors that allow to register table source and/or 
sinks.
- */
-@PublicEvolving
-public interface RegistrableDescriptor extends Descriptor {
-
-       /**
-        * Searches for the specified table source, configures it accordingly, 
and registers it as
-        * a table under the given name.
-        *
-        * @param name table name to be registered in the table environment
-        */
-       void registerTableSource(String name);
-
-       /**
-        * Searches for the specified table sink, configures it accordingly, 
and registers it as
-        * a table under the given name.
-        *
-        * @param name table name to be registered in the table environment
-        */
-       void registerTableSink(String name);
-
-       /**
-        * Searches for the specified table source and sink, configures them 
accordingly, and registers
-        * them as a table under the given name.
-        *
-        * @param name table name to be registered in the table environment
-        */
-       void registerTableSourceAndSink(String name);
-}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
index 121e49c..6ae395c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/SchematicDescriptor.java
@@ -19,19 +19,14 @@
 package org.apache.flink.table.descriptors;
 
 /**
- * An interface for descriptors that allow to define a format and schema.
+ * An interface for descriptors that allow to define a schema.
  *
- * @deprecated use {@link ConnectorFormatDescriptor}.
+ * @deprecated This class can be dropped once we remove external catalog 
support.
  */
 @Deprecated
 public interface SchematicDescriptor<D extends SchematicDescriptor<D>> extends 
Descriptor {
 
        /**
-        * Specifies the format that defines how to read data from a connector.
-        */
-       D withFormat(FormatDescriptor format);
-
-       /**
         * Specifies the resulting table schema.
         */
        D withSchema(Schema schema);
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java
deleted file mode 100644
index 0430e8c..0000000
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * A trait for descriptors that allow to convert between a dynamic table and 
an external connector.
- */
-@PublicEvolving
-public interface StreamableDescriptor<D extends StreamableDescriptor<D>> 
extends Descriptor {
-
-       /**
-        * Declares how to perform the conversion between a dynamic table and 
an external connector.
-        *
-        * <p>In append mode, a dynamic table and an external connector only 
exchange INSERT messages.
-        *
-        * @see #inRetractMode()
-        * @see #inUpsertMode()
-        */
-       D inAppendMode();
-
-       /**
-        * Declares how to perform the conversion between a dynamic table and 
an external connector.
-        *
-        * <p>In retract mode, a dynamic table and an external connector 
exchange ADD and RETRACT messages.
-        *
-        * <p>An INSERT change is encoded as an ADD message, a DELETE change as 
a RETRACT message, and an
-        * UPDATE change as a RETRACT message for the updated (previous) row 
and an ADD message for
-        * the updating (new) row.
-        *
-        * <p>In this mode, a key must not be defined as opposed to upsert 
mode. However, every update
-        * consists of two messages which is less efficient.
-        *
-        * @see #inAppendMode()
-        * @see #inUpsertMode()
-        */
-       D inRetractMode();
-
-       /**
-        * Declares how to perform the conversion between a dynamic table and 
an external connector.
-        *
-        * <p>In upsert mode, a dynamic table and an external connector 
exchange UPSERT and DELETE messages.
-        *
-        * <p>This mode requires a (possibly composite) unique key by which 
updates can be propagated. The
-        * external connector needs to be aware of the unique key attribute in 
order to apply messages
-        * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. 
DELETE changes as
-        * DELETE messages.
-        *
-        * <p>The main difference to a retract stream is that UPDATE changes 
are encoded with a single
-        * message and are therefore more efficient.
-        *
-        * @see #inAppendMode()
-        * @see #inRetractMode()
-        */
-       D inUpsertMode();
-}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/TableDescriptor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/TableDescriptor.java
index 1c12218..0656d33 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/TableDescriptor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/TableDescriptor.java
@@ -19,11 +19,144 @@
 package org.apache.flink.table.descriptors;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT;
+import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT;
 
 /**
-  * Common class for all descriptors describing table sources and sinks.
-  */
+ * Describes a table consisting of a connector (in a given update mode) and a 
format.
+ *
+ * @param <D> return type for builder pattern
+ */
 @PublicEvolving
-public abstract class TableDescriptor extends DescriptorBase {
+public abstract class TableDescriptor<D extends TableDescriptor<D>> extends 
DescriptorBase {
+
+       private final ConnectorDescriptor connectorDescriptor;
+
+       private @Nullable FormatDescriptor formatDescriptor;
+
+       private @Nullable String updateMode;
+
+       protected TableDescriptor(ConnectorDescriptor connectorDescriptor) {
+               this.connectorDescriptor = 
Preconditions.checkNotNull(connectorDescriptor, "Connector must not be null.");
+       }
+
+       /**
+        * Specifies the format that defines how to read data from a connector.
+        */
+       @SuppressWarnings("unchecked")
+       public D withFormat(FormatDescriptor format) {
+               formatDescriptor = Preconditions.checkNotNull(format, "Format 
must not be null.");
+               return (D) this;
+       }
+
+       /**
+        * Declares how to perform the conversion between a dynamic table and 
an external connector.
+        *
+        * <p>In append mode, a dynamic table and an external connector only 
exchange INSERT messages.
+        *
+        * @see #inRetractMode()
+        * @see #inUpsertMode()
+        */
+       @SuppressWarnings("unchecked")
+       public D inAppendMode() {
+               updateMode = UPDATE_MODE_VALUE_APPEND;
+               return (D) this;
+       }
+
+       /**
+        * Declares how to perform the conversion between a dynamic table and 
an external connector.
+        *
+        * <p>In retract mode, a dynamic table and an external connector 
exchange ADD and RETRACT messages.
+        *
+        * <p>An INSERT change is encoded as an ADD message, a DELETE change as 
a RETRACT message, and an
+        * UPDATE change as a RETRACT message for the updated (previous) row 
and an ADD message for
+        * the updating (new) row.
+        *
+        * <p>In this mode, a key must not be defined as opposed to upsert 
mode. However, every update
+        * consists of two messages which is less efficient.
+        *
+        * @see #inAppendMode()
+        * @see #inUpsertMode()
+        */
+       @SuppressWarnings("unchecked")
+       public D inRetractMode() {
+               updateMode = UPDATE_MODE_VALUE_RETRACT;
+               return (D) this;
+       }
+
+       /**
+        * Declares how to perform the conversion between a dynamic table and 
an external connector.
+        *
+        * <p>In upsert mode, a dynamic table and an external connector 
exchange UPSERT and DELETE messages.
+        *
+        * <p>This mode requires a (possibly composite) unique key by which 
updates can be propagated. The
+        * external connector needs to be aware of the unique key attribute in 
order to apply messages
+        * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. 
DELETE changes as
+        * DELETE messages.
+        *
+        * <p>The main difference to a retract stream is that UPDATE changes 
are encoded with a single
+        * message and are therefore more efficient.
+        *
+        * @see #inAppendMode()
+        * @see #inRetractMode()
+        */
+       @SuppressWarnings("unchecked")
+       public D inUpsertMode() {
+               updateMode = UPDATE_MODE_VALUE_UPSERT;
+               return (D) this;
+       }
+
+       /**
+        * Converts this descriptor into a set of properties.
+        */
+       @Override
+       public final Map<String, String> toProperties() {
+               final DescriptorProperties properties = new 
DescriptorProperties();
+
+               // this performs only basic validation
+               // more validation can only happen within a factory
+               if (connectorDescriptor.isFormatNeeded() && formatDescriptor == 
null) {
+                       throw new ValidationException(
+                               String.format(
+                                       "The connector %s requires a format 
description.",
+                                       
connectorDescriptor.getClass().getName()));
+               } else if (!connectorDescriptor.isFormatNeeded() && 
formatDescriptor != null) {
+                       throw new ValidationException(
+                               String.format(
+                                       "The connector %s does not require a 
format description but %s found.",
+                                       
connectorDescriptor.getClass().getName(),
+                                       formatDescriptor.getClass().getName()));
+               }
+
+               properties.putProperties(connectorDescriptor.toProperties());
+
+               if (formatDescriptor != null) {
+                       
properties.putProperties(formatDescriptor.toProperties());
+               }
+
+               if (updateMode != null) {
+                       properties.putString(UPDATE_MODE, updateMode);
+               }
+
+               properties.putProperties(additionalProperties());
+
+               return properties.asMap();
+       }
 
+       /**
+        * Enables adding more specific properties to {@link #toProperties()}.
+        */
+       protected Map<String, String> additionalProperties() {
+               return Collections.emptyMap();
+       }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
deleted file mode 100644
index a046a06..0000000
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.factories.TableFactoryUtil;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * Common class for table's created with {@link 
TableEnvironment#connect(ConnectorDescriptor connectorDescriptor)}.
- */
-@PublicEvolving
-public abstract class ConnectTableDescriptor<D extends 
ConnectTableDescriptor<D>>
-       extends TableDescriptor
-       implements SchematicDescriptor<D>, RegistrableDescriptor {
-
-       private Optional<FormatDescriptor> formatDescriptor = Optional.empty();
-       private Optional<Schema> schemaDescriptor = Optional.empty();
-
-       private final TableEnvironment tableEnv;
-       private final ConnectorDescriptor connectorDescriptor;
-
-       public ConnectTableDescriptor(TableEnvironment tableEnv, 
ConnectorDescriptor connectorDescriptor) {
-               this.tableEnv = tableEnv;
-               this.connectorDescriptor = connectorDescriptor;
-       }
-
-       /**
-        * Searches for the specified table source, configures it accordingly, 
and registers it as
-        * a table under the given name.
-        *
-        * @param name table name to be registered in the table environment
-        */
-       @Override
-       public void registerTableSource(String name) {
-               Preconditions.checkNotNull(name);
-               TableSource<?> tableSource = 
TableFactoryUtil.findAndCreateTableSource(this);
-               tableEnv.registerTableSource(name, tableSource);
-       }
-
-       /**
-        * Searches for the specified table sink, configures it accordingly, 
and registers it as
-        * a table under the given name.
-        *
-        * @param name table name to be registered in the table environment
-        */
-       @Override
-       public void registerTableSink(String name) {
-               Preconditions.checkNotNull(name);
-               TableSink<?> tableSink = 
TableFactoryUtil.findAndCreateTableSink(this);
-               tableEnv.registerTableSink(name, tableSink);
-       }
-
-       /**
-        * Searches for the specified table source and sink, configures them 
accordingly, and registers
-        * them as a table under the given name.
-        *
-        * @param name table name to be registered in the table environment
-        */
-       @Override
-       public void registerTableSourceAndSink(String name) {
-               registerTableSource(name);
-               registerTableSink(name);
-       }
-
-       /**
-        * Specifies the format that defines how to read data from a connector.
-        */
-       @Override
-       public D withFormat(FormatDescriptor format) {
-               formatDescriptor = 
Optional.of(Preconditions.checkNotNull(format));
-               return (D) this;
-       }
-
-       /**
-        * Specifies the resulting table schema.
-        */
-       @Override
-       public D withSchema(Schema schema) {
-               schemaDescriptor = 
Optional.of(Preconditions.checkNotNull(schema));
-               return (D) this;
-       }
-
-       /**
-        * Converts this descriptor into a set of properties.
-        */
-       @Override
-       public Map<String, String> toProperties() {
-               DescriptorProperties properties = new DescriptorProperties();
-
-               // this performs only basic validation
-               // more validation can only happen within a factory
-               if (connectorDescriptor.isFormatNeeded() && 
!formatDescriptor.isPresent()) {
-                       throw new ValidationException(String.format("The 
connector %s requires a format description.", connectorDescriptor.toString()));
-               } else if (!connectorDescriptor.isFormatNeeded() && 
formatDescriptor.isPresent()) {
-                       throw new ValidationException(
-                               String.format("The connector %s does not 
require a format description " +
-                               "but %s found.", 
connectorDescriptor.toString(), formatDescriptor.get().toString()));
-               }
-
-               properties.putProperties(connectorDescriptor.toProperties());
-
-               formatDescriptor.ifPresent(s -> 
properties.putProperties(s.toProperties()));
-               schemaDescriptor.ifPresent(s -> 
properties.putProperties(s.toProperties()));
-
-               return properties.asMap();
-       }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java
deleted file mode 100644
index 456bbff..0000000
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.TableEnvironment;
-
-import java.util.Map;
-import java.util.Optional;
-
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT;
-import static 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT;
-
-/**
- * Descriptor for specifying a table source and/or sink in a streaming 
environment.
- */
-@PublicEvolving
-public class StreamTableDescriptor
-       extends ConnectTableDescriptor<StreamTableDescriptor>
-       implements StreamableDescriptor<StreamTableDescriptor> {
-
-       private Optional<String> updateMode = Optional.empty();
-
-       public StreamTableDescriptor(TableEnvironment tableEnv, 
ConnectorDescriptor connectorDescriptor) {
-               super(tableEnv, connectorDescriptor);
-       }
-
-       /**
-        * Declares how to perform the conversion between a dynamic table and 
an external connector.
-        *
-        * <p>In append mode, a dynamic table and an external connector only 
exchange INSERT messages.
-        *
-        * @see #inRetractMode()
-        * @see #inUpsertMode()
-        */
-       @Override
-       public StreamTableDescriptor inAppendMode() {
-               updateMode = Optional.of(UPDATE_MODE_VALUE_APPEND);
-               return this;
-       }
-
-       /**
-        * Declares how to perform the conversion between a dynamic table and 
an external connector.
-        *
-        * <p>In retract mode, a dynamic table and an external connector 
exchange ADD and RETRACT messages.
-        *
-        * <p>An INSERT change is encoded as an ADD message, a DELETE change as 
a RETRACT message, and an
-        * UPDATE change as a RETRACT message for the updated (previous) row 
and an ADD message for
-        * the updating (new) row.
-        *
-        * <p>In this mode, a key must not be defined as opposed to upsert 
mode. However, every update
-        * consists of two messages which is less efficient.
-        *
-        * @see #inAppendMode()
-        * @see #inUpsertMode()
-        */
-       @Override
-       public StreamTableDescriptor inRetractMode() {
-               updateMode = Optional.of(UPDATE_MODE_VALUE_RETRACT);
-               return this;
-       }
-
-       /**
-        * Declares how to perform the conversion between a dynamic table and 
an external connector.
-        *
-        * <p>In upsert mode, a dynamic table and an external connector 
exchange UPSERT and DELETE messages.
-        *
-        * <p>This mode requires a (possibly composite) unique key by which 
updates can be propagated. The
-        * external connector needs to be aware of the unique key attribute in 
order to apply messages
-        * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. 
DELETE changes as
-        * DELETE messages.
-        *
-        * <p>The main difference to a retract stream is that UPDATE changes 
are encoded with a single
-        * message and are therefore more efficient.
-        *
-        * @see #inAppendMode()
-        * @see #inRetractMode()
-        */
-       @Override
-       public StreamTableDescriptor inUpsertMode() {
-               updateMode = Optional.of(UPDATE_MODE_VALUE_UPSERT);
-               return this;
-       }
-
-       @Override
-       public Map<String, String> toProperties() {
-               DescriptorProperties properties = new DescriptorProperties();
-
-               properties.putProperties(super.toProperties());
-               updateMode.ifPresent(mode -> properties.putString(UPDATE_MODE, 
mode));
-
-               return properties.asMap();
-       }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
index 52bfa06..f1ccc11 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTableBuilder.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.catalog
 
 import java.util
 
-import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
 import org.apache.flink.table.descriptors._
 
 /**
@@ -57,27 +56,16 @@ import org.apache.flink.table.descriptors._
   */
 @Deprecated
 @deprecated
-class ExternalCatalogTableBuilder(private val connectorDescriptor: 
ConnectorDescriptor)
-  extends TableDescriptor
-  with SchematicDescriptor[ExternalCatalogTableBuilder]
-  with StreamableDescriptor[ExternalCatalogTableBuilder] {
+class ExternalCatalogTableBuilder(connectorDescriptor: ConnectorDescriptor)
+  extends TableDescriptor[ExternalCatalogTableBuilder](connectorDescriptor)
+  with SchematicDescriptor[ExternalCatalogTableBuilder] {
 
   private var isBatch: Boolean = true
   private var isStreaming: Boolean = true
 
-  private var formatDescriptor: Option[FormatDescriptor] = None
   private var schemaDescriptor: Option[Schema] = None
   private var statisticsDescriptor: Option[Statistics] = None
   private var metadataDescriptor: Option[Metadata] = None
-  private var updateMode: Option[String] = None
-
-  /**
-    * Specifies the format that defines how to read data from a connector.
-    */
-  override def withFormat(format: FormatDescriptor): 
ExternalCatalogTableBuilder = {
-    formatDescriptor = Some(format)
-    this
-  }
 
   /**
     * Specifies the resulting table schema.
@@ -88,57 +76,6 @@ class ExternalCatalogTableBuilder(private val 
connectorDescriptor: ConnectorDesc
   }
 
   /**
-    * Declares how to perform the conversion between a dynamic table and an 
external connector.
-    *
-    * In append mode, a dynamic table and an external connector only exchange 
INSERT messages.
-    *
-    * @see See also [[inRetractMode()]] and [[inUpsertMode()]].
-    */
-  override def inAppendMode(): ExternalCatalogTableBuilder = {
-    updateMode = Some(UPDATE_MODE_VALUE_APPEND)
-    this
-  }
-
-  /**
-    * Declares how to perform the conversion between a dynamic table and an 
external connector.
-    *
-    * In retract mode, a dynamic table and an external connector exchange ADD 
and RETRACT messages.
-    *
-    * An INSERT change is encoded as an ADD message, a DELETE change as a 
RETRACT message, and an
-    * UPDATE change as a RETRACT message for the updated (previous) row and an 
ADD message for
-    * the updating (new) row.
-    *
-    * In this mode, a key must not be defined as opposed to upsert mode. 
However, every update
-    * consists of two messages which is less efficient.
-    *
-    * @see See also [[inAppendMode()]] and [[inUpsertMode()]].
-    */
-  override def inRetractMode(): ExternalCatalogTableBuilder = {
-    updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
-    this
-  }
-
-  /**
-    * Declares how to perform the conversion between a dynamic table and an 
external connector.
-    *
-    * In upsert mode, a dynamic table and an external connector exchange 
UPSERT and DELETE messages.
-    *
-    * This mode requires a (possibly composite) unique key by which updates 
can be propagated. The
-    * external connector needs to be aware of the unique key attribute in 
order to apply messages
-    * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. 
DELETE changes as
-    * DELETE messages.
-    *
-    * The main difference to a retract stream is that UPDATE changes are 
encoded with a single
-    * message and are therefore more efficient.
-    *
-    * @see See also [[inAppendMode()]] and [[inRetractMode()]].
-    */
-  override def inUpsertMode(): ExternalCatalogTableBuilder = {
-    updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
-    this
-  }
-
-  /**
     * Specifies the statistics for this external table.
     */
   def withStatistics(statistics: Statistics): ExternalCatalogTableBuilder = {
@@ -228,17 +165,11 @@ class ExternalCatalogTableBuilder(private val 
connectorDescriptor: ConnectorDesc
 
   // 
----------------------------------------------------------------------------------------------
 
-  /**
-    * Converts this descriptor into a set of properties.
-    */
-  override def toProperties: util.Map[String, String] = {
+  override protected def additionalProperties(): util.Map[String, String] = {
     val properties = new DescriptorProperties()
-    properties.putProperties(connectorDescriptor.toProperties)
-    formatDescriptor.foreach(d => properties.putProperties(d.toProperties))
     schemaDescriptor.foreach(d => properties.putProperties(d.toProperties))
     statisticsDescriptor.foreach(d => properties.putProperties(d.toProperties))
     metadataDescriptor.foreach(d => properties.putProperties(d.toProperties))
-    updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
     properties.asMap()
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
index eaeb582..806106e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
@@ -68,7 +68,7 @@ class TableDescriptorTest extends TableTestBase {
       .field("myfield6", 
Types.OBJECT_ARRAY(TypeExtractor.createTypeInfo(classOf[Person])))
       .fieldDelimiter("#")
 
-    val descriptor: RegistrableDescriptor = if (isStreaming) {
+    val descriptor = if (isStreaming) {
       streamTestUtil().tableEnv
         .connect(connector)
         .withFormat(format)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TestTableDescriptor.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TestTableDescriptor.scala
index cb84953..d1f48eb 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TestTableDescriptor.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TestTableDescriptor.scala
@@ -19,51 +19,23 @@
 package org.apache.flink.table.descriptors
 
 import java.util
-
-import 
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.{UPDATE_MODE, 
UPDATE_MODE_VALUE_APPEND, UPDATE_MODE_VALUE_RETRACT, UPDATE_MODE_VALUE_UPSERT}
+import java.util.Collections
 
 class TestTableDescriptor(connector: ConnectorDescriptor)
-  extends TableDescriptor
-  with SchematicDescriptor[TestTableDescriptor]
-  with StreamableDescriptor[TestTableDescriptor] {
+  extends TableDescriptor[TestTableDescriptor](connector)
+  with SchematicDescriptor[TestTableDescriptor] {
 
-  private var formatDescriptor: Option[FormatDescriptor] = None
   private var schemaDescriptor: Option[Schema] = None
-  private var updateMode: Option[String] = None
-
-  override def toProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-
-    properties.putProperties(connector.toProperties)
-    formatDescriptor.foreach(d => properties.putProperties(d.toProperties))
-    schemaDescriptor.foreach(d => properties.putProperties(d.toProperties))
-    updateMode.foreach(mode => properties.putString(UPDATE_MODE, mode))
-
-    properties.asMap()
-  }
-
-  override def withFormat(format: FormatDescriptor): TestTableDescriptor = {
-    this.formatDescriptor = Some(format)
-    this
-  }
 
   override def withSchema(schema: Schema): TestTableDescriptor = {
     this.schemaDescriptor = Some(schema)
     this
   }
 
-  override def inAppendMode(): TestTableDescriptor = {
-    updateMode = Some(UPDATE_MODE_VALUE_APPEND)
-    this
-  }
-
-  override def inRetractMode(): TestTableDescriptor = {
-    updateMode = Some(UPDATE_MODE_VALUE_RETRACT)
-    this
-  }
-
-  override def inUpsertMode(): TestTableDescriptor = {
-    updateMode = Some(UPDATE_MODE_VALUE_UPSERT)
-    this
+  override protected def additionalProperties(): util.Map[String, String] = {
+    schemaDescriptor match {
+      case Some(d) => d.toProperties
+      case None => Collections.emptyMap()
+    }
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index cb4567f..cb16a7e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -22,9 +22,9 @@ import java.util.Optional
 
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, 
TableEnvironment}
+import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.catalog.{Catalog, ExternalCatalog}
-import org.apache.flink.table.descriptors.{ConnectorDescriptor, 
TableDescriptor}
+import org.apache.flink.table.descriptors.{ConnectTableDescriptor, 
ConnectorDescriptor}
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
@@ -52,7 +52,7 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def scan(tablePath: String*): Table = ???
 
-  override def connect(connectorDescriptor: ConnectorDescriptor): 
TableDescriptor = ???
+  override def connect(connectorDescriptor: ConnectorDescriptor): 
ConnectTableDescriptor = ???
 
   override def listCatalogs(): Array[String] = ???
 

Reply via email to