This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7285212b [Chore](test) improve test example for doc (#513)
7285212b is described below
commit 7285212b836ba166d0a9fc7ebefacc52948a8073
Author: wudi <[email protected]>
AuthorDate: Mon Nov 18 14:40:40 2024 +0800
[Chore](test) improve test example for doc (#513)
---
.../doris/flink/example/DorisSinkExample.java | 58 ++++++++++++++-----
.../flink/example/DorisSinkExampleRowData.java | 45 ++++++---------
.../flink/example/DorisSinkMultiTableExample.java | 55 +++++++++---------
.../doris/flink/example/DorisSourceDataStream.java | 67 ++++++++++++++++++++--
4 files changed, 152 insertions(+), 73 deletions(-)
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java
index 35ef73fd..bcb16965 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java
@@ -40,6 +40,49 @@ import java.util.Properties;
public class DorisSinkExample {
public static void main(String[] args) throws Exception {
+ JSONFormatWrite();
+ }
+
+ public static void JSONFormatWrite() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(30000);
+ DorisSink.Builder<String> builder = DorisSink.builder();
+
+ DorisOptions dorisOptions =
+ DorisOptions.builder()
+ .setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("test.student")
+ .setUsername("root")
+ .setPassword("")
+ .build();
+
+ Properties properties = new Properties();
+ properties.setProperty("read_json_by_line", "true");
+ properties.setProperty("format", "json");
+
+ DorisExecutionOptions executionOptions =
+ DorisExecutionOptions.builder()
+ .setLabelPrefix("label-doris")
+ .setDeletable(false)
+ .setBatchMode(true)
+ .setStreamLoadProp(properties)
+ .build();
+
+ builder.setDorisReadOptions(DorisReadOptions.builder().build())
+ .setDorisExecutionOptions(executionOptions)
+ .setSerializer(new SimpleStringSerializer())
+ .setDorisOptions(dorisOptions);
+
+ List<String> data = new ArrayList<>();
+ data.add("{\"id\":3,\"name\":\"Michael\",\"age\":28}");
+ data.add("{\"id\":4,\"name\":\"David\",\"age\":38}");
+
+ env.fromCollection(data).sinkTo(builder.build());
+ env.execute("doris test");
+ }
+
+ public static void CSVFormatWrite() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -49,24 +92,13 @@ public class DorisSinkExample {
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,
Time.milliseconds(30000)));
DorisSink.Builder<String> builder = DorisSink.builder();
- final DorisReadOptions.Builder readOptionBuilder =
DorisReadOptions.builder();
- readOptionBuilder
- .setDeserializeArrowAsync(false)
- .setDeserializeQueueSize(64)
- .setExecMemLimit(2147483648L)
- .setRequestQueryTimeoutS(3600)
- .setRequestBatchSize(1000)
- .setRequestConnectTimeoutMs(10000)
- .setRequestReadTimeoutMs(10000)
- .setRequestRetries(3)
- .setRequestTabletSize(1024 * 1024);
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
- .setFenodes("127.0.0.1:8040")
+ .setFenodes("127.0.0.1:8030")
.setTableIdentifier("db.table")
.setUsername("test")
.setPassword("test");
@@ -77,7 +109,7 @@ public class DorisSinkExample {
.setBufferSize(8 * 1024)
.setBufferCount(3);
- builder.setDorisReadOptions(readOptionBuilder.build())
+ builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer())
.setDorisOptions(dorisBuilder.build());
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java
index 8037e2ea..ae6aa593 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java
@@ -17,13 +17,8 @@
package org.apache.doris.flink.example;
-import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
@@ -45,40 +40,37 @@ public class DorisSinkExampleRowData {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.enableCheckpointing(10000);
env.setParallelism(1);
- env.getCheckpointConfig()
- .enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,
Time.milliseconds(30000)));
+
DorisSink.Builder<RowData> builder = DorisSink.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
- // properties.setProperty("read_json_by_line", "true");
- // properties.setProperty("format", "json");
+ properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("127.0.0.1:8030")
- .setTableIdentifier("db.tbl")
+ .setTableIdentifier("test.students")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
- executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
+ executionBuilder
+ .setLabelPrefix(UUID.randomUUID().toString())
+ .setDeletable(false)
+ .setStreamLoadProp(properties);
// flink rowdata‘s schema
- String[] fields = {"name", "age"};
- DataType[] types = {DataTypes.VARCHAR(256), DataTypes.INT()};
+ String[] fields = {"id", "name", "age"};
+ DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(256),
DataTypes.INT()};
builder.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(
RowDataSerializer.builder() // serialize according to rowdata
- .setType(LoadConstants.CSV) // .setType(LoadConstants.CSV)
+ .setType(LoadConstants.CSV)
.setFieldDelimiter(",")
- .setFieldNames(fields) // .setFieldDelimiter(",")
+ .setFieldNames(fields)
.setFieldType(types)
.build())
.setDorisOptions(dorisBuilder.build());
@@ -91,16 +83,17 @@ public class DorisSinkExampleRowData {
@Override
public void flatMap(String s,
Collector<RowData> out)
throws Exception {
- GenericRowData genericRowData = new
GenericRowData(2);
+ GenericRowData genericRowData = new
GenericRowData(3);
+ genericRowData.setField(0, 1);
genericRowData.setField(
- 0,
StringData.fromString("beijing"));
- genericRowData.setField(1, 123);
+ 1,
StringData.fromString("Michael"));
+ genericRowData.setField(2, 18);
out.collect(genericRowData);
- GenericRowData genericRowData2 = new
GenericRowData(2);
- genericRowData2.setField(
- 0,
StringData.fromString("shanghai"));
- genericRowData2.setField(1, 1234);
+ GenericRowData genericRowData2 = new
GenericRowData(3);
+ genericRowData2.setField(0, 2);
+ genericRowData2.setField(1,
StringData.fromString("David"));
+ genericRowData2.setField(2, 38);
out.collect(genericRowData2);
}
});
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java
index feff8b32..2aeb3956 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java
@@ -17,38 +17,26 @@
package org.apache.doris.flink.example;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.sink.batch.DorisBatchSink;
+import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.batch.RecordWithMeta;
import org.apache.doris.flink.sink.writer.serializer.RecordWithMetaSerializer;
+import java.util.Arrays;
import java.util.Properties;
-import java.util.UUID;
public class DorisSinkMultiTableExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
+ env.enableCheckpointing(15000);
- DorisBatchSink.Builder<RecordWithMeta> builder =
DorisBatchSink.builder();
- final DorisReadOptions.Builder readOptionBuilder =
DorisReadOptions.builder();
-
- readOptionBuilder
- .setDeserializeArrowAsync(false)
- .setDeserializeQueueSize(64)
- .setExecMemLimit(2147483648L)
- .setRequestQueryTimeoutS(3600)
- .setRequestBatchSize(1000)
- .setRequestConnectTimeoutMs(10000)
- .setRequestReadTimeoutMs(10000)
- .setRequestRetries(3)
- .setRequestTabletSize(1024 * 1024);
-
+ DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
@@ -56,7 +44,7 @@ public class DorisSinkMultiTableExample {
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("127.0.0.1:8030")
- .setTableIdentifier("test.test_flink_tmp")
+ .setTableIdentifier("")
.setUsername("root")
.setPassword("");
@@ -66,21 +54,24 @@ public class DorisSinkMultiTableExample {
.setLabelPrefix("label")
.setStreamLoadProp(properties)
.setDeletable(false)
- .setBufferFlushMaxBytes(8 * 1024)
- .setBufferFlushMaxRows(10)
+ .setBatchMode(true)
+ .setBufferFlushMaxBytes(10 * 1024 * 1024)
+ .setBufferFlushMaxRows(10000)
.setBufferFlushIntervalMs(1000 * 10);
- builder.setDorisReadOptions(readOptionBuilder.build())
+ builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisBuilder.build())
.setSerializer(new RecordWithMetaSerializer());
- // RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
- // RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
- // DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection(
- // Arrays.asList(record, record1));
- // stringDataStreamSource.sinkTo(builder.build());
+ RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1",
"wangwu,1");
+ RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp",
"wangwu,1");
+ DataStreamSource<RecordWithMeta> stringDataStreamSource =
+ env.fromCollection(Arrays.asList(record, record1));
+ stringDataStreamSource.sinkTo(builder.build());
+ /*
+ // mock unbounded streaming source
env.addSource(
new SourceFunction<RecordWithMeta>() {
private Long id = 1000000L;
@@ -90,6 +81,12 @@ public class DorisSinkMultiTableExample {
while (true) {
id = id + 1;
RecordWithMeta record =
+ new RecordWithMeta(
+ "test",
+ "test_flink_tmp",
+ UUID.randomUUID() + ",1");
+ out.collect(record);
+ record =
new RecordWithMeta(
"test",
"test_flink_tmp1",
@@ -98,10 +95,10 @@ public class DorisSinkMultiTableExample {
record =
new RecordWithMeta(
"test",
- "test_flink_tmp",
+ "test_flink_tmp2",
UUID.randomUUID() + ",1");
out.collect(record);
- Thread.sleep(3000);
+ Thread.sleep(1000);
}
}
@@ -109,7 +106,7 @@ public class DorisSinkMultiTableExample {
public void cancel() {}
})
.sinkTo(builder.build());
-
+ **/
env.execute("doris multi table test");
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java
index ee3fa135..ba57b950 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java
@@ -17,27 +17,84 @@
package org.apache.doris.flink.example;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.apache.doris.flink.source.DorisSource;
+import java.util.List;
import java.util.Properties;
public class DorisSourceDataStream {
public static void main(String[] args) throws Exception {
+ useArrowFlightSQLRead();
+ }
+
+ public static void useArrowFlightSQLRead() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DorisOptions option =
+ DorisOptions.builder()
+ .setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("test.students")
+ .setUsername("root")
+ .setPassword("")
+ .build();
+
+ DorisReadOptions readOptions =
+ DorisReadOptions.builder()
+ .setUseFlightSql(true)
+ .setFlightSqlPort(29747)
+ .setFilterQuery("age > 1")
+ .build();
+
+ DorisSource<List<?>> dorisSource =
+ DorisSource.<List<?>>builder()
+ .setDorisOptions(option)
+ .setDorisReadOptions(readOptions)
+ .setDeserializer(new SimpleListDeserializationSchema())
+ .build();
+
+ env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
+ env.execute("Doris Source Test");
+ }
+
+ public static void useThriftRead() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ DorisOptions option =
+ DorisOptions.builder()
+ .setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("test.students")
+ .setUsername("root")
+ .setPassword("")
+ .build();
+
+ DorisReadOptions readOptions = DorisReadOptions.builder().build();
+ DorisSource<List<?>> dorisSource =
+ DorisSource.<List<?>>builder()
+ .setDorisOptions(option)
+ .setDorisReadOptions(readOptions)
+ .setDeserializer(new SimpleListDeserializationSchema())
+ .build();
+
+ env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
+ env.execute("Doris Source Test");
+ }
+
+ public static void useSourceFunctionRead() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
- properties.put("fenodes", "FE_IP:8030");
+ properties.put("fenodes", "127.0.0.1:8030");
properties.put("username", "root");
properties.put("password", "");
- properties.put("table.identifier", "db.table");
- properties.put("doris.read.field", "id,code,name");
- properties.put("doris.filter.query", "name='doris'");
+ properties.put("table.identifier", "test.students");
DorisStreamOptions options = new DorisStreamOptions(properties);
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.addSource(new DorisSourceFunction(options, new
SimpleListDeserializationSchema()))
.print();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]