This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1e8ae7ad16 [doc](flink-connector)improve flink connector doc (#22143)
1e8ae7ad16 is described below
commit 1e8ae7ad16fee79c2fd01285ecdb8c73011e679e
Author: wudi <[email protected]>
AuthorDate: Tue Jul 25 15:58:35 2023 +0800
[doc](flink-connector)improve flink connector doc (#22143)
---
docs/en/docs/ecosystem/flink-doris-connector.md | 446 ++++++++++++---------
docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 264 +++++++-----
2 files changed, 419 insertions(+), 291 deletions(-)
diff --git a/docs/en/docs/ecosystem/flink-doris-connector.md
b/docs/en/docs/ecosystem/flink-doris-connector.md
index 7a07793ec4..b8f385469d 100644
--- a/docs/en/docs/ecosystem/flink-doris-connector.md
+++ b/docs/en/docs/ecosystem/flink-doris-connector.md
@@ -28,12 +28,7 @@ under the License.
-
-The Flink Doris Connector can support operations (read, insert, modify,
delete) data stored in Doris through Flink.
-
-Github: https://github.com/apache/doris-flink-connector
-
-* `Doris` table can be mapped to `DataStream` or `Table`.
+* [Flink Doris Connector](https://github.com/apache/doris-flink-connector) supports reading, inserting, modifying, and deleting data stored in Doris through Flink. This document introduces how to operate on Doris through Flink with DataStream and SQL.
>**Note:**
>
@@ -50,124 +45,108 @@ Github: https://github.com/apache/doris-flink-connector
| 1.3.0 | 1.16 | 1.0+ | 8 | - |
| 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- |
-## Build and Install
-
-Ready to work
-
-1. Modify the `custom_env.sh.tpl` file and rename it to `custom_env.sh`
+## Installation
-2. Execute following command in source dir:
-`sh build.sh`
-Enter the flink version you need to compile according to the prompt.
+### Maven
-After the compilation is successful, the target jar package will be generated
in the `dist` directory, such as: `flink-doris-connector-1.3.0-SNAPSHOT.jar`.
-Copy this file to `classpath` in `Flink` to use `Flink-Doris-Connector`. For
example, `Flink` running in `Local` mode, put this file in the `lib/` folder.
`Flink` running in `Yarn` cluster mode, put this file in the pre-deployment
package.
-
-
-## Using Maven
-
-Add flink-doris-connector Maven dependencies
+Add the flink-doris-connector Maven dependency:
```
<!-- flink-doris-connector -->
<dependency>
- <groupId>org.apache.doris</groupId>
- <artifactId>flink-doris-connector-1.16</artifactId>
- <version>1.3.0</version>
-</dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>flink-doris-connector-1.16</artifactId>
+ <version>1.4.0</version>
+</dependency>
```
-**Notes**
+**Remarks**
-1. Please replace the corresponding Connector and Flink dependency versions
according to different Flink versions. Version 1.3.0 only supports Flink1.16
+1. Please replace the Connector and Flink dependency versions according to your Flink version.
2. You can also download the relevant version jar package from
[here](https://repo.maven.apache.org/maven2/org/apache/doris/).
-## How to use
-
-There are three ways to use Flink Doris Connector.
+### Compile
-* SQL
-* DataStream
+To compile, you can run `sh build.sh` directly in the source directory. For details, please refer to [here](https://github.com/apache/doris-flink-connector/blob/master/README.md).
-### Parameters Configuration
+After the compilation succeeds, the target jar package is generated in the `dist` directory, such as `flink-doris-connector-1.5.0-SNAPSHOT.jar`.
+Copy this file to the `classpath` of `Flink` to use `Flink-Doris-Connector`. For example, for `Flink` running in `Local` mode, put this file in the `lib/` folder; for `Flink` running in `Yarn` cluster mode, put this file into the pre-deployment package.
-Flink Doris Connector Sink writes data to Doris by the `Stream load`, and also
supports the configurations of `Stream load`, For specific parameters, please
refer to [here](../data-operate/import/import-way/stream-load-manual.md).
+## How to Use
-* SQL configured by `sink.properties.` in the `WITH`
-* DataStream configured by
`DorisExecutionOptions.builder().setStreamLoadProp(Properties)`
+### Read
-
-### SQL
-
-* Source
+#### SQL
```sql
+-- doris source
CREATE TABLE flink_doris_source (
- name STRING,
- age INT,
- price DECIMAL(5,2),
- sale DOUBLE
- )
- WITH (
- 'connector' = 'doris',
- 'fenodes' = 'FE_IP:8030',
- 'table.identifier' = 'database.table',
- 'username' = 'root',
- 'password' = 'password'
+ name STRING,
+ age INT,
+ price DECIMAL(5,2),
+ sale DOUBLE
+ )
+ WITH (
+ 'connector' = 'doris',
+ 'fenodes' = 'FE_IP:8030',
+ 'table.identifier' = 'database.table',
+ 'username' = 'root',
+ 'password' = 'password'
);
```
-* Sink
+#### DataStream
-```sql
--- enable checkpoint
-SET 'execution.checkpointing.interval' = '10s';
-CREATE TABLE flink_doris_sink (
- name STRING,
- age INT,
- price DECIMAL(5,2),
- sale DOUBLE
- )
- WITH (
- 'connector' = 'doris',
- 'fenodes' = 'FE_IP:8030',
- 'table.identifier' = 'db.table',
- 'username' = 'root',
- 'password' = 'password',
- 'sink.label-prefix' = 'doris_label'
-);
-```
+```java
+DorisOptions.Builder builder = DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("password");
-* Insert
+DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
+ .setDorisOptions(builder.build())
+ .setDorisReadOptions(DorisReadOptions.builder().build())
+ .setDeserializer(new SimpleListDeserializationSchema())
+ .build();
-```sql
-INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
+env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris
source").print();
```
-### DataStream
+### Write
-* Source
+#### SQL
-```java
-DorisOptions.Builder builder = DorisOptions.builder()
- .setFenodes("FE_IP:8030")
- .setTableIdentifier("db.table")
- .setUsername("root")
- .setPassword("password");
+```sql
+--enable checkpoint
+SET 'execution.checkpointing.interval' = '10s';
-DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
- .setDorisOptions(builder.build())
- .setDorisReadOptions(DorisReadOptions.builder().build())
- .setDeserializer(new SimpleListDeserializationSchema())
- .build();
+-- doris sink
+CREATE TABLE flink_doris_sink (
+ name STRING,
+ age INT,
+ price DECIMAL(5,2),
+ sale DOUBLE
+ )
+ WITH (
+ 'connector' = 'doris',
+ 'fenodes' = 'FE_IP:8030',
+ 'table.identifier' = 'db.table',
+ 'username' = 'root',
+ 'password' = 'password',
+ 'sink.label-prefix' = 'doris_label'
+);
-env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris
source").print();
+-- submit insert job
+INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
```
-* Sink
+#### DataStream
-**String Stream**
+DorisSink writes data to Doris through Stream Load. When writing with the DataStream API, different serialization methods are supported:
+
+**String data stream (SimpleStringSerializer)**
```java
// enable checkpoint
@@ -178,37 +157,29 @@ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:8030")
- .setTableIdentifier("db.table")
- .setUsername("root")
- .setPassword("password");
-
-Properties properties = new Properties();
-/**
-json format to streamload
-properties.setProperty("format", "json");
-properties.setProperty("read_json_by_line", "true");
-**/
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("password");
-DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
+DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
- .setStreamLoadProp(properties);
+ .setDeletable(false);
builder.setDorisReadOptions(DorisReadOptions.builder().build())
- .setDorisExecutionOptions(executionBuilder.build())
- .setSerializer(new SimpleStringSerializer()) //serialize according to
string
- .setDorisOptions(dorisBuilder.build());
-
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setSerializer(new SimpleStringSerializer()) //serialize according to
string
+ .setDorisOptions(dorisBuilder.build());
//mock string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
-DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
+DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" +
t.f1)
- .sinkTo(builder.build());
+ .sinkTo(builder.build());
```
-**RowData Stream**
+**RowData data stream (RowDataSerializer)**
```java
// enable checkpoint
@@ -220,105 +191,163 @@ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:8030")
- .setTableIdentifier("db.table")
- .setUsername("root")
- .setPassword("password");
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("password");
// json format to streamload
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
-DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
+DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
- .setStreamLoadProp(properties); //streamload params
+ .setDeletable(false)
+ .setStreamLoadProp(properties); //streamload params
-//flink rowdata‘s schema
+//flink rowdata's schema
String[] fields = {"city", "longitude", "latitude", "destroy_date"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(),
DataTypes.DOUBLE(), DataTypes.DATE()};
builder.setDorisReadOptions(DorisReadOptions.builder().build())
- .setDorisExecutionOptions(executionBuilder.build())
- .setSerializer(RowDataSerializer.builder() //serialize according to
rowdata
- .setFieldNames(fields)
- .setType("json") //json format
- .setFieldType(types).build())
- .setDorisOptions(dorisBuilder.build());
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setSerializer(RowDataSerializer.builder() //serialize according to
rowdata
+ .setFieldNames(fields)
+ .setType("json") //json format
+ .setFieldType(types).build())
+ .setDorisOptions(dorisBuilder.build());
//mock rowdata source
-DataStream<RowData> source = env.fromElements("")
- .map(new MapFunction<String, RowData>() {
- @Override
- public RowData map(String value) throws Exception {
- GenericRowData genericRowData = new GenericRowData(4);
- genericRowData.setField(0, StringData.fromString("beijing"));
- genericRowData.setField(1, 116.405419);
- genericRowData.setField(2, 39.916927);
- genericRowData.setField(3, LocalDate.now().toEpochDay());
- return genericRowData;
- }
- });
-
-source.sinkTo(builder.build());
+DataStream<RowData> source = env.fromElements("")
+ .map(new MapFunction<String, RowData>() {
+ @Override
+ public RowData map(String value) throws Exception {
+ GenericRowData genericRowData = new GenericRowData(4);
+ genericRowData.setField(0, StringData.fromString("beijing"));
+ genericRowData.setField(1, 116.405419);
+ genericRowData.setField(2, 39.916927);
+ genericRowData.setField(3, LocalDate.now().toEpochDay());
+ return genericRowData;
+ }
+ });
+
+source.sinkTo(builder.build());
```
-**SchemaChange Stream**
+**SchemaChange data stream (JsonDebeziumSchemaSerializer)**
+
```java
// enable checkpoint
env.enableCheckpointing(10000);
Properties props = new Properties();
-props.setProperty("format", "json");
+props.setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
-DorisOptions dorisOptions = DorisOptions.builder()
- .setFenodes("127.0.0.1:8030")
- .setTableIdentifier("test.t1")
- .setUsername("root")
- .setPassword("").build();
+DorisOptions dorisOptions = DorisOptions.builder()
+ .setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("test.t1")
+ .setUsername("root")
+ .setPassword("").build();
-DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
-executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())
- .setStreamLoadProp(props).setDeletable(true);
+DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
+executionBuilder.setLabelPrefix("label-prefix")
+ .setStreamLoadProp(props).setDeletable(true);
DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
- .setDorisExecutionOptions(executionBuilder.build())
- .setDorisOptions(dorisOptions)
-
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
+ .setDorisExecutionOptions(executionBuilder.build())
+ .setDorisOptions(dorisOptions)
+
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
-env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL
Source")//.print();
- .sinkTo(builder.build());
+env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
+ .sinkTo(builder.build());
```
-refer:
[CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java)
-
-
-### General
-
-| Key | Default Value | Required | Comment
|
-| -------------------------------- | ----------------- |
------------------------------------------------------------ |
-------------------------------- |
-| fenodes | -- | Y | Doris FE
http address, support multiple addresses, separated by commas |
-| table.identifier | -- | Y | Doris
table identifier, eg, db1.tbl1 |
-| username | -- | Y | Doris
username |
-| password | -- | Y | Doris
password |
-| doris.request.retries | 3 | N |
Number of retries to send requests to Doris |
-| doris.request.connect.timeout.ms | 30000 | N |
Connection timeout for sending requests to Doris
|
-| doris.request.read.timeout.ms | 30000 | N | Read
timeout for sending request to Doris |
-| doris.request.query.timeout.s | 3600 | N | Query
the timeout time of doris, the default is 1 hour, -1 means no timeout limit
|
-| doris.request.tablet.size | Integer.MAX_VALUE | N | The number of
Doris Tablets corresponding to an Partition. The smaller this value is set, the
more partitions will be generated. This will increase the parallelism on the
flink side, but at the same time will cause greater pressure on Doris. |
-| doris.batch.size | 1024 | N | The
maximum number of rows to read data from BE at one time. Increasing this value
can reduce the number of connections between Flink and Doris. Thereby reducing
the extra time overhead caused by network delay. |
-| doris.exec.mem.limit | 2147483648 | N | Memory
limit for a single query. The default is 2GB, in bytes. |
-| doris.deserialize.arrow.async | false | N |
Whether to support asynchronous conversion of Arrow format to RowBatch required
for flink-doris-connector iteration |
-| doris.deserialize.queue.size | 64 | N |
Asynchronous conversion of the internal processing queue in Arrow format takes
effect when doris.deserialize.arrow.async is true |
-| doris.read.field | -- | N | List of column
names in the Doris table, separated by commas |
-| doris.filter.query | -- | N | Filter
expression of the query, which is transparently transmitted to Doris. Doris
uses this expression to complete source-side data filtering. |
-| sink.label-prefix | -- | Y | The label prefix used by stream load imports.
In the 2pc scenario, global uniqueness is required to ensure the EOS semantics
of Flink. |
-| sink.properties.* | -- | N | The stream load
parameters.<br /> <br /> eg:<br /> sink.properties.column_separator' = ','<br
/> <br /> Setting 'sink.properties.escape_delimiters' = 'true' if you want to
use a control char as a separator, so that such as '\\x01' will translate to
binary 0x01<br /><br />Support JSON format import, you need to enable both
'sink.properties.format' ='json' and 'sink.properties.read_json_by_line'
='true' |
-| sink.enable-delete | true | N | Whether to
enable deletion. This option requires Doris table to enable batch delete
function (0.15+ version is enabled by default), and only supports Uniq model.|
-| sink.enable-2pc | true | N | Whether to
enable two-phase commit (2pc), the default is true, to ensure Exactly-Once
semantics. For two-phase commit, please refer to
[here](../data-operate/import/import-way/stream-load-manual.md). |
-| sink.max-retries | 3 | N | In the 2pc
scenario, the number of retries after the commit phase fails.
|
-| sink.buffer-size | 1048576(1MB) | N | Write
data cache buffer size, in bytes. It is not recommended to modify, the default
configuration is sufficient.
|
-| sink.buffer-count | 3 | N | The
number of write data cache buffers, it is not recommended to modify, the
default configuration is sufficient.
+Reference:
[CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java)
+### Lookup Join
+
+```sql
+CREATE TABLE fact_table (
+ `id` BIGINT,
+ `name` STRING,
+ `city` STRING,
+ `process_time` as proctime()
+) WITH (
+ 'connector' = 'kafka',
+ ...
+);
+
+create table dim_city(
+ `city` STRING,
+ `level` INT ,
+ `province` STRING,
+ `country` STRING
+) WITH (
+ 'connector' = 'doris',
+ 'fenodes' = '127.0.0.1:8030',
+ 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
+ 'table.identifier' = 'dim.dim_city',
+ 'username' = 'root',
+ 'password' = ''
+);
+
+SELECT a.id, a.name, a.city, c.province, c.country,c.level
+FROM fact_table a
+LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
+ON a.city = c.city
+```
+
+## Configuration
+
+### General configuration items
+
+| Key | Default Value | Required | Comment
|
+| -------------------------------- | ------------- | -------- |
------------------------------------------------------------ |
+| fenodes | -- | Y | Doris FE http
address, multiple addresses are supported, separated by commas |
+| table.identifier | -- | Y | Doris table
name, such as: db.tbl |
+| username | -- | Y | username to
access Doris |
+| password | -- | Y | Password to
access Doris |
+| doris.request.retries | 3 | N | Number of
retries to send requests to Doris |
+| doris.request.connect.timeout.ms | 30000 | N | Connection
timeout for sending requests to Doris |
+| doris.request.read.timeout.ms | 30000 | N | Read timeout
for sending requests to Doris |
+
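+For example, these general options (and the source/sink options below) go directly into the `WITH` clause of a Flink SQL table definition. The sketch below uses placeholder FE addresses and illustrates multiple `fenodes` separated by commas:
+
+```sql
+-- a minimal sketch; FE_IP1/FE_IP2 are placeholder FE addresses
+CREATE TABLE doris_general_example (
+    name STRING,
+    age INT
+)
+WITH (
+  'connector' = 'doris',
+  'fenodes' = 'FE_IP1:8030,FE_IP2:8030',
+  'table.identifier' = 'db.table',
+  'username' = 'root',
+  'password' = 'password',
+  'doris.request.retries' = '5'
+);
+```
+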
+### Source configuration items
+
+| Key | Default Value | Required | Comment
|
+| ----------------------------- | ------------------ | -------- |
------------------------------------------------------------ |
+| doris.request.query.timeout.s | 3600 | N | The timeout
time for querying Doris, the default value is 1 hour, -1 means no timeout limit
|
+| doris.request.tablet.size | Integer.MAX_VALUE | N | The number
of Doris Tablets corresponding to a Partition. The smaller this value is set,
the more Partitions will be generated. This improves the parallelism on the
Flink side, but at the same time puts more pressure on Doris. |
+| doris.batch.size | 1024 | N | The maximum number of rows to read from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris, thereby reducing the additional time overhead caused by network latency. |
+| doris.exec.mem.limit | 2147483648 | N | Memory limit
for a single query. The default is 2GB, in bytes |
+| doris.deserialize.arrow.async | FALSE | N | Whether to
support asynchronous conversion of Arrow format to RowBatch needed for
flink-doris-connector iterations |
+| doris.deserialize.queue.size | 64 | N | Asynchronous
conversion of internal processing queue in Arrow format, effective when
doris.deserialize.arrow.async is true |
+| doris.read.field | -- | N | Read the
list of column names of the Doris table, separated by commas |
+| doris.filter.query | -- | N | The expression used to filter the data to read; it is transparently passed to Doris, which uses it to complete source-side data filtering. For example, age=18. |
+
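+As an illustration of the source options above, the following sketch (table and column names are assumptions) pushes column pruning and filtering down to Doris:
+
+```sql
+-- a minimal sketch: read only two columns and filter rows on the Doris side
+CREATE TABLE flink_doris_source_pruned (
+    name STRING,
+    age INT
+)
+WITH (
+  'connector' = 'doris',
+  'fenodes' = 'FE_IP:8030',
+  'table.identifier' = 'database.table',
+  'username' = 'root',
+  'password' = 'password',
+  'doris.read.field' = 'name,age',
+  'doris.filter.query' = 'age=18'
+);
+```
+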
+### Sink configuration items
+
+| Key | Default Value | Required | Comment
|
+| ------------------ | ------------- | -------- |
------------------------------------------------------------ |
+| sink.label-prefix | -- | Y | The label prefix used by
Stream load import. In the 2pc scenario, global uniqueness is required to
ensure Flink's EOS semantics. |
+| sink.properties.* | -- | N | Import parameters for Stream Load. <br/>For example: 'sink.properties.column_separator' = ',' defines the column separator; 'sink.properties.escape_delimiters' = 'true' allows special characters to be used as separators, so '\x01' is converted to binary 0x01. <br/><br/>JSON format import: 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'. <br/>For detailed parameters, refer to [here](../data-operate/import/import-way/stream-load-manual.md). |
+| sink.enable-delete | TRUE | N | Whether to enable deletion. This option requires the Doris table to have the batch delete feature enabled (enabled by default since Doris 0.15), and it only supports the Unique model. |
+| sink.enable-2pc | TRUE | N | Whether to enable two-phase
commit (2pc), the default is true, to ensure Exactly-Once semantics. For
two-phase commit, please refer to
[here](../data-operate/import/import-way/stream-load-manual.md). |
+| sink.buffer-size | 1MB | N | The size of the write data cache buffer, in bytes. It is not recommended to change this; the default configuration is sufficient. |
+| sink.buffer-count | 3 | N | The number of write data cache buffers. It is not recommended to change this; the default configuration is sufficient. |
+| sink.max-retries | 3 | N | Maximum number of retries
after Commit failure, default 3 |
+
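+For example, the `sink.properties.*` options above are passed through to Stream Load; the sketch below (table and columns are illustrative) uses a control character as the column separator:
+
+```sql
+-- a minimal sketch: CSV import with \x01 as the column separator
+CREATE TABLE doris_sink_csv (
+    name STRING,
+    age INT
+)
+WITH (
+  'connector' = 'doris',
+  'fenodes' = 'FE_IP:8030',
+  'table.identifier' = 'db.table',
+  'username' = 'root',
+  'password' = 'password',
+  'sink.label-prefix' = 'doris_label_csv',
+  'sink.properties.escape_delimiters' = 'true',
+  'sink.properties.column_separator' = '\x01'
+);
+```
+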
+### Lookup Join configuration items
+
+| Key | Default Value | Required | Comment
|
+| --------------------------------- | ------------- | -------- |
------------------------------------------------------------ |
+| jdbc-url | -- | Y | jdbc
connection information |
+| lookup.cache.max-rows | -1 | N | The maximum number of rows in the lookup cache; the default is -1, which means the cache is not enabled |
+| lookup.cache.ttl | 10s | N | The maximum
time of lookup cache, the default is 10s |
+| lookup.max-retries | 1 | N | The number of
retries after a lookup query fails |
+| lookup.jdbc.async | false | N | Whether to
enable asynchronous lookup, the default is false |
+| lookup.jdbc.read.batch.size | 128 | N | Under
asynchronous lookup, the maximum batch size for each query |
+| lookup.jdbc.read.batch.queue-size | 256 | N | The size of
the intermediate buffer queue during asynchronous lookup |
+| lookup.jdbc.read.thread-size | 3 | N | The number of
jdbc threads for lookup in each task |
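+
+As an example, these lookup options go into the `WITH` clause of the dimension table from the Lookup Join section above; the cache and batch values below are illustrative:
+
+```sql
+-- a minimal sketch: asynchronous lookup with a bounded cache
+create table dim_city(
+    `city` STRING,
+    `level` INT,
+    `province` STRING,
+    `country` STRING
+) WITH (
+  'connector' = 'doris',
+  'fenodes' = '127.0.0.1:8030',
+  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
+  'table.identifier' = 'dim.dim_city',
+  'username' = 'root',
+  'password' = '',
+  'lookup.jdbc.async' = 'true',
+  'lookup.cache.max-rows' = '1000',
+  'lookup.cache.ttl' = '60s'
+);
+```
+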
## Doris & Flink Column Type Mapping
@@ -342,7 +371,7 @@ refer:
[CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/
| TIME | DOUBLE |
| HLL | Unsupported datatype |
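+
+As a quick illustration of this mapping, a hypothetical Doris table with `INT`, `VARCHAR`, `DECIMAL(10,2)` and `DATETIME` columns would typically be declared on the Flink side as follows (a sketch; check the full mapping table above for your column types):
+
+```sql
+-- a sketch: Flink DDL for a hypothetical Doris table `db.orders`
+CREATE TABLE doris_orders (
+    order_id   INT,            -- Doris INT
+    customer   STRING,         -- Doris VARCHAR
+    amount     DECIMAL(10,2),  -- Doris DECIMAL(10,2)
+    created_at TIMESTAMP       -- Doris DATETIME
+)
+WITH (
+  'connector' = 'doris',
+  'fenodes' = 'FE_IP:8030',
+  'table.identifier' = 'db.orders',
+  'username' = 'root',
+  'password' = 'password'
+);
+```
+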
-## An example of using Flink CDC to access Doris (supports
insert/update/delete events)
+## An example of using Flink CDC to access Doris
```sql
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (
@@ -359,7 +388,7 @@ CREATE TABLE cdc_mysql_source (
'table-name' = 'table'
);
--- Support delete event synchronization (sink.enable-delete='true'), requires
Doris table to enable batch delete function
+-- Support synchronous insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
@@ -372,20 +401,22 @@ WITH (
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
- 'sink.enable-delete' = 'true',
+ 'sink.enable-delete' = 'true', -- Synchronize delete events
'sink.label-prefix' = 'doris_label'
);
insert into doris_sink select id,name from cdc_mysql_source;
```
-## Use Flink CDC to access multi-table or database
+## Example of using Flink CDC to access multiple tables or a whole database
+
### Syntax
-```
+
+```shell
<FLINK_HOME>/bin/flink run \
- -c org.apache.doris.flink.tools.cdc.CdcTools\
- lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
- mysql-sync-database \
+ -c org.apache.doris.flink.tools.cdc.CdcTools \
+ lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar\
+ <mysql-sync-database|oracle-sync-database> \
--database <doris-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <doris-table-prefix>] \
@@ -393,6 +424,7 @@ insert into doris_sink select id,name from cdc_mysql_source;
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf
<mysql-cdc-source-conf> ...] \
+ --oracle-conf <oracle-cdc-source-conf> [--oracle-conf
<oracle-cdc-source-conf> ...] \
--sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
[--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]
```
@@ -403,19 +435,21 @@ insert into doris_sink select id,name from
cdc_mysql_source;
- **--table-suffix** Same as above, the suffix name of the Doris table.
- **--including-tables** MySQL tables that need to be synchronized, you can
use "|" to separate multiple tables, and support regular expressions. For
example --including-tables table1|tbl.* is to synchronize table1 and all tables
beginning with tbl.
- **--excluding-tables** Tables that do not need to be synchronized, the usage
is the same as above.
-- **--mysql-conf** MySQL CDCSource configuration, for example --mysql-conf
hostname=127.0.0.1 , you can find it in
[here](https://ververica.github.io/flink-cdc-connectors/master
/content/connectors/mysql-cdc.html) to view all configurations of MySQL-CDC,
where hostname/username/password/database-name are required.
-- **--sink-conf** All configurations of Doris Sink, you can view the complete
configuration items
[here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9).
+- **--mysql-conf** MySQL CDCSource configuration, for example --mysql-conf hostname=127.0.0.1. You can view all MySQL-CDC configurations [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html), where hostname/username/password/database-name are required.
+- **--oracle-conf** Oracle CDCSource configuration, for example --oracle-conf hostname=127.0.0.1. You can view all Oracle-CDC configurations [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html), where hostname/username/password/database-name/schema-name are required.
+- **--sink-conf** All configurations of the Doris Sink. You can view the complete list of configuration items [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9).
- **--table-conf** The configuration item of the Doris table, that is, the
content contained in properties. For example --table-conf replication_num=1
->Note: flink-sql-connector-mysql-cdc-2.3.0.jar needs to be added in the
$FLINK_HOME/lib directory
+>Note: When synchronizing, you need to add the corresponding Flink CDC dependencies to the $FLINK_HOME/lib directory, such as flink-sql-connector-mysql-cdc-${version}.jar and flink-sql-connector-oracle-cdc-${version}.jar.
-### Example
-```
+### MySQL synchronization example
+
+```shell
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s\
-Dparallelism.default=1\
-c org.apache.doris.flink.tools.cdc.CdcTools\
- lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
+ lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
mysql-sync-database\
--database test_db \
--mysql-conf hostname=127.0.0.1 \
@@ -431,7 +465,33 @@ insert into doris_sink select id,name from
cdc_mysql_source;
--table-conf replication_num=1
```
+### Oracle synchronization example
+
+```shell
+<FLINK_HOME>/bin/flink run \
+ -Dexecution.checkpointing.interval=10s \
+ -Dparallelism.default=1 \
+ -c org.apache.doris.flink.tools.cdc.CdcTools \
+ ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\
+ oracle-sync-database \
+ --database test_db \
+ --oracle-conf hostname=127.0.0.1 \
+ --oracle-conf port=1521 \
+ --oracle-conf username=admin \
+ --oracle-conf password="password" \
+ --oracle-conf database-name=XE \
+ --oracle-conf schema-name=ADMIN \
+ --including-tables "tbl1|tbl2" \
+ --sink-conf fenodes=127.0.0.1:8030 \
+ --sink-conf username=root \
+ --sink-conf password=\
+ --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+ --sink-conf sink.label-prefix=label \
+ --table-conf replication_num=1
+```
+
## Use FlinkCDC to update Key column
+
Generally, a business database uses a number as the primary key of a table; for example, a Student table uses the number (id) as its primary key. But as the business evolves, the number corresponding to a record may change.
In this scenario, synchronizing data with FlinkCDC + Doris Connector can automatically update the data in the Doris primary-key column.
### Principle
@@ -537,6 +597,8 @@ At this time, it cannot be started from the checkpoint, and
the expiration time
This is because the number of concurrent imports to the same database exceeds 100, which can be solved by adjusting the fe.conf parameter `max_running_txn_num_per_db`. For details, please refer to [max_running_txn_num_per_db](https://doris.apache.org/zh-CN/docs/dev/admin-manual/config/fe-config/#max_running_txn_num_per_db)
+This error can also occur when a task frequently modifies its label and restarts. In the 2pc scenario (Duplicate/Aggregate models), the label of each task needs to be unique, and when restarting from a checkpoint, the Flink task actively aborts transactions that were previously precommitted successfully but not yet committed. Frequently modifying the label and restarting leaves a large number of successfully precommitted transactions that cannot be aborted, occupying transactions. In the Unique model, 2pc can also be disabled, which allows idempotent writes.
+
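+For example, on a Unique model table you can turn off 2pc in the sink definition and rely on idempotent writes instead (a sketch; the other options follow the earlier sink example):
+
+```sql
+-- a minimal sketch: disable two-phase commit for a Unique model table
+CREATE TABLE doris_sink_unique (
+    id INT,
+    name STRING
+)
+WITH (
+  'connector' = 'doris',
+  'fenodes' = 'FE_IP:8030',
+  'table.identifier' = 'db.unique_table',
+  'username' = 'root',
+  'password' = 'password',
+  'sink.label-prefix' = 'doris_label_unique',
+  'sink.enable-2pc' = 'false'
+);
+```
+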
7. **How do you ensure the order of a batch of data when Flink writes to the Uniq model?**
You can add a sequence column configuration to ensure ordering. For details, please refer to [sequence](https://doris.apache.org/zh-CN/docs/dev/data-operate/update-delete/sequence-column-manual)
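+
+For example, assuming the Uniq table defines a sequence column and the source data carries an `update_time` field, the corresponding Stream Load property can be passed through `sink.properties.*` (a sketch; the property name follows the Stream Load `function_column.sequence_col` header):
+
+```sql
+-- a sketch: route the sequence column through Stream Load properties
+CREATE TABLE doris_sink_seq (
+    id INT,
+    name STRING,
+    update_time TIMESTAMP
+)
+WITH (
+  'connector' = 'doris',
+  'fenodes' = 'FE_IP:8030',
+  'table.identifier' = 'db.uniq_table',
+  'username' = 'root',
+  'password' = 'password',
+  'sink.label-prefix' = 'doris_label_seq',
+  'sink.properties.function_column.sequence_col' = 'update_time'
+);
+```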
diff --git a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
index a5f7b2977f..ad1947aaee 100644
--- a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
+++ b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
@@ -30,12 +30,7 @@ under the License.
-
-Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。
-
-代码库地址:https://github.com/apache/doris-flink-connector
-
-* 可以将 `Doris` 表映射为 `DataStream` 或者 `Table`。
+[Flink Doris Connector](https://github.com/apache/doris-flink-connector) 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。本文档介绍如何通过 Flink 使用 DataStream 和 SQL 操作 Doris。
>**注意:**
>
@@ -52,21 +47,9 @@ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改
| 1.3.0 | 1.16 | 1.0+ | 8 | - |
| 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- |
-## 编译与安装
-
-准备工作
-
-1. 修改`custom_env.sh.tpl`文件,重命名为`custom_env.sh`
+## 使用
-2. 在源码目录下执行:
-`sh build.sh`
-根据提示输入你需要的 flink 版本进行编译。
-
-编译成功后,会在 `dist` 目录生成目标jar包,如:`flink-doris-connector-1.3.0-SNAPSHOT.jar`。
-将此文件复制到 `Flink` 的 `classpath` 中即可使用 `Flink-Doris-Connector` 。例如, `Local` 模式运行的
`Flink` ,将此文件放入 `lib/` 文件夹下。 `Yarn` 集群模式运行的 `Flink` ,则将此文件放入预部署包中。
-
-
-## 使用 Maven 管理
+### Maven
添加 flink-doris-connector
@@ -75,7 +58,7 @@ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
- <version>1.3.0</version>
+ <version>1.4.0</version>
</dependency>
```
@@ -85,25 +68,21 @@ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改
2.也可从[这里](https://repo.maven.apache.org/maven2/org/apache/doris/)下载相关版本jar包。
-## 使用方法
-
-Flink 读写 Doris 数据主要有两种方式
-
-* SQL
-* DataStream
+### 编译
-### 参数配置
+编译时,可直接运行`sh
build.sh`,具体可参考[这里](https://github.com/apache/doris-flink-connector/blob/master/README.md)。
-Flink Doris Connector Sink 的内部实现是通过 `Stream Load` 服务向 Doris 写入数据, 同时也支持
`Stream Load`
请求参数的配置设置,具体参数可参考[这里](../data-operate/import/import-way/stream-load-manual.md),配置方法如下:
+编译成功后,会在 `dist` 目录生成目标jar包,如:`flink-doris-connector-1.5.0-SNAPSHOT.jar`。
+将此文件复制到 `Flink` 的 `classpath` 中即可使用 `Flink-Doris-Connector` 。例如, `Local` 模式运行的
`Flink` ,将此文件放入 `lib/` 文件夹下。 `Yarn` 集群模式运行的 `Flink` ,则将此文件放入预部署包中。
-* SQL 使用 `WITH` 参数 `sink.properties.` 配置
-* DataStream
使用方法`DorisExecutionOptions.builder().setStreamLoadProp(Properties)`配置
+## 使用方法
-### SQL
+### 读取
-* Source
+#### SQL
```sql
+-- doris source
CREATE TABLE flink_doris_source (
name STRING,
age INT,
@@ -119,11 +98,33 @@ CREATE TABLE flink_doris_source (
);
```
-* Sink
+#### DataStream
+
+```java
+DorisOptions.Builder builder = DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("password");
+
+DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
+ .setDorisOptions(builder.build())
+ .setDorisReadOptions(DorisReadOptions.builder().build())
+ .setDeserializer(new SimpleListDeserializationSchema())
+ .build();
+
+env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris
source").print();
+```
+
+### 写入
+
+#### SQL
```sql
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';
+
+-- doris sink
CREATE TABLE flink_doris_sink (
name STRING,
age INT,
@@ -138,37 +139,16 @@ CREATE TABLE flink_doris_sink (
'password' = 'password',
'sink.label-prefix' = 'doris_label'
);
-```
-
-* Insert
-```sql
+-- submit insert job
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
```
-### DataStream
-
-* Source
-
-```java
-DorisOptions.Builder builder = DorisOptions.builder()
- .setFenodes("FE_IP:8030")
- .setTableIdentifier("db.table")
- .setUsername("root")
- .setPassword("password");
-
-DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
- .setDorisOptions(builder.build())
- .setDorisReadOptions(DorisReadOptions.builder().build())
- .setDeserializer(new SimpleListDeserializationSchema())
- .build();
-
-env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris
source").print();
-```
+#### DataStream
-* Sink
+DorisSink 是通过 StreamLoad 向 Doris 写入数据,DataStream 写入时,支持不同的序列化方法。
-**String 数据流**
+**String 数据流(SimpleStringSerializer)**
```java
// enable checkpoint
@@ -183,16 +163,15 @@ dorisBuilder.setFenodes("FE_IP:8030")
.setUsername("root")
.setPassword("password");
-
DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
-executionBuilder.setLabelPrefix("label-doris"); //streamload label prefix
+executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
+ .setDeletable(false);
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer()) //serialize according to
string
.setDorisOptions(dorisBuilder.build());
-
//mock string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
@@ -202,7 +181,7 @@ source.map((MapFunction<Tuple2<String, Integer>, String>) t
-> t.f0 + "\t" + t.f
.sinkTo(builder.build());
```
-**RowData 数据流**
+**RowData 数据流(RowDataSerializer)**
```java
// enable checkpoint
@@ -224,6 +203,7 @@ properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
+ .setDeletable(false)
.setStreamLoadProp(properties); //streamload params
//flink rowdata‘s schema
@@ -255,7 +235,8 @@ DataStream<RowData> source = env.fromElements("")
source.sinkTo(builder.build());
```
-**SchemaChange 数据流**
+**SchemaChange 数据流(JsonDebeziumSchemaSerializer)**
+
```java
// enable checkpoint
env.enableCheckpointing(10000);
@@ -270,7 +251,7 @@ DorisOptions dorisOptions = DorisOptions.builder()
.setPassword("").build();
DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
-executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())
+executionBuilder.setLabelPrefix("label-prefix")
.setStreamLoadProp(props).setDeletable(true);
DorisSink.Builder<String> builder = DorisSink.builder();
@@ -279,40 +260,96 @@
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisOptions(dorisOptions)
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
-env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL
Source")//.print();
+env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.sinkTo(builder.build());
```
参考:
[CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java)
+### Lookup Join
+
+```sql
+CREATE TABLE fact_table (
+ `id` BIGINT,
+ `name` STRING,
+ `city` STRING,
+ `process_time` as proctime()
+) WITH (
+ 'connector' = 'kafka',
+ ...
+);
+
+create table dim_city(
+ `city` STRING,
+ `level` INT ,
+ `province` STRING,
+ `country` STRING
+) WITH (
+ 'connector' = 'doris',
+ 'fenodes' = '127.0.0.1:8030',
+ 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
+ 'table.identifier' = 'dim.dim_city',
+ 'username' = 'root',
+ 'password' = ''
+);
+
+SELECT a.id, a.name, a.city, c.province, c.country,c.level
+FROM fact_table a
+LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
+ON a.city = c.city
+```
+
## 配置
### 通用配置项
-| Key | Default Value | Required | Comment
|
-| -------------------------------- | ------------------ | -------- |
------------------------------------------------------------ |
-| fenodes | -- | Y | Doris FE
http 地址 |
-| table.identifier | -- | Y | Doris
表名,如:db.tbl |
-| username | -- | Y | 访问 Doris
的用户名 |
-| password | -- | Y | 访问 Doris
的密码 |
-| doris.request.retries | 3 | N | 向 Doris
发送请求的重试次数 |
-| doris.request.connect.timeout.ms | 30000 | N | 向 Doris
发送请求的连接超时时间 |
-| doris.request.read.timeout.ms | 30000 | N | 向 Doris
发送请求的读取超时时间 |
-| doris.request.query.timeout.s | 3600 | N | 查询 Doris
的超时时间,默认值为1小时,-1表示无超时限制 |
-| doris.request.tablet.size | Integer. MAX_VALUE | N | 一个
Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对
Doris 造成更大的压力。 |
-| doris.batch.size | 1024 | N | 一次从 BE
读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 |
-| doris.exec.mem.limit | 2147483648 | N |
单个查询的内存限制。默认为 2GB,单位为字节 |
-| doris.deserialize.arrow.async | FALSE | N | 是否支持异步转换
Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch |
-| doris.deserialize.queue.size | 64 | N | 异步转换
Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 |
-| doris.read.field | -- | N | 读取 Doris
表的列名列表,多列之间使用逗号分隔 |
-| doris.filter.query | -- | N |
过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。 |
-| sink.label-prefix | -- | Y | Stream
load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。 |
-| sink.properties.* | -- | N | Stream
Load 的导入参数。<br/>例如: 'sink.properties.column_separator' = ', ' 定义列分隔符,
'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,'\x01'会被转换为二进制的0x01
<br/><br/>JSON格式导入<br/>'sink.properties.format' = 'json'
'sink.properties.read_json_by_line' = 'true' |
-| sink.enable-delete | TRUE | N |
是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。 |
-| sink.enable-2pc | TRUE | N |
是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考[这里](../data-operate/import/import-way/stream-load-manual.md)。
|
-| sink.buffer-size | 1MB | N |
写数据缓存buffer大小,单位字节。不建议修改,默认配置即可 |
-| sink.buffer-count | 3 | N |
写数据缓存buffer个数。不建议修改,默认配置即可 |
-| sink.max-retries | 3 | N |
Commit失败后的最大重试次数,默认3次 |
+| Key | Default Value | Required | Comment
|
+| -------------------------------- | ------------- | -------- |
----------------------------------------------- |
+| fenodes | -- | Y | Doris FE http
地址, 支持多个地址,使用逗号分隔 |
+| table.identifier | -- | Y | Doris
表名,如:db.tbl |
+| username | -- | Y | 访问 Doris 的用户名
|
+| password | -- | Y | 访问 Doris 的密码
|
+| doris.request.retries | 3 | N | 向 Doris
发送请求的重试次数 |
+| doris.request.connect.timeout.ms | 30000 | N | 向 Doris
发送请求的连接超时时间 |
+| doris.request.read.timeout.ms | 30000 | N | 向 Doris
发送请求的读取超时时间 |
+
+### Source 配置项
+
+| Key | Default Value | Required | Comment
|
+| ----------------------------- | ------------------ | -------- |
------------------------------------------------------------ |
+| doris.request.query.timeout.s | 3600 | N | 查询 Doris
的超时时间,默认值为1小时,-1表示无超时限制 |
+| doris.request.tablet.size | Integer.MAX_VALUE | N | 一个 Partition
对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris
造成更大的压力。 |
+| doris.batch.size | 1024 | N | 一次从 BE
读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 |
+| doris.exec.mem.limit | 2147483648 | N |
单个查询的内存限制。默认为 2GB,单位为字节 |
+| doris.deserialize.arrow.async | FALSE | N | 是否支持异步转换
Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch |
+| doris.deserialize.queue.size | 64 | N | 异步转换 Arrow
格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 |
+| doris.read.field | -- | N | 读取 Doris
表的列名列表,多列之间使用逗号分隔 |
+| doris.filter.query | -- | N |
过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。比如 age=18。 |
+
+### Sink 配置项
+
+| Key | Default Value | Required | Comment
|
+| ------------------ | ------------- | -------- |
------------------------------------------------------------ |
+| sink.label-prefix | -- | Y | Stream
load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。 |
+| sink.properties.* | -- | N | Stream Load 的导入参数。<br/>例如:
'sink.properties.column_separator' = ', ' 定义列分隔符,
'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,'\x01'会被转换为二进制的0x01
<br/><br/>JSON格式导入<br/>'sink.properties.format' = 'json'
'sink.properties.read_json_by_line' =
'true'<br/>详细参数参考[这里](../data-operate/import/import-way/stream-load-manual.md)。
|
+| sink.enable-delete | TRUE | N | 是否启用删除。此选项需要 Doris
表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。 |
+| sink.enable-2pc | TRUE | N |
是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考[这里](../data-operate/import/import-way/stream-load-manual.md)。
|
+| sink.buffer-size | 1MB | N |
写数据缓存buffer大小,单位字节。不建议修改,默认配置即可 |
+| sink.buffer-count | 3 | N | 写数据缓存buffer个数。不建议修改,默认配置即可
|
+| sink.max-retries | 3 | N | Commit失败后的最大重试次数,默认3次
|
+
+### Lookup Join 配置项
+
+| Key | Default Value | Required | Comment
|
+| --------------------------------- | ------------- | -------- |
------------------------------------------ |
+| jdbc-url | -- | Y | jdbc连接信息
|
+| lookup.cache.max-rows | -1 | N |
lookup缓存的最大行数,默认值-1,不开启缓存 |
+| lookup.cache.ttl | 10s | N |
lookup缓存的最大时间,默认10s |
+| lookup.max-retries | 1 | N |
lookup查询失败后的重试次数 |
+| lookup.jdbc.async | false | N |
是否开启异步的lookup,默认false |
+| lookup.jdbc.read.batch.size | 128 | N |
异步lookup下,每次查询的最大批次大小 |
+| lookup.jdbc.read.batch.queue-size | 256 | N |
异步lookup时,中间缓冲队列的大小 |
+| lookup.jdbc.read.thread-size | 3 | N |
每个task中lookup的jdbc线程数 |
## Doris 和 Flink 列类型映射关系
@@ -336,7 +373,7 @@ env.fromSource(mySqlSource,
WatermarkStrategy.noWatermarks(), "MySQL Source")//.
| TIME | DOUBLE |
| HLL | Unsupported datatype |
-## 使用FlinkSQL通过CDC接入Doris示例(支持Insert/Update/Delete事件)
+## 使用FlinkSQL通过CDC接入Doris示例
```sql
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';
@@ -355,7 +392,7 @@ CREATE TABLE cdc_mysql_source (
'table-name' = 'table'
);
--- 支持删除事件同步(sink.enable-delete='true'),需要 Doris 表开启批量删除功能
+-- 支持同步insert/update/delete事件
CREATE TABLE doris_sink (
id INT,
name STRING
@@ -368,7 +405,7 @@ WITH (
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
- 'sink.enable-delete' = 'true',
+ 'sink.enable-delete' = 'true', -- 同步删除事件
'sink.label-prefix' = 'doris_label'
);
@@ -377,11 +414,11 @@ insert into doris_sink select id,name from
cdc_mysql_source;
## 使用FlinkCDC接入多表或整库示例
### 语法
-```
+```shell
<FLINK_HOME>/bin/flink run \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
- mysql-sync-database \
+ <mysql-sync-database|oracle-sync-database> \
--database <doris-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <doris-table-prefix>] \
@@ -389,6 +426,7 @@ insert into doris_sink select id,name from cdc_mysql_source;
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf>
...] \
+ --oracle-conf <oracle-cdc-source-conf> [--oracle-conf
<oracle-cdc-source-conf> ...] \
--sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
[--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]
```
@@ -400,13 +438,14 @@ insert into doris_sink select id,name from
cdc_mysql_source;
- **--including-tables** 需要同步的MySQL表,可以使用"|" 分隔多个表,并支持正则表达式。
比如--including-tables table1|tbl.*就是同步table1和所有以tbl开头的表。
- **--excluding-tables** 不需要同步的表,用法同上。
- **--mysql-conf** MySQL CDCSource 配置,例如--mysql-conf hostname=127.0.0.1
,您可以在[这里](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html)查看所有配置MySQL-CDC,其中hostname/username/password/database-name
是必需的。
+- **--oracle-conf** Oracle CDCSource 配置,例如--oracle-conf hostname=127.0.0.1
,您可以在[这里](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html)查看所有配置Oracle-CDC,其中hostname/username/password/database-name/schema-name
是必需的。
- **--sink-conf** Doris Sink
的所有配置,可以在[这里](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9)查看完整的配置项。
- **--table-conf** Doris表的配置项,即properties中包含的内容。 例如 --table-conf
replication_num=1
->注:需要在$FLINK_HOME/lib 目录下添加flink-sql-connector-mysql-cdc-2.3.0.jar
+>注:同步时需要在$FLINK_HOME/lib 目录下添加对应的Flink CDC依赖,比如
flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar
-### 示例
-```
+### MySQL同步示例
+```shell
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
@@ -427,8 +466,33 @@ insert into doris_sink select id,name from
cdc_mysql_source;
--table-conf replication_num=1
```
+### Oracle同步示例
+
+```shell
+<FLINK_HOME>/bin/flink run \
+ -Dexecution.checkpointing.interval=10s \
+ -Dparallelism.default=1 \
+ -c org.apache.doris.flink.tools.cdc.CdcTools \
+ ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
+ oracle-sync-database \
+ --database test_db \
+ --oracle-conf hostname=127.0.0.1 \
+ --oracle-conf port=1521 \
+ --oracle-conf username=admin \
+ --oracle-conf password="password" \
+ --oracle-conf database-name=XE \
+ --oracle-conf schema-name=ADMIN \
+ --including-tables "tbl1|tbl2" \
+ --sink-conf fenodes=127.0.0.1:8030 \
+ --sink-conf username=root \
+ --sink-conf password=\
+ --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+ --sink-conf sink.label-prefix=label \
+ --table-conf replication_num=1
+```
## 使用FlinkCDC更新Key列
+
一般在业务数据库中,会使用编号来作为表的主键,比如Student表,会使用编号(id)来作为主键,但是随着业务的发展,数据对应的编号有可能是会发生变化的。
在这种场景下,使用FlinkCDC + Doris Connector同步数据,便可以自动更新Doris主键列的数据。
### 原理
@@ -532,7 +596,9 @@ Exactly-Once场景下,Flink Job重启时必须从最新的Checkpoint/Savepoint
6. **errCode = 2, detailMessage = current running txns on db 10006 is 100,
larger than limit 100**
-这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数 `max_running_txn_num_per_db` 来解决。具体可参考
[max_running_txn_num_per_db](https://doris.apache.org/zh-CN/docs/dev/admin-manual/config/fe-config/#max_running_txn_num_per_db)
+这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数 `max_running_txn_num_per_db` 来解决,具体可参考
[max_running_txn_num_per_db](https://doris.apache.org/zh-CN/docs/dev/admin-manual/config/fe-config/#max_running_txn_num_per_db)。
+
+同时,一个任务频繁修改label重启,也可能会导致这个错误。2pc场景下(Duplicate/Aggregate模型),每个任务的label需要唯一,并且从checkpoint重启时,flink任务才会主动abort掉之前已经precommit成功,没有commit的txn,频繁修改label重启,会导致大量precommit成功的txn无法被abort,占用事务。在Unique模型下也可关闭2pc,可以实现幂等写入。
7. **Flink写入Uniq模型时,如何保证一批数据的有序性?**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]