This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 66ce0157 [Chore] improve dep and add builder comment (#496)
66ce0157 is described below
commit 66ce0157ec56be4e35746701ba2e65763822bac6
Author: wudi <[email protected]>
AuthorDate: Fri Oct 11 14:16:09 2024 +0800
[Chore] improve dep and add builder comment (#496)
---
flink-doris-connector/pom.xml | 10 +-
.../doris/flink/cfg/DorisExecutionOptions.java | 115 +++++++++++++++++++++
.../org/apache/doris/flink/cfg/DorisOptions.java | 54 ++++++++--
.../apache/doris/flink/cfg/DorisReadOptions.java | 101 +++++++++++++++++-
.../org/apache/doris/flink/sink/DorisSink.java | 30 ++++++
.../org/apache/doris/flink/source/DorisSource.java | 24 +++++
6 files changed, 315 insertions(+), 19 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index d773339b..775242b0 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -105,14 +105,6 @@ under the License.
<artifactId>thrift-service</artifactId>
<version>${thrift-service.version}</version>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
@@ -369,7 +361,7 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
- <scope>provided</scope>
+ <scope>test</scope>
</dependency>
<!--Test-->
<dependency>
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 7ad8ba97..831a317e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -292,51 +292,112 @@ public class DorisExecutionOptions implements
Serializable {
private WriteMode writeMode = WriteMode.STREAM_LOAD;
private boolean ignoreCommitError = false;
+ /**
+ * Sets the interval at which to check for exceptions while loading. The default is 0, which
+ * disables the checker thread.
+ *
+ * @param checkInterval
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setCheckInterval(Integer checkInterval) {
this.checkInterval = checkInterval;
return this;
}
+ /**
+ * Sets the maximum number of retries when loading data. In batch mode, this is the number of
+ * stream load retries; in non-batch mode, it is the number of retries in the commit phase.
+ *
+ * @param maxRetries
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setMaxRetries(Integer maxRetries) {
this.maxRetries = maxRetries;
return this;
}
+ /**
+ * Sets the buffer size to cache data for stream load. Only valid in
non-batch mode.
+ *
+ * @param bufferSize
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
+ /**
+ * Sets the buffer count to cache data for stream load. Only valid in
non-batch mode.
+ *
+ * @param bufferCount
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferCount(int bufferCount) {
this.bufferCount = bufferCount;
return this;
}
+ /**
+ * Sets the unique label prefix for stream load.
+ *
+ * @param labelPrefix
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setLabelPrefix(String labelPrefix) {
this.labelPrefix = labelPrefix;
return this;
}
+ /**
+ * Sets whether to use cache for stream load. Only valid in non-batch
mode.
+ *
+ * @param useCache
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setUseCache(boolean useCache) {
this.useCache = useCache;
return this;
}
+ /**
+ * Sets the properties for stream load.
+ *
+ * @param streamLoadProp
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setStreamLoadProp(Properties streamLoadProp) {
this.streamLoadProp = streamLoadProp;
return this;
}
+ /**
+ * Sets whether to perform the deletion operation for stream load.
+ *
+ * @param enableDelete
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setDeletable(Boolean enableDelete) {
this.enableDelete = enableDelete;
return this;
}
+ /**
+ * Disables 2PC (two-phase commit) for stream load.
+ *
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder disable2PC() {
this.enable2PC = false;
return this;
}
+ /**
+ * Forces 2PC (two-phase commit) on. By default, 2PC is turned off for the unique key
+ * model.
+ *
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder enable2PC() {
this.enable2PC = true;
// Force open 2pc
@@ -344,6 +405,12 @@ public class DorisExecutionOptions implements Serializable
{
return this;
}
+ /**
+ * Sets whether to use batch mode for stream load.
+ *
+ * @param enableBatchMode
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBatchMode(Boolean enableBatchMode) {
this.enableBatchMode = enableBatchMode;
if (enableBatchMode.equals(Boolean.TRUE)) {
@@ -352,41 +419,89 @@ public class DorisExecutionOptions implements
Serializable {
return this;
}
+ /**
+ * Sets the size of the flush queue in batch mode.
+ *
+ * @param flushQueueSize
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setFlushQueueSize(int flushQueueSize) {
this.flushQueueSize = flushQueueSize;
return this;
}
+ /**
+ * Sets the flush interval, in milliseconds, for stream load in batch mode.
+ *
+ * @param bufferFlushIntervalMs
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
this.bufferFlushIntervalMs = bufferFlushIntervalMs;
return this;
}
+ /**
+ * Set the max flush rows for stream load in batch mode.
+ *
+ * @param bufferFlushMaxRows
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferFlushMaxRows(int bufferFlushMaxRows) {
this.bufferFlushMaxRows = bufferFlushMaxRows;
return this;
}
+ /**
+ * Set the max flush bytes for stream load in batch mode.
+ *
+ * @param bufferFlushMaxBytes
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferFlushMaxBytes(int bufferFlushMaxBytes) {
this.bufferFlushMaxBytes = bufferFlushMaxBytes;
return this;
}
+ /**
+ * Sets whether to ignore updateBefore events.
+ *
+ * @param ignoreUpdateBefore
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) {
this.ignoreUpdateBefore = ignoreUpdateBefore;
return this;
}
+ /**
+ * Sets the write mode. Only STREAM_LOAD and STREAM_LOAD_BATCH are supported.
+ *
+ * @param writeMode
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setWriteMode(WriteMode writeMode) {
this.writeMode = writeMode;
return this;
}
+ /**
+ * Sets whether to ignore commit failure errors. This only applies to 2PC in non-batch mode.
+ * Ignoring commit errors may cause data loss.
+ *
+ * @param ignoreCommitError
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setIgnoreCommitError(boolean ignoreCommitError) {
this.ignoreCommitError = ignoreCommitError;
return this;
}
+ /**
+ * Build the {@link DorisExecutionOptions}.
+ *
+ * @return a DorisExecutionOptions with the settings made for this
builder.
+ */
public DorisExecutionOptions build() {
// If format=json is set but read_json_by_line is not set, record
may not be written.
if (streamLoadProp != null
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index bf6c7a28..69273c9e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -101,47 +101,89 @@ public class DorisOptions extends DorisConnectionOptions {
private boolean autoRedirect = true;
private String tableIdentifier;
- /** required, tableIdentifier. */
+ /**
+ * Sets the tableIdentifier for the DorisOptions.
+ *
+ * @param tableIdentifier Doris's database name and table name, such
as db.tbl
+ * @return this DorisOptions.builder.
+ */
public Builder setTableIdentifier(String tableIdentifier) {
this.tableIdentifier = tableIdentifier;
return this;
}
- /** optional, user name. */
+ /**
+ * Sets the username of doris cluster.
+ *
+ * @param username Doris cluster username
+ * @return this DorisOptions.builder.
+ */
public Builder setUsername(String username) {
this.username = username;
return this;
}
- /** optional, password. */
+ /**
+ * Sets the password of doris cluster.
+ *
+ * @param password Doris cluster password
+ * @return this DorisOptions.builder.
+ */
public Builder setPassword(String password) {
this.password = password;
return this;
}
- /** required, Frontend Http Rest url. */
+ /**
+ * Sets the Doris frontend HTTP REST addresses, such as 127.0.0.1:8030,127.0.0.2:8030.
+ *
+ * @param fenodes
+ * @return this DorisOptions.builder.
+ */
public Builder setFenodes(String fenodes) {
this.fenodes = fenodes;
return this;
}
- /** optional, Backend Http Port. */
+ /**
+ * Sets the Doris backend HTTP REST addresses, such as 127.0.0.1:8040,127.0.0.2:8040.
+ *
+ * @param benodes
+ * @return this DorisOptions.builder.
+ */
public Builder setBenodes(String benodes) {
this.benodes = benodes;
return this;
}
- /** not required, fe jdbc url, for lookup query. */
+ /**
+ * Sets the Doris FE JDBC URL for lookup queries, such as jdbc:mysql://127.0.0.1:9030.
+ *
+ * @param jdbcUrl
+ * @return this DorisOptions.builder.
+ */
public Builder setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
return this;
}
+ /**
+ * Sets autoRedirect for DorisOptions. If true, stream load requests are sent to the FE. If
+ * false, the BE list is fetched first and data is written directly to the BEs.
+ *
+ * @param autoRedirect
+ * @return this DorisOptions.builder.
+ */
public Builder setAutoRedirect(boolean autoRedirect) {
this.autoRedirect = autoRedirect;
return this;
}
+ /**
+ * Build the {@link DorisOptions}.
+ *
+ * @return a DorisOptions with the settings made for this builder.
+ */
public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
// multi table load, don't need check
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 937d3286..0448d60a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -223,76 +223,169 @@ public class DorisReadOptions implements Serializable {
private Boolean useFlightSql = false;
private Integer flightSqlPort;
+ /**
+ * Sets the fields to read from the Doris table (pushed down as a projection), such as
+ * name,age.
+ *
+ * @param readFields
+ * @return this DorisReadOptions.builder.
+ */
public Builder setReadFields(String readFields) {
this.readFields = readFields;
return this;
}
+ /**
+ * Sets the filter expression pushed down to the Doris table, such as age=18.
+ *
+ * @param filterQuery
+ * @return this DorisReadOptions.builder.
+ */
public Builder setFilterQuery(String filterQuery) {
this.filterQuery = filterQuery;
return this;
}
+ /**
+ * Sets the requestTabletSize for DorisReadOptions: the number of Doris tablets per
+ * partition. The smaller this value, the more partitions are generated. This increases
+ * parallelism on the Flink side, but also puts more pressure on Doris.
+ *
+ * @param requestTabletSize
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestTabletSize(Integer requestTabletSize) {
this.requestTabletSize = requestTabletSize;
return this;
}
+ /**
+ * Sets the request connect timeout for DorisReadOptions.
+ *
+ * @param requestConnectTimeoutMs
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestConnectTimeoutMs(Integer
requestConnectTimeoutMs) {
this.requestConnectTimeoutMs = requestConnectTimeoutMs;
return this;
}
+ /**
+ * Sets the request read timeout for DorisReadOptions.
+ *
+ * @param requestReadTimeoutMs
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestReadTimeoutMs(Integer requestReadTimeoutMs) {
this.requestReadTimeoutMs = requestReadTimeoutMs;
return this;
}
+ /**
+ * Sets the timeout for queries sent to Doris for DorisReadOptions.
+ *
+ * @param requesQueryTimeoutS
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestQueryTimeoutS(Integer requesQueryTimeoutS) {
this.requestQueryTimeoutS = requesQueryTimeoutS;
return this;
}
+ /**
+ * Sets the number of retries for requests sent to Doris for DorisReadOptions.
+ *
+ * @param requestRetries
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestRetries(Integer requestRetries) {
this.requestRetries = requestRetries;
return this;
}
+ /**
+ * Sets the read batch size for DorisReadOptions.
+ *
+ * @param requestBatchSize
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestBatchSize(Integer requestBatchSize) {
this.requestBatchSize = requestBatchSize;
return this;
}
+ /**
+ * Sets the memory limit for a single query for DorisReadOptions.
+ *
+ * @param execMemLimit
+ * @return this DorisReadOptions.builder.
+ */
public Builder setExecMemLimit(Long execMemLimit) {
this.execMemLimit = execMemLimit;
return this;
}
+ /**
+ * Sets the size of the internal queue used for asynchronous conversion of Arrow-format
+ * data.
+ *
+ * @param deserializeQueueSize
+ * @return this DorisReadOptions.builder.
+ */
public Builder setDeserializeQueueSize(Integer deserializeQueueSize) {
this.deserializeQueueSize = deserializeQueueSize;
return this;
}
+ /**
+ * Sets whether to asynchronously convert Arrow-format data into the RowBatch needed for
+ * connector iteration.
+ *
+ * @param deserializeArrowAsync
+ * @return this DorisReadOptions.builder.
+ */
public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync)
{
this.deserializeArrowAsync = deserializeArrowAsync;
return this;
}
- public Builder setUseFlightSql(Boolean useFlightSql) {
- this.useFlightSql = useFlightSql;
+ /**
+ * Sets whether to use the legacy source API.
+ *
+ * @param useOldApi
+ * @return this DorisReadOptions.builder.
+ */
+ public Builder setUseOldApi(Boolean useOldApi) {
+ this.useOldApi = useOldApi;
return this;
}
- public Builder setUseOldApi(Boolean useOldApi) {
- this.useOldApi = useOldApi;
+ /**
+ * Sets whether to use Arrow Flight SQL for queries. Only supported by Doris 2.1 and above.
+ *
+ * @param useFlightSql
+ * @return this DorisReadOptions.builder.
+ */
+ public Builder setUseFlightSql(Boolean useFlightSql) {
+ this.useFlightSql = useFlightSql;
return this;
}
+ /**
+ * Sets the flight sql port for DorisReadOptions.
+ *
+ * @param flightSqlPort
+ * @return this DorisReadOptions.builder.
+ */
public Builder setFlightSqlPort(Integer flightSqlPort) {
this.flightSqlPort = flightSqlPort;
return this;
}
+ /**
+ * Build the {@link DorisReadOptions}.
+ *
+ * @return a DorisReadOptions with the settings made for this builder.
+ */
public DorisReadOptions build() {
return new DorisReadOptions(
readFields,
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index fd61d7fd..d8e0d827 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -161,26 +161,56 @@ public class DorisSink<IN>
private DorisExecutionOptions dorisExecutionOptions;
private DorisRecordSerializer<IN> serializer;
+ /**
+ * Sets the DorisOptions for the DorisSink.
+ *
+ * @param dorisOptions the common options of the doris cluster.
+ * @return this DorisSink.Builder.
+ */
public Builder<IN> setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
return this;
}
+ /**
+ * Sets the DorisReadOptions for the DorisSink.
+ *
+ * @param dorisReadOptions the read options of the DorisSink.
+ * @return this DorisSink.Builder.
+ */
public Builder<IN> setDorisReadOptions(DorisReadOptions
dorisReadOptions) {
this.dorisReadOptions = dorisReadOptions;
return this;
}
+ /**
+ * Sets the DorisExecutionOptions for the DorisSink.
+ *
+ * @param dorisExecutionOptions the execution options of the DorisSink.
+ * @return this DorisSink.Builder.
+ */
public Builder<IN> setDorisExecutionOptions(DorisExecutionOptions
dorisExecutionOptions) {
this.dorisExecutionOptions = dorisExecutionOptions;
return this;
}
+ /**
+ * Sets the {@link DorisRecordSerializer serializer} that transforms incoming records into
+ * DorisRecord.
+ *
+ * @param serializer
+ * @return this DorisSink.Builder.
+ */
public Builder<IN> setSerializer(DorisRecordSerializer<IN> serializer)
{
this.serializer = serializer;
return this;
}
+ /**
+ * Build the {@link DorisSink}.
+ *
+ * @return a DorisSink with the settings made for this builder.
+ */
public DorisSink<IN> build() {
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index 1b05453a..19a7fe36 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -168,21 +168,40 @@ public class DorisSource<OUT>
boundedness = Boundedness.BOUNDED;
}
+ /**
+ * Sets the DorisOptions for the DorisSource.
+ *
+ * @param options the common options of the doris cluster.
+ * @return this DorisSourceBuilder.
+ */
public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) {
this.options = options;
return this;
}
+ /**
+ * Sets the DorisReadOptions for the DorisSource.
+ *
+ * @param readOptions the read options of the DorisSource.
+ * @return this DorisSourceBuilder.
+ */
public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions
readOptions) {
this.readOptions = readOptions;
return this;
}
+ /** Sets the boundedness for the DorisSource. Currently only BOUNDED is supported. */
public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness)
{
this.boundedness = boundedness;
return this;
}
+ /**
+ * Sets the {@link DorisDeserializationSchema deserializer} of the
Record for DorisSource.
+ *
+ * @param deserializer the deserializer for Doris Record.
+ * @return this DorisSourceBuilder.
+ */
public DorisSourceBuilder<OUT> setDeserializer(
DorisDeserializationSchema<OUT> deserializer) {
this.deserializer = deserializer;
@@ -194,6 +213,11 @@ public class DorisSource<OUT>
return this;
}
+ /**
+ * Build the {@link DorisSource}.
+ *
+ * @return a DorisSource with the settings made for this builder.
+ */
public DorisSource<OUT> build() {
if (readOptions == null) {
readOptions = DorisReadOptions.builder().build();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]