This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b61ae9eeba8fa163dbf02426fbffc685fa7ed000
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Mon Dec 23 16:08:07 2019 +0800

    [FLINK-15193][table][docs] Move DDL to the first tab in table connector page
---
 docs/dev/table/connect.md    | 750 ++++++++++++++++++++--------------------
 docs/dev/table/connect.zh.md | 806 +++++++++++++++++++++----------------------
 2 files changed, 778 insertions(+), 778 deletions(-)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 40861f3..f4c8e39 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -92,6 +92,21 @@ The **table schema** defines the schema of a table that is exposed to SQL querie
 The subsequent sections will cover each definition part ([connector](connect.html#table-connectors), [format](connect.html#table-formats), and [schema](connect.html#table-schema)) in more detail. The following example shows how to pass them:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+tableEnvironment.sqlUpdate(
+    "CREATE TABLE MyTable (\n" +
+    "  ...    -- declare table schema \n" +
+    ") WITH (\n" +
+    "  'connector.type' = '...',  -- declare connector specific properties\n" +
+    "  ...\n" +
+    "  'update-mode' = 'append',  -- declare update mode\n" +
+    "  'format.type' = '...',     -- declare format specific properties\n" +
+    "  ...\n" +
+    ")");
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 tableEnvironment
@@ -124,21 +139,6 @@ format: ...
 schema: ...
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-tableEnvironment.sqlUpdate(
-    "CREATE TABLE MyTable (\n" +
-    "  ...    -- declare table schema \n" +
-    ") WITH (\n" +
-    "  'connector.type' = '...',  -- declare connector specific properties\n" +
-    "  ...\n" +
-    "  'update-mode' = 'append',  -- declare update mode\n" +
-    "  'format.type' = '...',     -- declare format specific properties\n" +
-    "  ...\n" +
-    ")");
-{% endhighlight %}
-</div>
 </div>
 
 The table's type (`source`, `sink`, or `both`) determines how a table is registered. In case of table type `both`, both a table source and table sink are registered under the same name. Logically, this means that we can both read and write to such a table similarly to a table in a regular DBMS.
@@ -148,6 +148,41 @@ For streaming queries, an [update mode](connect.html#update-mode) declares how t
 The following code shows a full example of how to connect to Kafka for reading Avro records.
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  -- declare the schema of the table
+  `user` BIGINT,
+  message STRING,
+  ts STRING
+) WITH (
+  -- declare the external system to connect to
+  'connector.type' = 'kafka',
+  'connector.version' = '0.10',
+  'connector.topic' = 'topic_name',
+  'connector.startup-mode' = 'earliest-offset',
+  'connector.properties.zookeeper.connect' = 'localhost:2181',
+  'connector.properties.bootstrap.servers' = 'localhost:9092',
+
+  -- specify the update-mode for streaming tables
+  'update-mode' = 'append',
+
+  -- declare a format for this system
+  'format.type' = 'avro',
+  'format.avro-schema' = '{
+                            "namespace": "org.myorganization",
+                            "type": "record",
+                            "name": "UserMessage",
+                            "fields": [
+                                {"name": "ts", "type": "string"},
+                                {"name": "user", "type": "long"},
+                                {"name": "message", "type": ["string", "null"]}
+                            ]
+                         }'
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 tableEnvironment
@@ -285,41 +320,6 @@ tables:
         data-type: STRING
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  -- declare the schema of the table
-  `user` BIGINT,
-  message STRING,
-  ts STRING
-) WITH (
-  -- declare the external system to connect to
-  'connector.type' = 'kafka',
-  'connector.version' = '0.10',
-  'connector.topic' = 'topic_name',
-  'connector.startup-mode' = 'earliest-offset',
-  'connector.properties.zookeeper.connect' = 'localhost:2181',
-  'connector.properties.bootstrap.servers' = 'localhost:9092',
-
-  -- specify the update-mode for streaming tables
-  'update-mode' = 'append',
-
-  -- declare a format for this system
-  'format.type' = 'avro',
-  'format.avro-schema' = '{
-                            "namespace": "org.myorganization",
-                            "type": "record",
-                            "name": "UserMessage",
-                            "fields": [
-                                {"name": "ts", "type": "string"},
-                                {"name": "user", "type": "long"},
-                                {"name": "message", "type": ["string", "null"]}
-                            ]
-                         }'
-)
-{% endhighlight %}
-</div>
 </div>
 
 In both ways the desired connection properties are converted into normalized, string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly-one matching table factory.
@@ -600,6 +600,16 @@ For streaming queries, it is required to declare how to perform the [conversion
 <span class="label label-danger">Attention</span> The documentation of each connector states which update modes are supported.
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+ ...
+) WITH (
+ 'update-mode' = 'append'  -- otherwise: 'retract' or 'upsert'
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .connect(...)
@@ -621,16 +631,6 @@ tables:
     update-mode: append    # otherwise: "retract" or "upsert"
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyTable (
- ...
-) WITH (
- 'update-mode' = 'append'  -- otherwise: 'retract' or 'upsert'
-)
-{% endhighlight %}
-</div>
 </div>
 
 See also the [general streaming concepts documentation](streaming/dynamic_tables.html#continuous-queries) for more information.
@@ -655,6 +655,20 @@ Please note that not all connectors are available in both batch and streaming ye
 The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem can be defined as:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'filesystem',                -- required: specify to connector type
+  'connector.path' = 'file:///path/to/whatever',  -- required: path to a file or directory
+  'format.type' = '...',                          -- required: file system connector requires to specify a format,
+  ...                                             -- currently only 'csv' format is supported.
+                                                  -- Please refer to Table Formats section for more details.
+)                                               
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .connect(
@@ -689,20 +703,6 @@ format:                               # required: file system connector requires
                                       # Please refer to Table Formats section for more details.
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'connector.type' = 'filesystem',                -- required: specify to connector type
-  'connector.path' = 'file:///path/to/whatever',  -- required: path to a file or directory
-  'format.type' = '...',                          -- required: file system connector requires to specify a format,
-  ...                                             -- currently only 'csv' format is supported.
-                                                  -- Please refer to Table Formats section for more details.
-)                                               
-{% endhighlight %}
-</div>
 </div>
 
 The file system connector itself is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a file system.
@@ -720,6 +720,44 @@ The file system connector itself is included in Flink and does not require an ad
 The Kafka connector allows for reading and writing from and to an Apache Kafka topic. It can be defined as follows:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'kafka',       
+
+  'connector.version' = '0.11',     -- required: valid connector versions are
+                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"
+
+  'connector.topic' = 'topic_name', -- required: topic name from which the table is read
+
+  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
+  'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
+  'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group
+  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset", 
+                                                   -- "latest-offset", "group-offsets", 
+                                                   -- or "specific-offsets"
+
+  -- optional: used in case of startup mode with specific offsets
+  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
+
+  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions 
+                                         -- into Kafka's partitions valid are "fixed" 
+                                         -- (each Flink partition ends up in at most one Kafka partition),
+                                         -- "round-robin" (a Flink partition is distributed to 
+                                         -- Kafka partitions round-robin)
+                                         -- "custom" (use a custom FlinkKafkaPartitioner subclass)
+  -- optional: used in case of sink partitioner custom
+  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
+  
+  'format.type' = '...',                 -- required: Kafka connector requires to specify a format,
+  ...                                    -- the supported formats are 'csv', 'json' and 'avro'.
+                                         -- Please refer to Table Formats section for more details.
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .connect(
@@ -806,44 +844,6 @@ connector:
                            # Please refer to Table Formats section for more details.
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'connector.type' = 'kafka',       
-
-  'connector.version' = '0.11',     -- required: valid connector versions are
-                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"
-
-  'connector.topic' = 'topic_name', -- required: topic name from which the table is read
-
-  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
-  'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
-  'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group
-  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset", 
-                                                   -- "latest-offset", "group-offsets", 
-                                                   -- or "specific-offsets"
-
-  -- optional: used in case of startup mode with specific offsets
-  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
-
-  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions 
-                                         -- into Kafka's partitions valid are "fixed" 
-                                         -- (each Flink partition ends up in at most one Kafka partition),
-                                         -- "round-robin" (a Flink partition is distributed to 
-                                         -- Kafka partitions round-robin)
-                                         -- "custom" (use a custom FlinkKafkaPartitioner subclass)
-  -- optional: used in case of sink partitioner custom
-  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
-  
-  'format.type' = '...',                 -- required: Kafka connector requires to specify a format,
-  ...                                    -- the supported formats are 'csv', 'json' and 'avro'.
-                                         -- Please refer to Table Formats section for more details.
-)
-{% endhighlight %}
-</div>
 </div>
 
 **Specify the start reading position:** By default, the Kafka source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions, which correspond to the configurations in section [Kafka Consumers Start Position Configuration]({{ site.baseurl }}/dev/connectors/kafka.html#kafka-consumers-start-position-configuration).
@@ -875,29 +875,91 @@ For append-only queries, the connector can also operate in [append mode](#update
 The connector can be defined as follows:
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java/Scala" markdown="1">
-{% highlight java %}
-.connect(
-  new Elasticsearch()
-    .version("6")                      // required: valid connector versions are "6"
-    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
-    .index("MyUsers")                  // required: Elasticsearch index
-    .documentType("user")              // required: Elasticsearch document type
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'elasticsearch', -- required: specify this table type is elasticsearch
+  
+  'connector.version' = '6',          -- required: valid connector versions are "6"
+  
+  'connector.hosts' = 'http://host_name:9092;http://host_name:9093',  -- required: one or more Elasticsearch hosts to connect to
 
-    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
-                              //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
-    .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)
+  'connector.index' = 'MyUsers',       -- required: Elasticsearch index
 
-    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
-    .failureHandlerFail()          // optional: throws an exception if a request fails and causes a job failure
-    .failureHandlerIgnore()        //   or ignores failures and drops the request
-    .failureHandlerRetryRejected() //   or re-adds requests that have failed due to queue capacity saturation
-    .failureHandlerCustom(...)     //   or custom failure handling with a ActionRequestFailureHandler subclass
+  'connector.document-type' = 'user',  -- required: Elasticsearch document type
 
-    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
-    .disableFlushOnCheckpoint()    // optional: disables flushing on checkpoint (see notes below!)
-    .bulkFlushMaxActions(42)       // optional: maximum number of actions to buffer for each bulk request
-    .bulkFlushMaxSize("42 mb")     // optional: maximum size of buffered actions in bytes per bulk request
+  'update-mode' = 'append',            -- optional: update mode when used as table sink.           
+
+  'connector.key-delimiter' = '$',     -- optional: delimiter for composite keys ("_" by default)
+                                       -- e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+
+  'connector.key-null-literal' = 'n/a',  -- optional: representation for null fields in keys ("null" by default)
+
+  'connector.failure-handler' = '...',   -- optional: failure handling strategy in case a request to 
+                                         -- Elasticsearch fails ("fail" by default).
+                                         -- valid strategies are 
+                                         -- "fail" (throws an exception if a request fails and
+                                         -- thus causes a job failure), 
+                                         -- "ignore" (ignores failures and drops the request),
+                                         -- "retry-rejected" (re-adds requests that have failed due 
+                                         -- to queue capacity saturation), 
+                                         -- or "custom" for failure handling with a
+                                         -- ActionRequestFailureHandler subclass
+
+  -- optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+  'connector.flush-on-checkpoint' = 'true',   -- optional: disables flushing on checkpoint (see notes below!)
+                                              -- ("true" by default)
+  'connector.bulk-flush.max-actions' = '42',  -- optional: maximum number of actions to buffer 
+                                              -- for each bulk request
+  'connector.bulk-flush.max-size' = '42 mb',  -- optional: maximum size of buffered actions in bytes
+                                              -- per bulk request
+                                              -- (only MB granularity is supported)
+  'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval (in milliseconds)
+  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff strategy ("disabled" by default)
+                                                      -- valid strategies are "disabled", "constant",
+                                                      -- or "exponential"
+  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum number of retries
+  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay between each backoff attempt
+                                                      -- (in milliseconds)
+
+  -- optional: connection properties to be used during REST communication to Elasticsearch
+  'connector.connection-max-retry-timeout' = '3',     -- optional: maximum timeout (in milliseconds)
+                                                      -- between retries
+  'connector.connection-path-prefix' = '/v1'          -- optional: prefix string to be added to every
+                                                      -- REST communication
+                                                      
+  'format.type' = '...',   -- required: Elasticsearch connector requires to specify a format,
+  ...                      -- currently only 'json' format is supported.
+                           -- Please refer to Table Formats section for more details.
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+.connect(
+  new Elasticsearch()
+    .version("6")                      // required: valid connector versions are "6"
+    .host("localhost", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
+    .index("MyUsers")                  // required: Elasticsearch index
+    .documentType("user")              // required: Elasticsearch document type
+
+    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" by default)
+                              //   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+    .keyNullLiteral("n/a")    // optional: representation for null fields in keys ("null" by default)
+
+    // optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
+    .failureHandlerFail()          // optional: throws an exception if a request fails and causes a job failure
+    .failureHandlerIgnore()        //   or ignores failures and drops the request
+    .failureHandlerRetryRejected() //   or re-adds requests that have failed due to queue capacity saturation
+    .failureHandlerCustom(...)     //   or custom failure handling with a ActionRequestFailureHandler subclass
+
+    // optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+    .disableFlushOnCheckpoint()    // optional: disables flushing on checkpoint (see notes below!)
+    .bulkFlushMaxActions(42)       // optional: maximum number of actions to buffer for each bulk request
+    .bulkFlushMaxSize("42 mb")     // optional: maximum size of buffered actions in bytes per bulk request
                                    //   (only MB granularity is supported)
     .bulkFlushInterval(60000L)     // optional: bulk flush interval (in milliseconds)
 
@@ -1000,68 +1062,6 @@ connector:
                                # Please refer to Table Formats section for more details.
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'connector.type' = 'elasticsearch', -- required: specify this table type is elasticsearch
-  
-  'connector.version' = '6',          -- required: valid connector versions are "6"
-  
-  'connector.hosts' = 'http://host_name:9092;http://host_name:9093',  -- required: one or more Elasticsearch hosts to connect to
-
-  'connector.index' = 'MyUsers',       -- required: Elasticsearch index
-
-  'connector.document-type' = 'user',  -- required: Elasticsearch document type
-
-  'update-mode' = 'append',            -- optional: update mode when used as table sink.           
-
-  'connector.key-delimiter' = '$',     -- optional: delimiter for composite keys ("_" by default)
-                                       -- e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
-
-  'connector.key-null-literal' = 'n/a',  -- optional: representation for null fields in keys ("null" by default)
-
-  'connector.failure-handler' = '...',   -- optional: failure handling strategy in case a request to 
-                                         -- Elasticsearch fails ("fail" by default).
-                                         -- valid strategies are 
-                                         -- "fail" (throws an exception if a request fails and
-                                         -- thus causes a job failure), 
-                                         -- "ignore" (ignores failures and drops the request),
-                                         -- "retry-rejected" (re-adds requests that have failed due 
-                                         -- to queue capacity saturation), 
-                                         -- or "custom" for failure handling with a
-                                         -- ActionRequestFailureHandler subclass
-
-  -- optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
-  'connector.flush-on-checkpoint' = 'true',   -- optional: disables flushing on checkpoint (see notes below!)
-                                              -- ("true" by default)
-  'connector.bulk-flush.max-actions' = '42',  -- optional: maximum number of actions to buffer 
-                                              -- for each bulk request
-  'connector.bulk-flush.max-size' = '42 mb',  -- optional: maximum size of buffered actions in bytes
-                                              -- per bulk request
-                                              -- (only MB granularity is supported)
-  'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval (in milliseconds)
-  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff strategy ("disabled" by default)
-                                                      -- valid strategies are "disabled", "constant",
-                                                      -- or "exponential"
-  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum number of retries
-  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay between each backoff attempt
-                                                      -- (in milliseconds)
-
-  -- optional: connection properties to be used during REST communication to Elasticsearch
-  'connector.connection-max-retry-timeout' = '3',     -- optional: maximum timeout (in milliseconds)
-                                                      -- between retries
-  'connector.connection-path-prefix' = '/v1'          -- optional: prefix string to be added to every
-                                                      -- REST communication
-                                                      
-  'format.type' = '...',   -- required: Elasticsearch connector requires to specify a format,
-  ...                      -- currently only 'json' format is supported.
-                           -- Please refer to Table Formats section for more details.
-)
-{% endhighlight %}
-</div>
 </div>
 
 **Bulk flushing:** For more information about characteristics of the optional flushing parameters see the [corresponding low-level documentation]({{ site.baseurl }}/dev/connectors/elasticsearch.html).
@@ -1091,6 +1091,38 @@ For append-only queries, the connector can also operate in [append mode](#update
 The connector can be defined as follows:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  hbase_rowkey_name rowkey_type,
+  hbase_column_family_name1 ROW<...>,
+  hbase_column_family_name2 ROW<...>
+) WITH (
+  'connector.type' = 'hbase', -- required: specify this table type is hbase
+  
+  'connector.version' = '1.4.3',          -- required: valid connector versions are "1.4.3"
+  
+  'connector.table-name' = 'hbase_table_name',  -- required: hbase table name
+  
+  'connector.zookeeper.quorum' = 'localhost:2181', -- required: HBase Zookeeper quorum configuration
+  'connector.zookeeper.znode.parent' = '/test',    -- optional: the root dir in Zookeeper for HBase cluster.
+                                                   -- The default value is "/hbase".
+
+  'connector.write.buffer-flush.max-size' = '10mb', -- optional: writing option, determines how many size in memory of buffered
+                                                    -- rows to insert per round trip. This can help performance on writing to JDBC
+                                                    -- database. The default value is "2mb".
+
+  'connector.write.buffer-flush.max-rows' = '1000', -- optional: writing option, determines how many rows to insert per round trip.
+                                                    -- This can help performance on writing to JDBC database. No default value,
+                                                    -- i.e. the default flushing is not depends on the number of buffered rows.
+
+  'connector.write.buffer-flush.interval' = '2s',   -- optional: writing option, sets a flush interval flushing buffered requesting
+                                                    -- if the interval passes, in milliseconds. Default value is "0s", which means
+                                                    -- no asynchronous flush thread will be scheduled.
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .connect(
@@ -1137,38 +1169,6 @@ connector:
                                  # no asynchronous flush thread will be scheduled.
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  hbase_rowkey_name rowkey_type,
-  hbase_column_family_name1 ROW<...>,
-  hbase_column_family_name2 ROW<...>
-) WITH (
-  'connector.type' = 'hbase', -- required: specify this table type is hbase
-  
-  'connector.version' = '1.4.3',          -- required: valid connector versions are "1.4.3"
-  
-  'connector.table-name' = 'hbase_table_name',  -- required: hbase table name
-  
-  'connector.zookeeper.quorum' = 'localhost:2181', -- required: HBase Zookeeper quorum configuration
-  'connector.zookeeper.znode.parent' = '/test',    -- optional: the root dir in Zookeeper for HBase cluster.
-                                                   -- The default value is "/hbase".
-
-  'connector.write.buffer-flush.max-size' = '10mb', -- optional: writing option, determines how many size in memory of buffered
-                                                    -- rows to insert per round trip. This can help performance on writing to JDBC
-                                                    -- database. The default value is "2mb".
-
-  'connector.write.buffer-flush.max-rows' = '1000', -- optional: writing option, determines how many rows to insert per round trip.
-                                                    -- This can help performance on writing to JDBC database. No default value,
-                                                    -- i.e. the default flushing is not depends on the number of buffered rows.
-
-  'connector.write.buffer-flush.interval' = '2s',   -- optional: writing option, sets a flush interval flushing buffered requesting
-                                                    -- if the interval passes, in milliseconds. Default value is "0s", which means
-                                                    -- no asynchronous flush thread will be scheduled.
-)
-{% endhighlight %}
-</div>
 </div>
 
 **Columns:** All the column families in HBase table must be declared as `ROW` type, the field name maps to the column family name, and the nested field names map to the column qualifier names. There is no need to declare all the families and qualifiers in the schema, users can declare what's necessary. Except the `ROW` type fields, the only one field of atomic type (e.g. `STRING`, `BIGINT`) will be recognized as row key of the table. There's no constraints on the name of row key field. 
@@ -1206,52 +1206,6 @@ To use JDBC connector, need to choose an actual driver to use. Here are drivers
 The connector can be defined as follows:
 
 <div class="codetabs" markdown="1">
-<div data-lang="YAML" markdown="1">
-{% highlight yaml %}
-connector:
-  type: jdbc
-  url: "jdbc:mysql://localhost:3306/flink-test"     # required: JDBC DB url
-  table: "jdbc_table_name"        # required: jdbc table name
-  driver: "com.mysql.jdbc.Driver" # optional: the class name of the JDBC driver to use to connect to this URL.
-                                  # If not set, it will automatically be derived from the URL.
-
-  username: "name"                # optional: jdbc user name and password
-  password: "password"
-  
-  read: # scan options, optional, used when reading from table
-    partition: # These options must all be specified if any of them is specified. In addition, partition.num must be specified. They
-               # describe how to partition the table when reading in parallel from multiple tasks. partition.column must be a numeric,
-               # date, or timestamp column from the table in question. Notice that lowerBound and upperBound are just used to decide
-               # the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned.
-               # This option applies only to reading.
-      column: "column_name" # optional, name of the column used for partitioning the input.
-      num: 50               # optional, the number of partitions.
-      lower-bound: 500      # optional, the smallest value of the first partition.
-      upper-bound: 1000     # optional, the largest value of the last partition.
-    fetch-size: 100         # optional, Gives the reader a hint as to the number of rows that should be fetched
-                            # from the database when reading per round trip. If the value specified is zero, then
-                            # the hint is ignored. The default value is zero.
-  
-  lookup: # lookup options, optional, used in temporary join
-    cache:
-      max-rows: 5000 # optional, max number of rows of lookup cache, over this value, the oldest rows will
-                     # be eliminated. "cache.max-rows" and "cache.ttl" options must all be specified if any
-                     # of them is specified. Cache is not enabled as default.
-      ttl: "10s"     # optional, the max time to live for each rows in lookup cache, over this time, the oldest rows
-                     # will be expired. "cache.max-rows" and "cache.ttl" options must all be specified if any of
-                     # them is specified. Cache is not enabled as default.
-    max-retries: 3   # optional, max retry times if lookup database failed
-  
-  write: # sink options, optional, used when writing into table
-      flush:
-        max-rows: 5000 # optional, flush max size (includes all append, upsert and delete records), 
-                       # over this number of records, will flush data. The default value is "5000".
-        interval: "2s" # optional, flush interval mills, over this time, asynchronous threads will flush data.
-                       # The default value is "0s", which means no asynchronous flush thread will be scheduled. 
-      max-retries: 3   # optional, max retry times if writing records to database failed.
-{% endhighlight %}
-</div>
-
 <div data-lang="DDL" markdown="1">
 {% highlight sql %}
 CREATE TABLE MyUserTable (
@@ -1303,6 +1257,52 @@ CREATE TABLE MyUserTable (
 )
 {% endhighlight %}
 </div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+connector:
+  type: jdbc
+  url: "jdbc:mysql://localhost:3306/flink-test"     # required: JDBC DB url
+  table: "jdbc_table_name"        # required: jdbc table name
+  driver: "com.mysql.jdbc.Driver" # optional: the class name of the JDBC driver to use to connect to this URL.
+                                  # If not set, it will automatically be derived from the URL.
+
+  username: "name"                # optional: jdbc user name and password
+  password: "password"
+  
+  read: # scan options, optional, used when reading from table
+    partition: # These options must all be specified if any of them is specified. In addition, partition.num must be specified. They
+               # describe how to partition the table when reading in parallel from multiple tasks. partition.column must be a numeric,
+               # date, or timestamp column from the table in question. Notice that lowerBound and upperBound are just used to decide
+               # the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned.
+               # This option applies only to reading.
+      column: "column_name" # optional, name of the column used for partitioning the input.
+      num: 50               # optional, the number of partitions.
+      lower-bound: 500      # optional, the smallest value of the first partition.
+      upper-bound: 1000     # optional, the largest value of the last partition.
+    fetch-size: 100         # optional, Gives the reader a hint as to the number of rows that should be fetched
+                            # from the database when reading per round trip. If the value specified is zero, then
+                            # the hint is ignored. The default value is zero.
+  
+  lookup: # lookup options, optional, used in temporary join
+    cache:
+      max-rows: 5000 # optional, max number of rows of lookup cache, over this value, the oldest rows will
+                     # be eliminated. "cache.max-rows" and "cache.ttl" options must all be specified if any
+                     # of them is specified. Cache is not enabled as default.
+      ttl: "10s"     # optional, the max time to live for each rows in lookup cache, over this time, the oldest rows
+                     # will be expired. "cache.max-rows" and "cache.ttl" options must all be specified if any of
+                     # them is specified. Cache is not enabled as default.
+    max-retries: 3   # optional, max retry times if lookup database failed
+  
+  write: # sink options, optional, used when writing into table
+      flush:
+        max-rows: 5000 # optional, flush max size (includes all append, upsert and delete records), 
+                       # over this number of records, will flush data. The default value is "5000".
+        interval: "2s" # optional, flush interval mills, over this time, asynchronous threads will flush data.
+                       # The default value is "0s", which means no asynchronous flush thread will be scheduled. 
+      max-retries: 3   # optional, max retry times if writing records to database failed.
+{% endhighlight %}
+</div>
 </div>
 
 **Upsert sink:** Flink automatically extracts valid keys from a query. For example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key of the fields `a` and `b`. If a JDBC table is used as upsert sink, please make sure keys of the query is one of the unique key sets or primary key of the underlying database. This can guarantee the output result is as expected.
@@ -1351,6 +1351,36 @@ schema is interpreted as a field renaming in the format.
 The CSV format can be used as follows:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the schema type
+
+  'format.fields.0.name' = 'lon',         -- optional: define the schema explicitly using type information.
+  'format.fields.0.data-type' = 'FLOAT',  -- This overrides default behavior that uses table's schema as format schema.
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.data-type' = 'TIMESTAMP(3)',
+
+  'format.field-delimiter' = ';',         -- optional: field delimiter character (',' by default)
+  'format.line-delimiter' = '\r\n',       -- optional: line delimiter ("\n" by default; otherwise
+                                          -- "\r" or "\r\n" are allowed)
+  'format.quote-character' = '''',        -- optional: quote character for enclosing field values ('"' by default)
+  'format.allow-comments' = 'true',       -- optional: ignores comment lines that start with "#"
+                                          -- (disabled by default);
+                                          -- if enabled, make sure to also ignore parse errors to allow empty rows
+  'format.ignore-parse-errors' = 'true',  -- optional: skip fields and rows with parse errors instead of failing;
+                                          -- fields are set to null in case of errors
+  'format.array-element-delimiter' = '|', -- optional: the array element delimiter string for separating
+                                          -- array and row element values (";" by default)
+  'format.escape-character' = '\\',       -- optional: escape character for escaping values (disabled by default)
+  'format.null-literal' = 'n/a'           -- optional: null literal string that is interpreted as a
+                                          -- null value (disabled by default)
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .withFormat(
@@ -1427,36 +1457,6 @@ format:
                                #   null value (disabled by default)
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'format.type' = 'csv',                  -- required: specify the schema type
-
-  'format.fields.0.name' = 'lon',         -- optional: define the schema explicitly using type information.
-  'format.fields.0.data-type' = 'FLOAT',  -- This overrides default behavior that uses table's schema as format schema.
-  'format.fields.1.name' = 'rideTime',
-  'format.fields.1.data-type' = 'TIMESTAMP(3)',
-
-  'format.field-delimiter' = ';',         -- optional: field delimiter character (',' by default)
-  'format.line-delimiter' = '\r\n',       -- optional: line delimiter ("\n" by default; otherwise
-                                          -- "\r" or "\r\n" are allowed)
-  'format.quote-character' = '''',        -- optional: quote character for enclosing field values ('"' by default)
-  'format.allow-comments' = 'true',       -- optional: ignores comment lines that start with "#"
-                                          -- (disabled by default);
-                                          -- if enabled, make sure to also ignore parse errors to allow empty rows
-  'format.ignore-parse-errors' = 'true',  -- optional: skip fields and rows with parse errors instead of failing;
-                                          -- fields are set to null in case of errors
-  'format.array-element-delimiter' = '|', -- optional: the array element delimiter string for separating
-                                          -- array and row element values (";" by default)
-  'format.escape-character' = '\\',       -- optional: escape character for escaping values (disabled by default)
-  'format.null-literal' = 'n/a'           -- optional: null literal string that is interpreted as a
-                                          -- null value (disabled by default)
-)
-{% endhighlight %}
-</div>
 </div>
 
 The following table lists supported types that can be read and written:
@@ -1519,6 +1519,36 @@ If the format schema is equal to the table schema, the schema can also be automa
 The JSON format can be used as follows:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'json',                   -- required: specify the format type
+  'format.fail-on-missing-field' = 'true'   -- optional: flag whether to fail if a field is missing or not, false by default
+
+  'format.fields.0.name' = 'lon',           -- optional: define the schema explicitly using type information.
+  'format.fields.0.data-type' = 'FLOAT',    -- This overrides default behavior that uses table's schema as format schema.
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.data-type' = 'TIMESTAMP(3)',
+
+  'format.json-schema' =                    -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP.
+    '{                                      -- This also overrides the default behavior.
+      "type": "object",
+      "properties": {
+        "lon": {
+          "type": "number"
+        },
+        "rideTime": {
+          "type": "string",
+          "format": "date-time"
+        }
+      }
+    }'
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .withFormat(
@@ -1603,36 +1633,6 @@ format:
     }
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'format.type' = 'json',                   -- required: specify the format type
-  'format.fail-on-missing-field' = 'true'   -- optional: flag whether to fail if a field is missing or not, false by default
-
-  'format.fields.0.name' = 'lon',           -- optional: define the schema explicitly using type information.
-  'format.fields.0.data-type' = 'FLOAT',    -- This overrides default behavior that uses table's schema as format schema.
-  'format.fields.1.name' = 'rideTime',
-  'format.fields.1.data-type' = 'TIMESTAMP(3)',
-
-  'format.json-schema' =                    -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP.
-    '{                                      -- This also overrides the default behavior.
-      "type": "object",
-      "properties": {
-        "lon": {
-          "type": "number"
-        },
-        "rideTime": {
-          "type": "string",
-          "format": "date-time"
-        }
-      }
-    }'
-)
-{% endhighlight %}
-</div>
 </div>
 
 The following table shows the mapping of JSON schema types to Flink SQL types:
@@ -1714,6 +1714,27 @@ The [Apache Avro](https://avro.apache.org/) format allows to read and write Avro
 The Avro format can be used as follows:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'avro',                                 -- required: specify the schema type
+  'format.record-class' = 'org.organization.types.User',  -- required: define the schema either by using an Avro specific record class
+
+  'format.avro-schema' =                                  -- or by using an Avro schema
+    '{
+      "type": "record",
+      "name": "test",
+      "fields" : [
+        {"name": "a", "type": "long"},
+        {"name": "b", "type": "string"}
+      ]
+    }'
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .withFormat(
@@ -1780,27 +1801,6 @@ format:
     }
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'format.type' = 'avro',                                 -- required: specify the schema type
-  'format.record-class' = 'org.organization.types.User',  -- required: define the schema either by using an Avro specific record class
-
-  'format.avro-schema' =                                  -- or by using an Avro schema
-    '{
-      "type": "record",
-      "name": "test",
-      "fields" : [
-        {"name": "a", "type": "long"},
-        {"name": "b", "type": "string"}
-      ]
-    }'
-)
-{% endhighlight %}
-</div>
 </div>
 
 Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability otherwise they are converted to an `ANY` type. The following table shows the mapping:
@@ -1844,6 +1844,28 @@ replaced by a proper RFC-compliant version. Use the RFC-compliant CSV format whe
 Use the old one for stream/batch filesystem operations for now.
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the schema type
+
+  'format.fields.0.name' = 'lon',         -- optional: declare ordered format fields explicitly. This will overrides
+  'format.fields.0.data-type' = 'STRING', --  the default behavior that uses table's schema as format schema.
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.data-type' = 'TIMESTAMP(3)',
+
+  'format.field-delimiter' = ',',         -- optional: string delimiter "," by default
+  'format.line-delimiter' = '\n',         -- optional: string delimiter "\n" by default
+  'format.quote-character' = '"',         -- optional: single character for string values, empty by default
+  'format.comment-prefix' = '#',          -- optional: string to indicate comments, empty by default
+  'format.ignore-first-line' = 'false',   -- optional: boolean flag to ignore the first line, by default it is not skipped
+  'format.ignore-parse-errors' = 'true'   -- optional: skip records with parse error instead of failing by default
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .withFormat(
@@ -1893,28 +1915,6 @@ format:
   ignore-parse-errors: true  # optional: skip records with parse error instead of failing by default
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'format.type' = 'csv',                  -- required: specify the schema type
-
-  'format.fields.0.name' = 'lon',         -- optional: declare ordered format fields explicitly. This will overrides
-  'format.fields.0.data-type' = 'STRING', --  the default behavior that uses table's schema as format schema.
-  'format.fields.1.name' = 'rideTime',
-  'format.fields.1.data-type' = 'TIMESTAMP(3)',
-
-  'format.field-delimiter' = ',',         -- optional: string delimiter "," by default
-  'format.line-delimiter' = '\n',         -- optional: string delimiter "\n" by default
-  'format.quote-character' = '"',         -- optional: single character for string values, empty by default
-  'format.comment-prefix' = '#',          -- optional: string to indicate comments, empty by default
-  'format.ignore-first-line' = 'false',   -- optional: boolean flag to ignore the first line, by default it is not skipped
-  'format.ignore-parse-errors' = 'true'   -- optional: skip records with parse error instead of failing by default
-)
-{% endhighlight %}
-</div>
 </div>
 
 The old CSV format is included in Flink and does not require additional dependencies.
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index 31f2732..f4c8e39 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -92,6 +92,21 @@ The **table schema** defines the schema of a table that is exposed to SQL querie
 The subsequent sections will cover each definition part ([connector](connect.html#table-connectors), [format](connect.html#table-formats), and [schema](connect.html#table-schema)) in more detail. The following example shows how to pass them:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+tableEnvironment.sqlUpdate(
+    "CREATE TABLE MyTable (\n" +
+    "  ...    -- declare table schema \n" +
+    ") WITH (\n" +
+    "  'connector.type' = '...',  -- declare connector specific properties\n" +
+    "  ...\n" +
+    "  'update-mode' = 'append',  -- declare update mode\n" +
+    "  'format.type' = '...',     -- declare format specific properties\n" +
+    "  ...\n" +
+    ")");
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 tableEnvironment
@@ -124,21 +139,6 @@ format: ...
 schema: ...
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-tableEnvironment.sqlUpdate(
-    "CREATE TABLE MyTable (\n" +
-    "  ...    -- declare table schema \n" +
-    ") WITH (\n" +
-    "  'connector.type' = '...',  -- declare connector specific properties\n" +
-    "  ...\n" +
-    "  'update-mode' = 'append',  -- declare update mode\n" +
-    "  'format.type' = '...',     -- declare format specific properties\n" +
-    "  ...\n" +
-    ")");
-{% endhighlight %}
-</div>
 </div>
 
 The table's type (`source`, `sink`, or `both`) determines how a table is registered. In case of table type `both`, both a table source and table sink are registered under the same name. Logically, this means that we can both read and write to such a table similarly to a table in a regular DBMS.
@@ -148,6 +148,41 @@ For streaming queries, an [update mode](connect.html#update-mode) declares how t
 The following code shows a full example of how to connect to Kafka for reading Avro records.
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  -- declare the schema of the table
+  `user` BIGINT,
+  message STRING,
+  ts STRING
+) WITH (
+  -- declare the external system to connect to
+  'connector.type' = 'kafka',
+  'connector.version' = '0.10',
+  'connector.topic' = 'topic_name',
+  'connector.startup-mode' = 'earliest-offset',
+  'connector.properties.zookeeper.connect' = 'localhost:2181',
+  'connector.properties.bootstrap.servers' = 'localhost:9092',
+
+  -- specify the update-mode for streaming tables
+  'update-mode' = 'append',
+
+  -- declare a format for this system
+  'format.type' = 'avro',
+  'format.avro-schema' = '{
+                            "namespace": "org.myorganization",
+                            "type": "record",
+                            "name": "UserMessage",
+                            "fields": [
+                                {"name": "ts", "type": "string"},
+                                {"name": "user", "type": "long"},
+                                {"name": "message", "type": ["string", "null"]}
+                            ]
+                         }'
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 tableEnvironment
@@ -285,41 +320,6 @@ tables:
         data-type: STRING
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  -- declare the schema of the table
-  `user` BIGINT,
-  message STRING,
-  ts STRING
-) WITH (
-  -- declare the external system to connect to
-  'connector.type' = 'kafka',
-  'connector.version' = '0.10',
-  'connector.topic' = 'topic_name',
-  'connector.startup-mode' = 'earliest-offset',
-  'connector.properties.zookeeper.connect' = 'localhost:2181',
-  'connector.properties.bootstrap.servers' = 'localhost:9092',
-
-  -- specify the update-mode for streaming tables
-  'update-mode' = 'append',
-
-  -- declare a format for this system
-  'format.type' = 'avro',
-  'format.avro-schema' = '{
-                            "namespace": "org.myorganization",
-                            "type": "record",
-                            "name": "UserMessage",
-                            "fields": [
-                                {"name": "ts", "type": "string"},
-                                {"name": "user", "type": "long"},
-                                {"name": "message", "type": ["string", "null"]}
-                            ]
-                         }'
-)
-{% endhighlight %}
-</div>
 </div>
 
 In both ways the desired connection properties are converted into normalized, string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly-one matching table factory.
@@ -600,6 +600,16 @@ For streaming queries, it is required to declare how to perform the [conversion
 <span class="label label-danger">Attention</span> The documentation of each connector states which update modes are supported.
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+ ...
+) WITH (
+ 'update-mode' = 'append'  -- otherwise: 'retract' or 'upsert'
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .connect(...)
@@ -621,16 +631,6 @@ tables:
     update-mode: append    # otherwise: "retract" or "upsert"
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyTable (
- ...
-) WITH (
- 'update-mode' = 'append'  -- otherwise: 'retract' or 'upsert'
-)
-{% endhighlight %}
-</div>
 </div>
 
 See also the [general streaming concepts documentation](streaming/dynamic_tables.html#continuous-queries) for more information.
@@ -655,6 +655,20 @@ Please note that not all connectors are available in both batch and streaming ye
 The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem can be defined as:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'filesystem',                -- required: specify to connector type
+  'connector.path' = 'file:///path/to/whatever',  -- required: path to a file or directory
+  'format.type' = '...',                          -- required: file system connector requires to specify a format,
+  ...                                             -- currently only 'csv' format is supported.
+                                                  -- Please refer to Table Formats section for more details.
+)                                               
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .connect(
@@ -689,20 +703,6 @@ format:                               # required: file 
system connector requires
                                       # Please refer to Table Formats section 
for more details.
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'connector.type' = 'filesystem',                -- required: specify to 
connector type
-  'connector.path' = 'file:///path/to/whatever',  -- required: path to a file 
or directory
-  'format.type' = '...',                          -- required: file system 
connector requires to specify a format,
-  ...                                             -- currently only 'csv' 
format is supported.
-                                                  -- Please refer to Table 
Formats section for more details.
-)
-{% endhighlight %}
-</div>
 </div>
 
 The file system connector itself is included in Flink and does not require an 
additional dependency. A corresponding format needs to be specified for reading 
and writing rows from and to a file system.
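
For illustration, a complete definition combining the file system connector with the CSV format could look like the following sketch (the table name, columns, and path are placeholders, not part of the original example):

{% highlight sql %}
CREATE TABLE CsvFileSource (
  `user` BIGINT,
  message STRING
) WITH (
  -- connector properties
  'connector.type' = 'filesystem',
  'connector.path' = 'file:///tmp/users.csv',  -- hypothetical path

  -- format properties (the format schema is derived from the table schema here)
  'format.type' = 'csv'
)
{% endhighlight %}
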
@@ -720,6 +720,44 @@ The file system connector itself is included in Flink and 
does not require an ad
 The Kafka connector allows for reading and writing from and to an Apache Kafka 
topic. It can be defined as follows:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'kafka',
+
+  'connector.version' = '0.11',     -- required: valid connector versions are
+                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"
+
+  'connector.topic' = 'topic_name', -- required: topic name from which the table is read
+
+  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
+  'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
+  'connector.properties.group.id' = 'testGroup', -- optional: required for Kafka consumers, specifies the consumer group
+  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset",
+                                                   -- "latest-offset", "group-offsets",
+                                                   -- or "specific-offsets"
+
+  -- optional: used in case of startup mode with specific offsets
+  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
+
+  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions
+                                         -- into Kafka's partitions; valid values are "fixed"
+                                         -- (each Flink partition ends up in at most one Kafka partition),
+                                         -- "round-robin" (a Flink partition is distributed to
+                                         -- Kafka partitions round-robin), and
+                                         -- "custom" (use a custom FlinkKafkaPartitioner subclass)
+  -- optional: used in case of a custom sink partitioner
+  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
+
+  'format.type' = '...',                 -- required: the Kafka connector requires a format to be specified,
+  ...                                    -- the supported formats are 'csv', 'json' and 'avro'.
+                                         -- Please refer to the Table Formats section for more details.
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .connect(
@@ -756,7 +794,7 @@ The Kafka connector allows for reading and writing from and 
to an Apache Kafka t
     .version("0.11")  # required: valid connector versions are
                       # "0.8", "0.9", "0.10", "0.11", and "universal"
     .topic("...")     # required: topic name from which the table is read
-
+    
     # optional: connector specific properties
     .property("zookeeper.connect", "localhost:2181")
     .property("bootstrap.servers", "localhost:9092")
@@ -794,56 +832,18 @@ connector:
   startup-mode: ...                                               # optional: 
valid modes are "earliest-offset", "latest-offset",
                                                                   # 
"group-offsets", or "specific-offsets"
   specific-offsets: partition:0,offset:42;partition:1,offset:300  # optional: 
used in case of startup mode with specific offsets
-
+  
   sink-partitioner: ...    # optional: output partitioning from Flink's 
partitions into Kafka's partitions
                            # valid are "fixed" (each Flink partition ends up 
in at most one Kafka partition),
                            # "round-robin" (a Flink partition is distributed 
to Kafka partitions round-robin)
                            # "custom" (use a custom FlinkKafkaPartitioner 
subclass)
   sink-partitioner-class: org.mycompany.MyPartitioner  # optional: used in 
case of sink partitioner custom
-
+  
   format:                  # required: Kafka connector requires to specify a 
format,
     ...                    # the supported formats are "csv", "json" and 
"avro".
                            # Please refer to Table Formats section for more 
details.
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'connector.type' = 'kafka',
-
-  'connector.version' = '0.11',     -- required: valid connector versions are
-                                    -- "0.8", "0.9", "0.10", "0.11", and 
"universal"
-
-  'connector.topic' = 'topic_name', -- required: topic name from which the 
table is read
-
-  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: 
specify the ZooKeeper connection string
-  'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: 
specify the Kafka server connection string
-  'connector.properties.group.id' = 'testGroup', --optional: required in Kafka 
consumer, specify consumer group
-  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes 
are "earliest-offset",
-                                                   -- "latest-offset", 
"group-offsets",
-                                                   -- or "specific-offsets"
-
-  -- optional: used in case of startup mode with specific offsets
-  'connector.specific-offsets' = 
'partition:0,offset:42;partition:1,offset:300',
-
-  'connector.sink-partitioner' = '...',  -- optional: output partitioning from 
Flink's partitions
-                                         -- into Kafka's partitions valid are 
"fixed"
-                                         -- (each Flink partition ends up in 
at most one Kafka partition),
-                                         -- "round-robin" (a Flink partition 
is distributed to
-                                         -- Kafka partitions round-robin)
-                                         -- "custom" (use a custom 
FlinkKafkaPartitioner subclass)
-  -- optional: used in case of sink partitioner custom
-  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
-
-  'format.type' = '...',                 -- required: Kafka connector requires 
to specify a format,
-  ...                                    -- the supported formats are 'csv', 
'json' and 'avro'.
-                                         -- Please refer to Table Formats 
section for more details.
-)
-{% endhighlight %}
-</div>
 </div>
 
 **Specify the start reading position:** By default, the Kafka source will 
start reading data from the committed group offsets in Zookeeper or Kafka 
brokers. You can specify other start positions, which correspond to the 
configurations in section [Kafka Consumers Start Position Configuration]({{ 
site.baseurl 
}}/dev/connectors/kafka.html#kafka-consumers-start-position-configuration).
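
For example, starting from explicitly specified offsets could look like the following sketch (the topic, columns, and offsets are illustrative):

{% highlight sql %}
CREATE TABLE KafkaSpecificOffsets (
  `user` BIGINT,
  message STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'topic_name',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',

  -- read from fixed offsets instead of the committed group offsets
  'connector.startup-mode' = 'specific-offsets',
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

  'update-mode' = 'append',
  'format.type' = 'csv'
)
{% endhighlight %}
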
@@ -875,45 +875,107 @@ For append-only queries, the connector can also operate 
in [append mode](#update
 The connector can be defined as follows:
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java/Scala" markdown="1">
-{% highlight java %}
-.connect(
-  new Elasticsearch()
-    .version("6")                      // required: valid connector versions 
are "6"
-    .host("localhost", 9200, "http")   // required: one or more Elasticsearch 
hosts to connect to
-    .index("MyUsers")                  // required: Elasticsearch index
-    .documentType("user")              // required: Elasticsearch document type
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'elasticsearch', -- required: specify the connector type
+
+  'connector.version' = '6',          -- required: valid connector versions 
are "6"
+  
+  'connector.hosts' = 'http://host_name:9092;http://host_name:9093',  -- 
required: one or more Elasticsearch hosts to connect to
 
-    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" 
by default)
-                              //   e.g., "$" would result in IDs 
"KEY1$KEY2$KEY3"
-    .keyNullLiteral("n/a")    // optional: representation for null fields in 
keys ("null" by default)
+  'connector.index' = 'MyUsers',       -- required: Elasticsearch index
 
-    // optional: failure handling strategy in case a request to Elasticsearch 
fails (fail by default)
-    .failureHandlerFail()          // optional: throws an exception if a 
request fails and causes a job failure
-    .failureHandlerIgnore()        //   or ignores failures and drops the 
request
-    .failureHandlerRetryRejected() //   or re-adds requests that have failed 
due to queue capacity saturation
-    .failureHandlerCustom(...)     //   or custom failure handling with a 
ActionRequestFailureHandler subclass
+  'connector.document-type' = 'user',  -- required: Elasticsearch document type
 
-    // optional: configure how to buffer elements before sending them in bulk 
to the cluster for efficiency
-    .disableFlushOnCheckpoint()    // optional: disables flushing on 
checkpoint (see notes below!)
-    .bulkFlushMaxActions(42)       // optional: maximum number of actions to 
buffer for each bulk request
-    .bulkFlushMaxSize("42 mb")     // optional: maximum size of buffered 
actions in bytes per bulk request
-                                   //   (only MB granularity is supported)
-    .bulkFlushInterval(60000L)     // optional: bulk flush interval (in 
milliseconds)
+  'update-mode' = 'append',            -- optional: update mode when used as a table sink.
 
-    .bulkFlushBackoffConstant()    // optional: use a constant backoff type
-    .bulkFlushBackoffExponential() //   or use an exponential backoff type
-    .bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
-    .bulkFlushBackoffDelay(30000L) // optional: delay between each backoff 
attempt (in milliseconds)
+  'connector.key-delimiter' = '$',     -- optional: delimiter for composite 
keys ("_" by default)
+                                       -- e.g., "$" would result in IDs 
"KEY1$KEY2$KEY3"
 
-    // optional: connection properties to be used during REST communication to 
Elasticsearch
-    .connectionMaxRetryTimeout(3)  // optional: maximum timeout (in 
milliseconds) between retries
-    .connectionPathPrefix("/v1")   // optional: prefix string to be added to 
every REST communication
-)
+  'connector.key-null-literal' = 'n/a',  -- optional: representation for null 
fields in keys ("null" by default)
+
+  'connector.failure-handler' = '...',   -- optional: failure handling 
strategy in case a request to 
+                                         -- Elasticsearch fails ("fail" by 
default).
+                                         -- valid strategies are 
+                                         -- "fail" (throws an exception if a 
request fails and
+                                         -- thus causes a job failure), 
+                                         -- "ignore" (ignores failures and 
drops the request),
+                                         -- "retry-rejected" (re-adds requests 
that have failed due 
+                                         -- to queue capacity saturation), 
+                                         -- or "custom" for failure handling 
with a
+                                         -- ActionRequestFailureHandler 
subclass
+
+  -- optional: configure how to buffer elements before sending them in bulk to 
the cluster for efficiency
+  'connector.flush-on-checkpoint' = 'true',   -- optional: whether to flush on checkpoint ("true" by default);
+                                              -- setting this to "false" disables flushing (see notes below!)
+  'connector.bulk-flush.max-actions' = '42',  -- optional: maximum number of 
actions to buffer 
+                                              -- for each bulk request
+  'connector.bulk-flush.max-size' = '42 mb',  -- optional: maximum size of 
buffered actions in bytes
+                                              -- per bulk request
+                                              -- (only MB granularity is 
supported)
+  'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval 
(in milliseconds)
+  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff 
strategy ("disabled" by default)
+                                                      -- valid strategies are 
"disabled", "constant",
+                                                      -- or "exponential"
+  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum 
number of retries
+  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay 
between each backoff attempt
+                                                      -- (in milliseconds)
+
+  -- optional: connection properties to be used during REST communication to 
Elasticsearch
+  'connector.connection-max-retry-timeout' = '3',     -- optional: maximum 
timeout (in milliseconds)
+                                                      -- between retries
+  'connector.connection-path-prefix' = '/v1',         -- optional: prefix string to be added to every
+                                                      -- REST communication
+
+  'format.type' = '...',   -- required: the Elasticsearch connector requires a format to be specified,
+  ...                      -- currently only the 'json' format is supported.
+                           -- Please refer to the Table Formats section for more details.
+)
+{% endhighlight %}
+</div>
+
+<div data-lang="Java/Scala" markdown="1">
+{% highlight java %}
+.connect(
+  new Elasticsearch()
+    .version("6")                      // required: valid connector versions 
are "6"
+    .host("localhost", 9200, "http")   // required: one or more Elasticsearch 
hosts to connect to
+    .index("MyUsers")                  // required: Elasticsearch index
+    .documentType("user")              // required: Elasticsearch document type
+
+    .keyDelimiter("$")        // optional: delimiter for composite keys ("_" 
by default)
+                              //   e.g., "$" would result in IDs 
"KEY1$KEY2$KEY3"
+    .keyNullLiteral("n/a")    // optional: representation for null fields in 
keys ("null" by default)
+
+    // optional: failure handling strategy in case a request to Elasticsearch 
fails (fail by default)
+    .failureHandlerFail()          // optional: throws an exception if a 
request fails and causes a job failure
+    .failureHandlerIgnore()        //   or ignores failures and drops the 
request
+    .failureHandlerRetryRejected() //   or re-adds requests that have failed 
due to queue capacity saturation
+    .failureHandlerCustom(...)     //   or custom failure handling with a 
ActionRequestFailureHandler subclass
+
+    // optional: configure how to buffer elements before sending them in bulk 
to the cluster for efficiency
+    .disableFlushOnCheckpoint()    // optional: disables flushing on 
checkpoint (see notes below!)
+    .bulkFlushMaxActions(42)       // optional: maximum number of actions to 
buffer for each bulk request
+    .bulkFlushMaxSize("42 mb")     // optional: maximum size of buffered 
actions in bytes per bulk request
+                                   //   (only MB granularity is supported)
+    .bulkFlushInterval(60000L)     // optional: bulk flush interval (in 
milliseconds)
+
+    .bulkFlushBackoffConstant()    // optional: use a constant backoff type
+    .bulkFlushBackoffExponential() //   or use an exponential backoff type
+    .bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
+    .bulkFlushBackoffDelay(30000L) // optional: delay between each backoff 
attempt (in milliseconds)
+
+    // optional: connection properties to be used during REST communication to 
Elasticsearch
+    .connectionMaxRetryTimeout(3)  // optional: maximum timeout (in 
milliseconds) between retries
+    .connectionPathPrefix("/v1")   // optional: prefix string to be added to 
every REST communication
+)
 .withFormat(                      // required: Elasticsearch connector 
requires to specify a format,
   ...                             // currently only Json format is supported.
                                   // Please refer to Table Formats section for 
more details.
-)
+)    
 {% endhighlight %}
 </div>
 
@@ -994,74 +1056,12 @@ connector:
     # optional: connection properties to be used during REST communication to 
Elasticsearch
     connection-max-retry-timeout: 3   # optional: maximum timeout (in 
milliseconds) between retries
     connection-path-prefix: "/v1"     # optional: prefix string to be added to 
every REST communication
-
+    
     format:                     # required: Elasticsearch connector requires 
to specify a format,
       ...                       # currently only "json" format is supported.
                                 # Please refer to Table Formats section for 
more details.
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'connector.type' = 'elasticsearch', -- required: specify this table type is 
elasticsearch
-
-  'connector.version' = '6',          -- required: valid connector versions 
are "6"
-
-  'connector.hosts' = 'http://host_name:9092;http://host_name:9093',  -- 
required: one or more Elasticsearch hosts to connect to
-
-  'connector.index' = 'MyUsers',       -- required: Elasticsearch index
-
-  'connector.document-type' = 'user',  -- required: Elasticsearch document type
-
-  'update-mode' = 'append',            -- optional: update mode when used as 
table sink.
-
-  'connector.key-delimiter' = '$',     -- optional: delimiter for composite 
keys ("_" by default)
-                                       -- e.g., "$" would result in IDs 
"KEY1$KEY2$KEY3"
-
-  'connector.key-null-literal' = 'n/a',  -- optional: representation for null 
fields in keys ("null" by default)
-
-  'connector.failure-handler' = '...',   -- optional: failure handling 
strategy in case a request to
-                                         -- Elasticsearch fails ("fail" by 
default).
-                                         -- valid strategies are
-                                         -- "fail" (throws an exception if a 
request fails and
-                                         -- thus causes a job failure),
-                                         -- "ignore" (ignores failures and 
drops the request),
-                                         -- "retry-rejected" (re-adds requests 
that have failed due
-                                         -- to queue capacity saturation),
-                                         -- or "custom" for failure handling 
with a
-                                         -- ActionRequestFailureHandler 
subclass
-
-  -- optional: configure how to buffer elements before sending them in bulk to 
the cluster for efficiency
-  'connector.flush-on-checkpoint' = 'true',   -- optional: disables flushing 
on checkpoint (see notes below!)
-                                              -- ("true" by default)
-  'connector.bulk-flush.max-actions' = '42',  -- optional: maximum number of 
actions to buffer
-                                              -- for each bulk request
-  'connector.bulk-flush.max-size' = '42 mb',  -- optional: maximum size of 
buffered actions in bytes
-                                              -- per bulk request
-                                              -- (only MB granularity is 
supported)
-  'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval 
(in milliseconds)
-  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff 
strategy ("disabled" by default)
-                                                      -- valid strategies are 
"disabled", "constant",
-                                                      -- or "exponential"
-  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum 
number of retries
-  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay 
between each backoff attempt
-                                                      -- (in milliseconds)
-
-  -- optional: connection properties to be used during REST communication to 
Elasticsearch
-  'connector.connection-max-retry-timeout' = '3',     -- optional: maximum 
timeout (in milliseconds)
-                                                      -- between retries
-  'connector.connection-path-prefix' = '/v1'          -- optional: prefix 
string to be added to every
-                                                      -- REST communication
-
-  'format.type' = '...',   -- required: Elasticsearch connector requires to 
specify a format,
-  ...                      -- currently only 'json' format is supported.
-                           -- Please refer to Table Formats section for more 
details.
-)
-{% endhighlight %}
-</div>
 </div>
 
 **Bulk flushing:** For more information about characteristics of the optional 
flushing parameters see the [corresponding low-level documentation]({{ 
site.baseurl }}/dev/connectors/elasticsearch.html).
@@ -1091,6 +1091,38 @@ For append-only queries, the connector can also operate 
in [append mode](#update
 The connector can be defined as follows:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  hbase_rowkey_name rowkey_type,
+  hbase_column_family_name1 ROW<...>,
+  hbase_column_family_name2 ROW<...>
+) WITH (
+  'connector.type' = 'hbase', -- required: specify the connector type
+  
+  'connector.version' = '1.4.3',          -- required: valid connector 
versions are "1.4.3"
+  
+  'connector.table-name' = 'hbase_table_name',  -- required: hbase table name
+  
+  'connector.zookeeper.quorum' = 'localhost:2181', -- required: HBase 
Zookeeper quorum configuration
+  'connector.zookeeper.znode.parent' = '/test',    -- optional: the root dir 
in Zookeeper for HBase cluster.
+                                                   -- The default value is 
"/hbase".
+
+  'connector.write.buffer-flush.max-size' = '10mb', -- optional: writing option, determines the maximum size (in memory) of
+                                                    -- buffered rows per round-trip insert. This can improve performance for
+                                                    -- writing to HBase. The default value is "2mb".
+
+  'connector.write.buffer-flush.max-rows' = '1000', -- optional: writing option, determines how many rows to buffer per
+                                                    -- round-trip insert. This can improve performance for writing to HBase.
+                                                    -- There is no default value, i.e. by default flushing does not depend on
+                                                    -- the number of buffered rows.
+
+  'connector.write.buffer-flush.interval' = '2s',   -- optional: writing option, sets the interval at which buffered write
+                                                    -- requests are flushed. The default value is "0s", which means no
+                                                    -- asynchronous flush thread will be scheduled.
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .connect(
@@ -1117,14 +1149,14 @@ The connector can be defined as follows:
 connector:
   type: hbase
   version: "1.4.3"               # required: currently only support "1.4.3"
-
+  
   table-name: "hbase_table_name" # required: HBase table name
-
+  
   zookeeper:
     quorum: "localhost:2181"     # required: HBase Zookeeper quorum 
configuration
     znode.parent: "/test"        # optional: the root dir in Zookeeper for 
HBase cluster.
                                  # The default value is "/hbase".
-
+  
   write.buffer-flush:
     max-size: "10mb"             # optional: writing option, determines how 
many size in memory of buffered
                                  # rows to insert per round trip. This can 
help performance on writing to JDBC
@@ -1137,41 +1169,9 @@ connector:
                                  # no asynchronous flush thread will be 
scheduled.
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  hbase_rowkey_name rowkey_type,
-  hbase_column_family_name1 ROW<...>,
-  hbase_column_family_name2 ROW<...>
-) WITH (
-  'connector.type' = 'hbase', -- required: specify this table type is hbase
-
-  'connector.version' = '1.4.3',          -- required: valid connector 
versions are "1.4.3"
-
-  'connector.table-name' = 'hbase_table_name',  -- required: hbase table name
-
-  'connector.zookeeper.quorum' = 'localhost:2181', -- required: HBase 
Zookeeper quorum configuration
-  'connector.zookeeper.znode.parent' = '/test',    -- optional: the root dir 
in Zookeeper for HBase cluster.
-                                                   -- The default value is 
"/hbase".
-
-  'connector.write.buffer-flush.max-size' = '10mb', -- optional: writing 
option, determines how many size in memory of buffered
-                                                    -- rows to insert per 
round trip. This can help performance on writing to JDBC
-                                                    -- database. The default 
value is "2mb".
-
-  'connector.write.buffer-flush.max-rows' = '1000', -- optional: writing 
option, determines how many rows to insert per round trip.
-                                                    -- This can help 
performance on writing to JDBC database. No default value,
-                                                    -- i.e. the default 
flushing is not depends on the number of buffered rows.
-
-  'connector.write.buffer-flush.interval' = '2s',   -- optional: writing 
option, sets a flush interval flushing buffered requesting
-                                                    -- if the interval passes, 
in milliseconds. Default value is "0s", which means
-                                                    -- no asynchronous flush 
thread will be scheduled.
-)
-{% endhighlight %}
-</div>
 </div>
 
-**Columns:** All the column families in HBase table must be declared as `ROW` 
type, the field name maps to the column family name, and the nested field names 
map to the column qualifier names. There is no need to declare all the families 
and qualifiers in the schema, users can declare what's necessary. Except the 
`ROW` type fields, the only one field of atomic type (e.g. `STRING`, `BIGINT`) 
will be recognized as row key of the table. There's no constraints on the name 
of row key field.
+**Columns:** All the column families in an HBase table must be declared as `ROW` types: the field name maps to the column family name, and the nested field names map to the column qualifier names. There is no need to declare all families and qualifiers in the schema; users can declare only what is necessary. Apart from the `ROW` type fields, the single field of an atomic type (e.g. `STRING`, `BIGINT`) is recognized as the row key of the table. There are no constraints on the name of the row key field.
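
For example, a table over a hypothetical HBase table with two column families and a `BIGINT` row key could be declared as in the following sketch (family, qualifier, and type names are illustrative):

{% highlight sql %}
CREATE TABLE hTable (
  rowkey BIGINT,                   -- the single atomic field is used as the row key
  cf1 ROW<q1 STRING, q2 BIGINT>,   -- column family "cf1" with qualifiers "q1" and "q2"
  cf2 ROW<q3 DOUBLE>               -- column family "cf2" with qualifier "q3"
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'hbase_table_name',
  'connector.zookeeper.quorum' = 'localhost:2181'
)
{% endhighlight %}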
 
**Temporary join:** A lookup join against HBase does not use any caching; data is always queried directly through the HBase client.
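
As a sketch of such a lookup, a temporal join against the table declared above might look as follows (the `Orders` table and its processing-time attribute `proctime` are assumptions):

{% highlight sql %}
SELECT o.order_id, h.cf1.q1
FROM Orders AS o
JOIN hTable FOR SYSTEM_TIME AS OF o.proctime AS h
  ON o.user_id = h.rowkey
{% endhighlight %}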
 
@@ -1206,69 +1206,23 @@ To use JDBC connector, need to choose an actual driver 
to use. Here are drivers
 The connector can be defined as follows:
 
 <div class="codetabs" markdown="1">
-<div data-lang="YAML" markdown="1">
-{% highlight yaml %}
-connector:
-  type: jdbc
-  url: "jdbc:mysql://localhost:3306/flink-test"     # required: JDBC DB url
-  table: "jdbc_table_name"        # required: jdbc table name
-  driver: "com.mysql.jdbc.Driver" # optional: the class name of the JDBC 
driver to use to connect to this URL.
-                                  # If not set, it will automatically be 
derived from the URL.
-
-  username: "name"                # optional: jdbc user name and password
-  password: "password"
-
-  read: # scan options, optional, used when reading from table
-    partition: # These options must all be specified if any of them is 
specified. In addition, partition.num must be specified. They
-               # describe how to partition the table when reading in parallel 
from multiple tasks. partition.column must be a numeric,
-               # date, or timestamp column from the table in question. Notice 
that lowerBound and upperBound are just used to decide
-               # the partition stride, not for filtering the rows in table. So 
all rows in the table will be partitioned and returned.
-               # This option applies only to reading.
-      column: "column_name" # optional, name of the column used for 
partitioning the input.
-      num: 50               # optional, the number of partitions.
-      lower-bound: 500      # optional, the smallest value of the first 
partition.
-      upper-bound: 1000     # optional, the largest value of the last 
partition.
-    fetch-size: 100         # optional, Gives the reader a hint as to the 
number of rows that should be fetched
-                            # from the database when reading per round trip. 
If the value specified is zero, then
-                            # the hint is ignored. The default value is zero.
-
-  lookup: # lookup options, optional, used in temporary join
-    cache:
-      max-rows: 5000 # optional, max number of rows of lookup cache, over this 
value, the oldest rows will
-                     # be eliminated. "cache.max-rows" and "cache.ttl" options 
must all be specified if any
-                     # of them is specified. Cache is not enabled as default.
-      ttl: "10s"     # optional, the max time to live for each rows in lookup 
cache, over this time, the oldest rows
-                     # will be expired. "cache.max-rows" and "cache.ttl" 
options must all be specified if any of
-                     # them is specified. Cache is not enabled as default.
-    max-retries: 3   # optional, max retry times if lookup database failed
-
-  write: # sink options, optional, used when writing into table
-      flush:
-        max-rows: 5000 # optional, flush max size (includes all append, upsert 
and delete records),
-                       # over this number of records, will flush data. The 
default value is "5000".
-        interval: "2s" # optional, flush interval mills, over this time, 
asynchronous threads will flush data.
-                       # The default value is "0s", which means no 
asynchronous flush thread will be scheduled.
-      max-retries: 3   # optional, max retry times if writing records to 
database failed.
-{% endhighlight %}
-</div>
-
 <div data-lang="DDL" markdown="1">
 {% highlight sql %}
 CREATE TABLE MyUserTable (
   ...
 ) WITH (
   'connector.type' = 'jdbc', -- required: specify the connector type
-
+  
   'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- required: 
JDBC DB url
-
+  
   'connector.table' = 'jdbc_table_name',  -- required: jdbc table name
-
-  'connector.driver' = 'com.mysql.jdbc.Driver', -- optional: the class name of 
the JDBC driver to use to connect to this URL.
+  
+  'connector.driver' = 'com.mysql.jdbc.Driver', -- optional: the class name of 
the JDBC driver to use to connect to this URL. 
                                                 -- If not set, it will 
automatically be derived from the URL.
 
   'connector.username' = 'name', -- optional: jdbc user name and password
   'connector.password' = 'password',
-
+  
   -- scan options, optional, used when reading from table
 
   -- These options must all be specified if any of them is specified. In 
addition, partition.num must be specified. They
@@ -1280,7 +1234,7 @@ CREATE TABLE MyUserTable (
   'connector.read.partition.num' = '50', -- optional, the number of partitions.
   'connector.read.partition.lower-bound' = '500', -- optional, the smallest 
value of the first partition.
   'connector.read.partition.upper-bound' = '1000', -- optional, the largest 
value of the last partition.
-
+  
   'connector.read.fetch-size' = '100', -- optional, Gives the reader a hint as 
to the number of rows that should be fetched
                                        -- from the database when reading per 
round trip. If the value specified is zero, then
                                        -- the hint is ignored. The default 
value is zero.
@@ -1295,19 +1249,65 @@ CREATE TABLE MyUserTable (
   'connector.lookup.max-retries' = '3', -- optional, max retry times if lookup 
database failed
 
   -- sink options, optional, used when writing into table
-  'connector.write.flush.max-rows' = '5000', -- optional, flush max size 
(includes all append, upsert and delete records),
+  'connector.write.flush.max-rows' = '5000', -- optional, flush max size 
(includes all append, upsert and delete records), 
                                              -- over this number of records, 
will flush data. The default value is "5000".
   'connector.write.flush.interval' = '2s', -- optional, the flush interval; after this time has passed, asynchronous threads will flush data.
-                                           -- The default value is "0s", which 
means no asynchronous flush thread will be scheduled.
+                                           -- The default value is "0s", which 
means no asynchronous flush thread will be scheduled. 
   'connector.write.max-retries' = '3' -- optional, max retry times if writing 
records to database failed
 )
 {% endhighlight %}
 </div>
+
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+connector:
+  type: jdbc
+  url: "jdbc:mysql://localhost:3306/flink-test"     # required: JDBC DB url
+  table: "jdbc_table_name"        # required: jdbc table name
+  driver: "com.mysql.jdbc.Driver" # optional: the class name of the JDBC 
driver to use to connect to this URL.
+                                  # If not set, it will automatically be 
derived from the URL.
+
+  username: "name"                # optional: jdbc user name and password
+  password: "password"
+  
+  read: # scan options, optional, used when reading from table
+    partition: # These options must all be specified if any of them is 
specified. In addition, partition.num must be specified. They
+               # describe how to partition the table when reading in parallel 
from multiple tasks. partition.column must be a numeric,
+               # date, or timestamp column from the table in question. Notice 
that lowerBound and upperBound are just used to decide
+               # the partition stride, not for filtering the rows in table. So 
all rows in the table will be partitioned and returned.
+               # This option applies only to reading.
+      column: "column_name" # optional, name of the column used for 
partitioning the input.
+      num: 50               # optional, the number of partitions.
+      lower-bound: 500      # optional, the smallest value of the first 
partition.
+      upper-bound: 1000     # optional, the largest value of the last 
partition.
+    fetch-size: 100         # optional, Gives the reader a hint as to the 
number of rows that should be fetched
+                            # from the database when reading per round trip. 
If the value specified is zero, then
+                            # the hint is ignored. The default value is zero.
+  
+  lookup: # lookup options, optional, used in temporary join
+    cache:
+      max-rows: 5000 # optional, max number of rows of lookup cache, over this 
value, the oldest rows will
+                     # be eliminated. "cache.max-rows" and "cache.ttl" options 
must all be specified if any
+                     # of them is specified. Cache is not enabled as default.
+      ttl: "10s"     # optional, the max time to live for each rows in lookup 
cache, over this time, the oldest rows
+                     # will be expired. "cache.max-rows" and "cache.ttl" 
options must all be specified if any of
+                     # them is specified. Cache is not enabled as default.
+    max-retries: 3   # optional, max retry times if lookup database failed
+  
+  write: # sink options, optional, used when writing into table
+      flush:
+        max-rows: 5000 # optional, flush max size (includes all append, upsert 
and delete records), 
+                       # over this number of records, will flush data. The 
default value is "5000".
+        interval: "2s" # optional, flush interval mills, over this time, 
asynchronous threads will flush data.
+                       # The default value is "0s", which means no 
asynchronous flush thread will be scheduled. 
+      max-retries: 3   # optional, max retry times if writing records to 
database failed.
+{% endhighlight %}
+</div>
 </div>
 
 **Upsert sink:** Flink automatically extracts valid keys from a query. For example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key of the fields `a` and `b`. If a JDBC table is used as an upsert sink, please make sure the keys of the query match one of the unique key sets or the primary key of the underlying database. This guarantees that the output result is as expected.
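
As a sketch (table and column names are illustrative), writing such a grouped query into a JDBC upsert sink might look as follows, assuming the target table has a unique key on `(page, region)`:

{% highlight sql %}
INSERT INTO pageview_counts   -- a JDBC table registered with 'connector.type' = 'jdbc'
SELECT page, region, COUNT(*) AS cnt
FROM clicks
GROUP BY page, region
{% endhighlight %}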
 
-**Temporary Join:**  JDBC connector can be used in temporal join as a lookup 
source. Currently, only sync lookup mode is supported. The lookup cache options 
(`connector.lookup.cache.max-rows` and `connector.lookup.cache.ttl`) must all 
be specified if any of them is specified. The lookup cache is used to improve 
performance of temporal join JDBC connector by querying the cache first instead 
of send all requests to remote database. But the returned value might not be 
the latest if it is fr [...]
+**Temporary Join:** The JDBC connector can be used in a temporal join as a lookup source. Currently, only the synchronous lookup mode is supported. The lookup cache options (`connector.lookup.cache.max-rows` and `connector.lookup.cache.ttl`) must both be specified if either of them is specified. The lookup cache is used to improve the performance of temporal joins with the JDBC connector by querying the cache first instead of sending all requests to the remote database. But the returned value might not be the latest if it is fr [...]
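
For illustration, a temporal join using a JDBC table as a lookup source with caching enabled might look like the following sketch (the `Orders` table, its processing-time attribute `proctime`, and the dimension table layout are assumptions):

{% highlight sql %}
CREATE TABLE Customers (
  customer_id BIGINT,
  customer_name STRING
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
  'connector.table' = 'customers',
  'connector.lookup.cache.max-rows' = '5000',
  'connector.lookup.cache.ttl' = '10s'
);

SELECT o.order_id, c.customer_name
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proctime AS c
  ON o.customer_id = c.customer_id
{% endhighlight %}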
 
 **Writing:** By default, `connector.write.flush.interval` is `0s` and `connector.write.flush.max-rows` is `5000`, which means that for low-traffic queries the buffered output rows may not be flushed to the database for a long time. It is therefore recommended to set the flush interval explicitly.
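
A sink definition with an explicit flush configuration might therefore look like this sketch (the URL and table name are placeholders):

{% highlight sql %}
CREATE TABLE JdbcSink (
  page STRING,
  cnt BIGINT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
  'connector.table' = 'pageview_counts',
  'connector.write.flush.interval' = '2s',    -- flush at least every 2 seconds
  'connector.write.flush.max-rows' = '1000'   -- or as soon as 1000 rows are buffered
)
{% endhighlight %}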
 
@@ -1351,6 +1351,36 @@ schema is interpreted as a field renaming in the format.
 The CSV format can be used as follows:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the format type
+
+  'format.fields.0.name' = 'lon',         -- optional: define the schema 
explicitly using type information.
+  'format.fields.0.data-type' = 'FLOAT',  -- This overrides default behavior 
that uses table's schema as format schema.
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.data-type' = 'TIMESTAMP(3)',
+
+  'format.field-delimiter' = ';',         -- optional: field delimiter 
character (',' by default)
+  'format.line-delimiter' = '\r\n',       -- optional: line delimiter ("\n" by 
default; otherwise
+                                          -- "\r" or "\r\n" are allowed)
+  'format.quote-character' = '''',        -- optional: quote character for 
enclosing field values ('"' by default)
+  'format.allow-comments' = 'true',       -- optional: ignores comment lines 
that start with "#"
+                                          -- (disabled by default);
+                                          -- if enabled, make sure to also 
ignore parse errors to allow empty rows
+  'format.ignore-parse-errors' = 'true',  -- optional: skip fields and rows 
with parse errors instead of failing;
+                                          -- fields are set to null in case of 
errors
+  'format.array-element-delimiter' = '|', -- optional: the array element 
delimiter string for separating
+                                          -- array and row element values (";" 
by default)
+  'format.escape-character' = '\\',       -- optional: escape character for 
escaping values (disabled by default)
+  'format.null-literal' = 'n/a'           -- optional: null literal string 
that is interpreted as a
+                                          -- null value (disabled by default)
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .withFormat(
@@ -1427,36 +1457,6 @@ format:
                                #   null value (disabled by default)
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'format.type' = 'csv',                  -- required: specify the schema type
-
-  'format.fields.0.name' = 'lon',         -- optional: define the schema 
explicitly using type information.
-  'format.fields.0.data-type' = 'FLOAT',  -- This overrides default behavior 
that uses table's schema as format schema.
-  'format.fields.1.name' = 'rideTime',
-  'format.fields.1.data-type' = 'TIMESTAMP(3)',
-
-  'format.field-delimiter' = ';',         -- optional: field delimiter 
character (',' by default)
-  'format.line-delimiter' = '\r\n',       -- optional: line delimiter ("\n" by 
default; otherwise
-                                          -- "\r" or "\r\n" are allowed)
-  'format.quote-character' = '''',        -- optional: quote character for 
enclosing field values ('"' by default)
-  'format.allow-comments' = 'true',       -- optional: ignores comment lines 
that start with "#"
-                                          -- (disabled by default);
-                                          -- if enabled, make sure to also 
ignore parse errors to allow empty rows
-  'format.ignore-parse-errors' = 'true',  -- optional: skip fields and rows 
with parse errors instead of failing;
-                                          -- fields are set to null in case of 
errors
-  'format.array-element-delimiter' = '|', -- optional: the array element 
delimiter string for separating
-                                          -- array and row element values (";" 
by default)
-  'format.escape-character' = '\\',       -- optional: escape character for 
escaping values (disabled by default)
-  'format.null-literal' = 'n/a'           -- optional: null literal string 
that is interpreted as a
-                                          -- null value (disabled by default)
-)
-{% endhighlight %}
-</div>
 </div>
 
 The following table lists supported types that can be read and written:
@@ -1519,6 +1519,36 @@ If the format schema is equal to the table schema, the 
schema can also be automa
 The JSON format can be used as follows:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'json',                   -- required: specify the format 
type
+  'format.fail-on-missing-field' = 'true',  -- optional: flag whether to fail if a field is missing or not, "false" by default
+
+  'format.fields.0.name' = 'lon',           -- optional: define the schema 
explicitly using type information.
+  'format.fields.0.data-type' = 'FLOAT',    -- This overrides default behavior 
that uses table's schema as format schema.
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.data-type' = 'TIMESTAMP(3)',
+
+  'format.json-schema' =                    -- or by using a JSON schema which 
parses to DECIMAL and TIMESTAMP.
+    '{                                      -- This also overrides the default 
behavior.
+      "type": "object",
+      "properties": {
+        "lon": {
+          "type": "number"
+        },
+        "rideTime": {
+          "type": "string",
+          "format": "date-time"
+        }
+      }
+    }'
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .withFormat(
@@ -1603,36 +1633,6 @@ format:
     }
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'format.type' = 'json',                   -- required: specify the format 
type
-  'format.fail-on-missing-field' = 'true'   -- optional: flag whether to fail 
if a field is missing or not, false by default
-
-  'format.fields.0.name' = 'lon',           -- optional: define the schema 
explicitly using type information.
-  'format.fields.0.data-type' = 'FLOAT',    -- This overrides default behavior 
that uses table's schema as format schema.
-  'format.fields.1.name' = 'rideTime',
-  'format.fields.1.data-type' = 'TIMESTAMP(3)',
-
-  'format.json-schema' =                    -- or by using a JSON schema which 
parses to DECIMAL and TIMESTAMP.
-    '{                                      -- This also overrides the default 
behavior.
-      "type": "object",
-      "properties": {
-        "lon": {
-          "type": "number"
-        },
-        "rideTime": {
-          "type": "string",
-          "format": "date-time"
-        }
-      }
-    }'
-)
-{% endhighlight %}
-</div>
 </div>
 
 The following table shows the mapping of JSON schema types to Flink SQL types:
@@ -1714,6 +1714,27 @@ The [Apache Avro](https://avro.apache.org/) format 
allows to read and write Avro
 The Avro format can be used as follows:
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'avro',                                 -- required: specify the format type
+  'format.record-class' = 'org.organization.types.User',  -- required: define 
the schema either by using an Avro specific record class
+
+  'format.avro-schema' =                                  -- or by using an 
Avro schema
+    '{
+      "type": "record",
+      "name": "test",
+      "fields" : [
+        {"name": "a", "type": "long"},
+        {"name": "b", "type": "string"}
+      ]
+    }'
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .withFormat(
@@ -1780,27 +1801,6 @@ format:
     }
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'format.type' = 'avro',                                 -- required: specify 
the schema type
-  'format.record-class' = 'org.organization.types.User',  -- required: define 
the schema either by using an Avro specific record class
-
-  'format.avro-schema' =                                  -- or by using an 
Avro schema
-    '{
-      "type": "record",
-      "name": "test",
-      "fields" : [
-        {"name": "a", "type": "long"},
-        {"name": "b", "type": "string"}
-      ]
-    }'
-)
-{% endhighlight %}
-</div>
 </div>
 
 Avro types are mapped to the corresponding SQL data types. Union types are only supported for specifying nullability; otherwise they are converted to an `ANY` type. The following table shows the mapping:
@@ -1844,6 +1844,28 @@ replaced by a proper RFC-compliant version. Use the 
RFC-compliant CSV format whe
 Use the old one for stream/batch filesystem operations for now.
 
 <div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'format.type' = 'csv',                  -- required: specify the format type
+
+  'format.fields.0.name' = 'lon',         -- optional: declare ordered format fields explicitly. This overrides
+  'format.fields.0.data-type' = 'STRING', -- the default behavior that uses the table's schema as the format schema.
+  'format.fields.1.name' = 'rideTime',
+  'format.fields.1.data-type' = 'TIMESTAMP(3)',
+
+  'format.field-delimiter' = ',',         -- optional: string delimiter "," by 
default
+  'format.line-delimiter' = '\n',         -- optional: string delimiter "\n" 
by default
+  'format.quote-character' = '"',         -- optional: single character for 
string values, empty by default
+  'format.comment-prefix' = '#',          -- optional: string to indicate 
comments, empty by default
+  'format.ignore-first-line' = 'false',   -- optional: boolean flag to ignore 
the first line, by default it is not skipped
+  'format.ignore-parse-errors' = 'true'   -- optional: skip records with parse 
error instead of failing by default
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 .withFormat(
@@ -1893,28 +1915,6 @@ format:
   ignore-parse-errors: true  # optional: skip records with parse error instead 
of failing by default
 {% endhighlight %}
 </div>
-
-<div data-lang="DDL" markdown="1">
-{% highlight sql %}
-CREATE TABLE MyUserTable (
-  ...
-) WITH (
-  'format.type' = 'csv',                  -- required: specify the schema type
-
-  'format.fields.0.name' = 'lon',         -- optional: declare ordered format 
fields explicitly. This will overrides
-  'format.fields.0.data-type' = 'STRING', --  the default behavior that uses 
table's schema as format schema.
-  'format.fields.1.name' = 'rideTime',
-  'format.fields.1.data-type' = 'TIMESTAMP(3)',
-
-  'format.field-delimiter' = ',',         -- optional: string delimiter "," by 
default
-  'format.line-delimiter' = '\n',         -- optional: string delimiter "\n" 
by default
-  'format.quote-character' = '"',         -- optional: single character for 
string values, empty by default
-  'format.comment-prefix' = '#',          -- optional: string to indicate 
comments, empty by default
-  'format.ignore-first-line' = 'false',   -- optional: boolean flag to ignore 
the first line, by default it is not skipped
-  'format.ignore-parse-errors' = 'true'   -- optional: skip records with parse 
error instead of failing by default
-)
-{% endhighlight %}
-</div>
 </div>
 
 The old CSV format is included in Flink and does not require additional 
dependencies.
