This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 9831982 [Improvement] Move DorisSourceBuilder into DorisSource as a
static inner builder class (#189)
9831982 is described below
commit 983198218adfcd9ec11e1b389f106dafbfc51c8a
Author: thehuldra <[email protected]>
AuthorDate: Mon Sep 4 17:37:33 2023 +0800
[Improvement] Move DorisSourceBuilder into DorisSource as a static inner
builder class (#189)
---
.../org/apache/doris/flink/source/DorisSource.java | 52 ++++++++++++++++
.../doris/flink/source/DorisSourceBuilder.java | 71 ----------------------
.../doris/flink/table/DorisDynamicTableSource.java | 3 +-
.../doris/flink/source/DorisSourceExampleTest.java | 2 +-
4 files changed, 54 insertions(+), 74 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index e2b6d41..8edf10b 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -120,4 +120,56 @@ public class DorisSource<OUT> implements Source<OUT,
DorisSourceSplit, PendingSp
public TypeInformation<OUT> getProducedType() {
return deserializer.getProducedType();
}
+
+ public static <OUT> DorisSourceBuilder<OUT> builder() {
+ return new DorisSourceBuilder();
+ }
+
+ /**
+ * build for DorisSource.
+ * @param <OUT> record type.
+ */
+
+ public static class DorisSourceBuilder<OUT> {
+
+ private DorisOptions options;
+ private DorisReadOptions readOptions;
+
+ // Boundedness
+ private Boundedness boundedness;
+ private DorisDeserializationSchema<OUT> deserializer;
+
+ DorisSourceBuilder() {
+ boundedness = Boundedness.BOUNDED;
+ }
+
+
+ public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) {
+ this.options = options;
+ return this;
+ }
+
+ public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions
readOptions) {
+ this.readOptions = readOptions;
+ return this;
+ }
+
+ public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness)
{
+ this.boundedness = boundedness;
+ return this;
+ }
+
+ public DorisSourceBuilder<OUT>
setDeserializer(DorisDeserializationSchema<OUT> deserializer) {
+ this.deserializer = deserializer;
+ return this;
+ }
+
+ public DorisSource<OUT> build() {
+ if(readOptions == null){
+ readOptions = DorisReadOptions.builder().build();
+ }
+ return new DorisSource<>(options, readOptions, boundedness,
deserializer);
+ }
+ }
+
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
deleted file mode 100644
index 94febb8..0000000
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.source;
-
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
-import org.apache.flink.api.connector.source.Boundedness;
-
-/**
- * The builder class for {@link DorisSource} to make it easier for the users
to construct a {@link
- * DorisSource}.
- **/
-public class DorisSourceBuilder<OUT> {
-
- private DorisOptions options;
- private DorisReadOptions readOptions;
-
- // Boundedness
- private Boundedness boundedness;
- private DorisDeserializationSchema<OUT> deserializer;
-
- DorisSourceBuilder() {
- boundedness = Boundedness.BOUNDED;
- }
-
- public static <OUT> DorisSourceBuilder<OUT> builder() {
- return new DorisSourceBuilder();
- }
-
- public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) {
- this.options = options;
- return this;
- }
-
- public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions
readOptions) {
- this.readOptions = readOptions;
- return this;
- }
-
- public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness) {
- this.boundedness = boundedness;
- return this;
- }
-
- public DorisSourceBuilder<OUT>
setDeserializer(DorisDeserializationSchema<OUT> deserializer) {
- this.deserializer = deserializer;
- return this;
- }
-
- public DorisSource<OUT> build() {
- if(readOptions == null){
- readOptions = DorisReadOptions.builder().build();
- }
- return new DorisSource<>(options, readOptions, boundedness,
deserializer);
- }
-}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 0fb096b..bd04e20 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -25,7 +25,6 @@ import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.source.DorisSource;
-import org.apache.doris.flink.source.DorisSourceBuilder;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
@@ -114,7 +113,7 @@ public final class DorisDynamicTableSource implements
ScanTableSource, LookupTab
return InputFormatProvider.of(builder.build());
} else {
//Read data using the interface of the FLIP-27 specification
- DorisSource<RowData> build = DorisSourceBuilder.<RowData>builder()
+ DorisSource<RowData> build = DorisSource.<RowData>builder()
.setDorisReadOptions(readOptions)
.setDorisOptions(options)
.setDeserializer(new
RowDataDeserializationSchema((RowType)
physicalSchema.toRowDataType().getLogicalType()))
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
index d85e70d..392596d 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
@@ -34,7 +34,7 @@ public class DorisSourceExampleTest {
@Test
public void testBoundedDorisSource() throws Exception {
- DorisSource<List<?>> dorisSource =
DorisSourceBuilder.<List<?>>builder()
+ DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
.setDorisOptions(OptionUtils.buildDorisOptions())
.setDorisReadOptions(OptionUtils.buildDorisReadOptions())
.setDeserializer(new SimpleListDeserializationSchema())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]