This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
The following commit(s) were added to refs/heads/main by this push:
new 6f5ef7c [FLINK-37527] Add `KuduSource` documentation
6f5ef7c is described below
commit 6f5ef7c7b559ab9d5aa612974283b7698fbc868c
Author: Ferenc Csaky <[email protected]>
AuthorDate: Fri Mar 21 10:42:38 2025 +0100
[FLINK-37527] Add `KuduSource` documentation
---
docs/content/docs/connectors/datastream/kudu.md | 105 ++++++++++++++++++------
1 file changed, 80 insertions(+), 25 deletions(-)
diff --git a/docs/content/docs/connectors/datastream/kudu.md b/docs/content/docs/connectors/datastream/kudu.md
index 556822c..e479340 100644
--- a/docs/content/docs/connectors/datastream/kudu.md
+++ b/docs/content/docs/connectors/datastream/kudu.md
@@ -44,10 +44,11 @@ The current version of the connector is built with Kudu client version **1.17.1*
## Reading from Kudu
-There are two specific ways of reading a Kudu table into a `DataStream`:
+The connector provides the following ways of reading a Kudu table into a `DataStream`:
1. Using the `KuduCatalog` and Table API programmatically.
-2. Using the `KuduRowInputFormat` directly.
+2. Using the `KuduSource` class.
+3. Using the `KuduRowInputFormat` directly.
### Kudu Catalog
@@ -70,6 +71,56 @@ Table table = tEnv.sqlQuery("SELECT * FROM MyKuduTable");
DataStream<Row> ds = tEnv.toDataStream(table);
```
+### Kudu Source
+
+{{< hint info >}}
+This section describes how to use the Kudu Source, which is based on the new [data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
+{{< /hint >}}
+
+`KuduSource` provides a builder class that simplifies constructing the source.
+The code snippet below shows how to build a Kudu Source that reads data from an existing Kudu table.
+The `KuduReaderConfig` class configures Kudu-specific options that control the read behavior.
+
+```java
+KuduTableInfo tableInfo = KuduTableInfo.forTable("my_kudu_table");
+KuduReaderConfig readerConfig = KuduReaderConfig.Builder
+        .setMasters("localhost:7051")
+        .build();
+
+KuduSource<Row> source =
+        KuduSource.<Row>builder()
+                .setTableInfo(tableInfo)
+                .setReaderConfig(readerConfig)
+                .setRowResultConverter(new RowResultRowConverter())
+                .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kudu Source");
+```
+
+{{< hint info >}}
+It is also possible to have the Kudu table created if it does not exist yet. To learn more, see the [Create Kudu table]({{< ref "docs/connectors/datastream/kudu" >}}#create-kudu-table) section.
+{{< /hint >}}
+
+#### Boundedness
+
+Although a Kudu table is a bounded data source, it can still be useful to read it in a streaming manner, where the job keeps running until it fails or is explicitly stopped or cancelled.
+By default, `KuduSource` runs in bounded mode; calling `.setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)` switches it to streaming mode.
+
+In `CONTINUOUS_UNBOUNDED` mode, the source follows a CDC-like behavior: at job start, it takes a snapshot of the source table and records the snapshot time.
+From that point onward, the source periodically performs differential scans, each of which only processes the changes made during the preceding period.
+The length of this period is controlled by `.setDiscoveryPeriod(Duration)`. The following example shows how to read a Kudu table in a streaming fashion, picking up updates in 1-minute periods.
+
+```java
+KuduSource<Row> source =
+        KuduSource.<Row>builder()
+                .setTableInfo(...)
+                .setReaderConfig(...)
+                .setRowResultConverter(new RowResultRowConverter())
+                .setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)
+                .setDiscoveryPeriod(Duration.ofMinutes(1))
+                .build();
+```
+
### Kudu Row Input Format
We can also create a `DataStream` by using the `KuduRowInputFormat` directly.
In this case we have to manually provide all information about our table:
@@ -79,7 +130,7 @@ KuduTableInfo tableInfo = ...
KuduReaderConfig readerConfig = ...
KuduRowInputFormat inputFormat = new KuduRowInputFormat(readerConfig, tableInfo);
-DataStream<Row> ds = env.createInput(inputFormat);
+env.createInput(inputFormat);
```
## Writing to Kudu
@@ -114,28 +165,9 @@ KuduSink<Row> sink = KuduSink.<Row>builder()
ds.sinkTo(sink);
```
-#### Create Kudu table
-
-If the Kudu table does not exist, we can pass the schema and configuration to
`KuduTableInfo`, and the sink will try to create the table based on that.
-
-```java
-KuduTableInfo tableInfo = KuduTableInfo
- .forTable("new-table")
- .createTableIfNotExists(
- () ->
- Arrays.asList(
- new ColumnSchema
- .ColumnSchemaBuilder("first", Type.INT32)
- .key(true)
- .build(),
- new ColumnSchema
- .ColumnSchemaBuilder("second", Type.STRING)
- .build()
- ),
- () -> new CreateTableOptions()
- .setNumReplicas(3)
- .addHashPartitions(Lists.newArrayList("first"), 2));
-```
+{{< hint info >}}
+It is also possible to have the Kudu table created if it does not exist yet. To learn more, see the [Create Kudu table]({{< ref "docs/connectors/datastream/kudu" >}}#create-kudu-table) section.
+{{< /hint >}}
### Kudu Operation Mapping
@@ -160,3 +192,26 @@ There are pre-defined operation mappers for POJO, Flink `Row`, and Flink `Tuple`
* `RowOperationMapper`/`TupleOperationMapper`: The mapping is based on
position. The `i`th field of the `Row`/`Tuple`
corresponds to the column of the table at the `i`th position in the
`columnNames` array.
+## Create Kudu table
+
+If a table does not exist on the Kudu side, you can pass the desired schema and configuration to `KuduTableInfo` via `createTableIfNotExists(...)`.
+The source or the sink will then try to create the table based on that definition.
+
+```java
+KuduTableInfo tableInfo = KuduTableInfo
+        .forTable("new-table")
+        .createTableIfNotExists(
+                () ->
+                        Arrays.asList(
+                                new ColumnSchema
+                                        .ColumnSchemaBuilder("first", Type.INT32)
+                                        .key(true)
+                                        .build(),
+                                new ColumnSchema
+                                        .ColumnSchemaBuilder("second", Type.STRING)
+                                        .build()
+                        ),
+                () -> new CreateTableOptions()
+                        .setNumReplicas(3)
+                        .addHashPartitions(Lists.newArrayList("first"), 2));
+```
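The `CONTINUOUS_UNBOUNDED` behavior added in the Boundedness section can be illustrated without a Kudu cluster. It consists of one snapshot, followed by differential scans every discovery period.

The following is a minimal, stdlib-only Java sketch, not the connector's implementation. It assumes that each period covers changes with a timestamp in `(start, end]` and that the first window starts at the snapshot time. Under that assumption, every change made after the snapshot is picked up by exactly one scan. The class and method names (`DiffScanWindows`, `windows`, `windowIndex`) are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration of "snapshot + periodic differential scans".
 * NOT the KuduSource implementation: the (start, end] window semantics are an
 * assumption of this sketch.
 */
public class DiffScanWindows {

    /** A scan window covering changes with startExclusive < ts <= endInclusive. */
    static final class Window {
        final long startExclusive;
        final long endInclusive;

        Window(long startExclusive, long endInclusive) {
            this.startExclusive = startExclusive;
            this.endInclusive = endInclusive;
        }

        boolean contains(long ts) {
            return ts > startExclusive && ts <= endInclusive;
        }

        @Override
        public String toString() {
            return "(" + startExclusive + ", " + endInclusive + "]";
        }
    }

    /** Builds the first {@code count} consecutive windows following the snapshot time. */
    static List<Window> windows(long snapshotMillis, Duration discoveryPeriod, int count) {
        long periodMillis = discoveryPeriod.toMillis();
        List<Window> result = new ArrayList<>();
        long start = snapshotMillis;
        for (int i = 0; i < count; i++) {
            long end = start + periodMillis;
            result.add(new Window(start, end));
            start = end; // the next window starts exactly where this one ends: no gaps, no overlap
        }
        return result;
    }

    /**
     * Returns -1 if a change at {@code changeMillis} is already covered by the
     * initial snapshot, otherwise the zero-based index of the window that picks it up.
     */
    static long windowIndex(long snapshotMillis, Duration discoveryPeriod, long changeMillis) {
        if (changeMillis <= snapshotMillis) {
            return -1;
        }
        return (changeMillis - snapshotMillis - 1) / discoveryPeriod.toMillis();
    }

    public static void main(String[] args) {
        long snapshot = 1_000L;
        Duration period = Duration.ofMinutes(1); // 60_000 ms, as in the example above
        System.out.println(windows(snapshot, period, 3)); // [(1000, 61000], (61000, 121000], (121000, 181000]]
        System.out.println(windowIndex(snapshot, period, 500L));    // -1: part of the snapshot
        System.out.println(windowIndex(snapshot, period, 61_000L)); // 0: last instant of the first window
        System.out.println(windowIndex(snapshot, period, 61_001L)); // 1: first instant of the second window
    }
}
```

The point of the half-open windows is that consecutive scans neither skip nor double-count a change at a window boundary. The connector's actual boundary handling may differ.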