This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 00b8821 [Feature] support doris catalog (#60)
00b8821 is described below
commit 00b882190e7d9492c6c77c8ce8474ba618be0ed6
Author: wudi <[email protected]>
AuthorDate: Tue Sep 6 16:35:04 2022 +0800
[Feature] support doris catalog (#60)
* add doris catalog and fix thrift Concurrency bug
---
.../apache/doris/flink/catalog/DorisCatalog.java | 526 +++++++++++++++++++++
.../doris/flink/catalog/DorisCatalogFactory.java | 124 +++++
.../doris/flink/catalog/DorisCatalogOptions.java | 26 +
.../doris/flink/catalog/DorisTypeMapper.java | 89 ++++
.../source/reader/DorisSourceSplitReader.java | 8 -
.../flink/source/reader/DorisValueReader.java | 94 ++--
.../doris/flink/table/DorisConfigOptions.java | 145 ++++++
.../flink/table/DorisDynamicTableFactory.java | 152 ++----
.../org.apache.flink.table.factories.Factory | 3 +-
.../apache/doris/flink/catalog/CatalogTest.java | 263 +++++++++++
.../flink/source/reader/DorisSourceReaderTest.java | 2 +
11 files changed, 1263 insertions(+), 169 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
new file mode 100644
index 0000000..00f7fb3
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.flink.table.DorisDynamicTableFactory;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.function.Predicate;
+
+import static
org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
+import static org.apache.doris.flink.catalog.DorisCatalogOptions.JDBCURL;
+import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
+import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * ClickHouse catalog.
+ */
+public class DorisCatalog extends AbstractCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DorisCatalog.class);
+
+ private static final Set<String> builtinDatabases =
+ new HashSet<String>() {
+ {
+ add("information_schema");
+ }
+ };
+
+ private final String username;
+ private final String password;
+ private final String jdbcUrl;
+ private final Map<String, String> properties;
+
+ public DorisCatalog(
+ String catalogName,
+ String jdbcUrl,
+ String defaultDatabase,
+ String username,
+ String password,
+ Map<String, String> properties) {
+ super(catalogName, defaultDatabase);
+
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(jdbcUrl), "jdbc-url cannot
be null or empty");
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(username), "username
cannot be null or empty");
+
+ this.jdbcUrl = jdbcUrl.endsWith("/") ? jdbcUrl : jdbcUrl + "/";
+ ;
+ this.username = username;
+ this.password = password;
+ this.properties = Collections.unmodifiableMap(properties);
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ // test connection, fail early if we cannot connect to database
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, username,
password)) {
+ } catch (SQLException e) {
+ throw new ValidationException(
+ String.format("Failed connecting to %s via JDBC.",
jdbcUrl), e);
+ }
+
+ LOG.info("Catalog {} established connection to {}", getName(),
jdbcUrl);
+ }
+
+ @Override
+ public synchronized void close() throws CatalogException {
+ try {
+ LOG.info("Closed catalog {} ", getName());
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Closing catalog %s
failed.", getName()), e);
+ }
+ }
+
+ @Override
+ public Optional<Factory> getFactory() {
+ return Optional.of(new DorisDynamicTableFactory());
+ }
+
+ // ------------- databases -------------
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return extractColumnValuesBySQL(
+ jdbcUrl,
+ "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
+ 1,
+ dbName -> !builtinDatabases.contains(dbName));
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ if (listDatabases().contains(databaseName)) {
+ return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+ } else {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+ return listDatabases().contains(databaseName);
+ }
+
+ @Override
+ public void createDatabase(String name, CatalogDatabase database, boolean
ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
+ throws DatabaseNotEmptyException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterDatabase(String name, CatalogDatabase newDatabase,
boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------------- tables -------------
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ Preconditions.checkState(
+ org.apache.commons.lang3.StringUtils.isNotBlank(databaseName),
"Database name must not be blank.");
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ return extractColumnValuesBySQL(
+ jdbcUrl + databaseName,
+ "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE
TABLE_SCHEMA = ?",
+ 1,
+ null,
+ databaseName);
+ }
+
+ @Override
+ public List<String> listViews(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ String databaseName = tablePath.getDatabaseName();
+ String tableName = tablePath.getObjectName();
+ Map<String, String> props = new HashMap<>(properties);
+ props.put(CONNECTOR.key(), IDENTIFIER);
+ if (!props.containsKey(FENODES.key())) {
+ props.put(FENODES.key(), queryFenodes());
+ }
+ props.put(USERNAME.key(), username);
+ props.put(PASSWORD.key(), password);
+ props.put(TABLE_IDENTIFIER.key(), databaseName + "." + tableName);
+
+ String labelPrefix = props.getOrDefault(SINK_LABEL_PREFIX.key(),"");
+ props.put(SINK_LABEL_PREFIX.key(),
String.join("_",labelPrefix,databaseName,tableName));
+ //remove catalog option
+ props.remove(JDBCURL.key());
+ props.remove(DEFAULT_DATABASE.key());
+ return CatalogTable.of(createTableSchema(databaseName, tableName),
null, Lists.newArrayList(), props);
+
+ }
+
+ @VisibleForTesting
+ protected String queryFenodes() {
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, username,
password)) {
+ StringJoiner fenodes = new StringJoiner(",");
+ PreparedStatement ps = conn.prepareStatement("SHOW FRONTENDS");
+ ResultSet resultSet = ps.executeQuery();
+ while (resultSet.next()) {
+ String ip = resultSet.getString("IP");
+ String port = resultSet.getString("HttpPort");
+ fenodes.add(ip + ":" + port);
+ }
+ return fenodes.toString();
+ } catch (Exception e) {
+ throw new CatalogException("Failed getting fenodes", e);
+ }
+ }
+
+ private Schema createTableSchema(String databaseName, String tableName) {
+ String dbUrl = jdbcUrl + databaseName;
+ try (Connection conn = DriverManager.getConnection(dbUrl, username,
password)) {
+ PreparedStatement ps =
+ conn.prepareStatement(
+ String.format("SELECT
COLUMN_NAME,DATA_TYPE,COLUMN_SIZE,DECIMAL_DIGITS FROM
`information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`=
'%s'", databaseName, tableName));
+
+ List<String> columnNames = new ArrayList<>();
+ List<DataType> columnTypes = new ArrayList<>();
+ ResultSet resultSet = ps.executeQuery();
+ while (resultSet.next()) {
+ String columnName = resultSet.getString("COLUMN_NAME");
+ String columnType = resultSet.getString("DATA_TYPE");
+ long columnSize = resultSet.getLong("COLUMN_SIZE");
+ long columnDigit = resultSet.getLong("DECIMAL_DIGITS");
+ DataType flinkType = DorisTypeMapper.toFlinkType(columnName,
columnType, (int) columnSize, (int) columnDigit);
+ columnNames.add(columnName);
+ columnTypes.add(flinkType);
+ }
+ Schema.Builder schemaBuilder =
Schema.newBuilder().fromFields(columnNames, columnTypes);
+ Schema tableSchema = schemaBuilder.build();
+ return tableSchema;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed getting catalog %s database %s table
%s", getName(), databaseName, tableName), e);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ try {
+ return databaseExists(tablePath.getDatabaseName())
+ &&
listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName());
+ } catch (DatabaseNotExistException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException,
CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTable(
+ ObjectPath tablePath, CatalogBaseTable newTable, boolean
ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------------- partitions -------------
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException,
CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException,
+ PartitionSpecInvalidException, CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(
+ ObjectPath tablePath, List<Expression> filters)
+ throws TableNotExistException, TableNotPartitionedException,
CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ throw new PartitionNotExistException(getName(), tablePath,
partitionSpec);
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec)
+ throws CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition partition,
+ boolean ignoreIfExists)
+ throws TableNotExistException, TableNotPartitionedException,
+ PartitionSpecInvalidException, PartitionAlreadyExistsException,
+ CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropPartition(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean
ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition newPartition,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------------- functions -------------
+
+ @Override
+ public List<String> listFunctions(String dbName)
+ throws DatabaseNotExistException, CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath)
+ throws FunctionNotExistException, CatalogException {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws
CatalogException {
+ return false;
+ }
+
+ @Override
+ public void createFunction(
+ ObjectPath functionPath, CatalogFunction function, boolean
ignoreIfExists)
+ throws FunctionAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterFunction(
+ ObjectPath functionPath, CatalogFunction newFunction, boolean
ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean
ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ // ------------- statistics -------------
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath
tablePath)
+ throws TableNotExistException, CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public void alterTableStatistics(
+ ObjectPath tablePath, CatalogTableStatistics tableStatistics,
boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterTableColumnStatistics(
+ ObjectPath tablePath,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException,
TablePartitionedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogTableStatistics partitionStatistics,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+
+ private List<String> extractColumnValuesBySQL(
+ String connUrl,
+ String sql,
+ int columnIndex,
+ Predicate<String> filterFunc,
+ Object... params) {
+
+ List<String> columnValues = Lists.newArrayList();
+
+ try (Connection conn = DriverManager.getConnection(connUrl, username,
password);
+ PreparedStatement ps = conn.prepareStatement(sql)) {
+ if (Objects.nonNull(params) && params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ ps.setObject(i + 1, params[i]);
+ }
+ }
+ ResultSet rs = ps.executeQuery();
+ while (rs.next()) {
+ String columnValue = rs.getString(columnIndex);
+ if (Objects.isNull(filterFunc) ||
filterFunc.test(columnValue)) {
+ columnValues.add(columnValue);
+ }
+ }
+ return columnValues;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format(
+ "The following SQL query could not be executed
(%s): %s", connUrl, sql),
+ e);
+ }
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
new file mode 100644
index 0000000..f23f0dc
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static
org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
+import static org.apache.doris.flink.catalog.DorisCatalogOptions.JDBCURL;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
+import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
+import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_CHECK_INTERVAL;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
+import static
org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
+
+/**
+ * Factory for {@link DorisCatalog}.
+ */
+public class DorisCatalogFactory implements CatalogFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(JDBCURL);
+ options.add(USERNAME);
+ options.add(PASSWORD);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(JDBCURL);
+ options.add(DEFAULT_DATABASE);
+
+ options.add(FENODES);
+ options.add(TABLE_IDENTIFIER);
+ options.add(USERNAME);
+ options.add(PASSWORD);
+
+ options.add(DORIS_READ_FIELD);
+ options.add(DORIS_FILTER_QUERY);
+ options.add(DORIS_TABLET_SIZE);
+ options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
+ options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
+ options.add(DORIS_REQUEST_QUERY_TIMEOUT_S);
+ options.add(DORIS_REQUEST_RETRIES);
+ options.add(DORIS_DESERIALIZE_ARROW_ASYNC);
+ options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
+ options.add(DORIS_BATCH_SIZE);
+ options.add(DORIS_EXEC_MEM_LIMIT);
+
+ options.add(SINK_CHECK_INTERVAL);
+ options.add(SINK_ENABLE_2PC);
+ options.add(SINK_MAX_RETRIES);
+ options.add(SINK_ENABLE_DELETE);
+ options.add(SINK_LABEL_PREFIX);
+ options.add(SINK_BUFFER_SIZE);
+ options.add(SINK_BUFFER_COUNT);
+
+ options.add(SOURCE_USE_OLD_API);
+ return options;
+ }
+
+ @Override
+ public Catalog createCatalog(Context context) {
+ final FactoryUtil.CatalogFactoryHelper helper =
+ FactoryUtil.createCatalogFactoryHelper(this, context);
+ helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
+
+ return new DorisCatalog(
+ context.getName(),
+ helper.getOptions().get(JDBCURL),
+ helper.getOptions().get(DEFAULT_DATABASE),
+ helper.getOptions().get(USERNAME),
+ helper.getOptions().get(PASSWORD),
+ ((Configuration) helper.getOptions()).toMap());
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
new file mode 100644
index 0000000..ff87f9b
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogOptions.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+
+public class DorisCatalogOptions {
+ public static final ConfigOption<String> JDBCURL =
ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris
jdbc url.");
+ public static final ConfigOption<String> DEFAULT_DATABASE =
ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY).stringType().noDefaultValue();
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
new file mode 100644
index 0000000..f7a16ce
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+
+public class DorisTypeMapper {
+
+ // -------------------------number----------------------------
+ private static final String DORIS_TINYINT = "TINYINT";
+ private static final String DORIS_SMALLINT = "SMALLINT";
+ private static final String DORIS_INT = "INT";
+ private static final String DORIS_BIGINT = "BIGINT";
+ private static final String DORIS_LARGEINT = "BIGINT UNSIGNED";
+ private static final String DORIS_DECIMAL = "DECIMAL";
+ private static final String DORIS_FLOAT = "FLOAT";
+ private static final String DORIS_DOUBLE = "DOUBLE";
+
+ // -------------------------string----------------------------
+ private static final String DORIS_CHAR = "CHAR";
+ private static final String DORIS_VARCHAR = "VARCHAR";
+ private static final String DORIS_STRING = "STRING";
+ private static final String DORIS_TEXT = "TEXT";
+
+ // ------------------------------time-------------------------
+ private static final String DORIS_DATE = "DATE";
+ private static final String DORIS_DATETIME = "DATETIME";
+
+ //------------------------------bool------------------------
+ private static final String DORIS_BOOLEAN = "BOOLEAN";
+
+
+ public static DataType toFlinkType(String columnName, String columnType,
int precision, int scale) {
+ columnType = columnType.toUpperCase();
+ switch (columnType) {
+ case DORIS_BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case DORIS_TINYINT:
+ if (precision == 0) {
+ //The boolean type will become tinyint when queried in
information_schema, and precision=0
+ return DataTypes.BOOLEAN();
+ } else {
+ return DataTypes.TINYINT();
+ }
+ case DORIS_SMALLINT:
+ return DataTypes.SMALLINT();
+ case DORIS_INT:
+ return DataTypes.INT();
+ case DORIS_BIGINT:
+ return DataTypes.BIGINT();
+ case DORIS_DECIMAL:
+ return DataTypes.DECIMAL(precision, scale);
+ case DORIS_FLOAT:
+ return DataTypes.FLOAT();
+ case DORIS_DOUBLE:
+ return DataTypes.DOUBLE();
+ case DORIS_CHAR:
+ return DataTypes.CHAR(precision);
+ case DORIS_LARGEINT:
+ case DORIS_VARCHAR:
+ case DORIS_STRING:
+ case DORIS_TEXT:
+ return DataTypes.STRING();
+ case DORIS_DATE:
+ return DataTypes.DATE();
+ case DORIS_DATETIME:
+ return DataTypes.TIMESTAMP(0);
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support Doris type '%s' on column
'%s'", columnType, columnName));
+ }
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
index c5d33f7..b12bf2a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java
@@ -74,14 +74,6 @@ public class DorisSourceSplitReader
}
private DorisSplitRecords finishSplit() {
- if (valueReader != null) {
- try {
- valueReader.close();
- } catch (Exception e) {
- LOG.error("close resource reader failed,", e);
- }
- valueReader = null;
- }
final DorisSplitRecords finishRecords =
DorisSplitRecords.finishedSplit(currentSplitId);
currentSplitId = null;
return finishRecords;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
index 173ea90..04474f0 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -41,6 +41,8 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DEFAULT_CLUSTER;
@@ -53,6 +55,8 @@ import static
org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAG
public class DorisValueReader implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(DorisValueReader.class);
protected BackendClient client;
+ protected Lock clientLock = new ReentrantLock();
+
private PartitionDefinition partition;
private DorisOptions options;
private DorisReadOptions readOptions;
@@ -85,10 +89,15 @@ public class DorisValueReader implements AutoCloseable {
}
private void init() {
- this.openParams = openParams();
- TScanOpenResult openResult = this.client.openScanner(this.openParams);
- this.contextId = openResult.getContextId();
- this.schema =
SchemaUtils.convertToSchema(openResult.getSelectedColumns());
+ clientLock.lock();
+ try {
+ this.openParams = openParams();
+ TScanOpenResult openResult =
this.client.openScanner(this.openParams);
+ this.contextId = openResult.getContextId();
+ this.schema =
SchemaUtils.convertToSchema(openResult.getSelectedColumns());
+ } finally {
+ clientLock.unlock();
+ }
this.asyncThreadStarted = asyncThreadStarted();
LOG.debug("Open scan result is, contextId: {}, schema: {}.",
contextId, schema);
}
@@ -127,22 +136,27 @@ public class DorisValueReader implements AutoCloseable {
protected Thread asyncThread = new Thread(new Runnable() {
@Override
public void run() {
- TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
- nextBatchParams.setContextId(contextId);
- while (!eos.get()) {
- nextBatchParams.setOffset(offset);
- TScanBatchResult nextResult = client.getNext(nextBatchParams);
- eos.set(nextResult.isEos());
- if (!eos.get()) {
- RowBatch rowBatch = new RowBatch(nextResult,
schema).readArrow();
- offset += rowBatch.getReadRowCount();
- rowBatch.close();
- try {
- rowBatchBlockingQueue.put(rowBatch);
- } catch (InterruptedException e) {
- throw new DorisRuntimeException(e);
+ clientLock.lock();
+ try{
+ TScanNextBatchParams nextBatchParams = new
TScanNextBatchParams();
+ nextBatchParams.setContextId(contextId);
+ while (!eos.get()) {
+ nextBatchParams.setOffset(offset);
+ TScanBatchResult nextResult =
client.getNext(nextBatchParams);
+ eos.set(nextResult.isEos());
+ if (!eos.get()) {
+ RowBatch rowBatch = new RowBatch(nextResult,
schema).readArrow();
+ offset += rowBatch.getReadRowCount();
+ rowBatch.close();
+ try {
+ rowBatchBlockingQueue.put(rowBatch);
+ } catch (InterruptedException e) {
+ throw new DorisRuntimeException(e);
+ }
}
}
+ } finally {
+ clientLock.unlock();
}
}
});
@@ -187,22 +201,27 @@ public class DorisValueReader implements AutoCloseable {
hasNext = true;
}
} else {
- // Arrow data was acquired synchronously during the iterative
process
- if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
- if (rowBatch != null) {
- offset += rowBatch.getReadRowCount();
- rowBatch.close();
- }
- TScanNextBatchParams nextBatchParams = new
TScanNextBatchParams();
- nextBatchParams.setContextId(contextId);
- nextBatchParams.setOffset(offset);
- TScanBatchResult nextResult = client.getNext(nextBatchParams);
- eos.set(nextResult.isEos());
- if (!eos.get()) {
- rowBatch = new RowBatch(nextResult, schema).readArrow();
+ clientLock.lock();
+ try{
+ // Arrow data was acquired synchronously during the iterative
process
+ if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
+ if (rowBatch != null) {
+ offset += rowBatch.getReadRowCount();
+ rowBatch.close();
+ }
+ TScanNextBatchParams nextBatchParams = new
TScanNextBatchParams();
+ nextBatchParams.setContextId(contextId);
+ nextBatchParams.setOffset(offset);
+ TScanBatchResult nextResult =
client.getNext(nextBatchParams);
+ eos.set(nextResult.isEos());
+ if (!eos.get()) {
+ rowBatch = new RowBatch(nextResult,
schema).readArrow();
+ }
}
+ hasNext = !eos.get();
+ } finally {
+ clientLock.unlock();
}
- hasNext = !eos.get();
}
return hasNext;
}
@@ -222,8 +241,13 @@ public class DorisValueReader implements AutoCloseable {
@Override
public void close() throws Exception {
- TScanCloseParams closeParams = new TScanCloseParams();
- closeParams.setContextId(contextId);
- client.closeScanner(closeParams);
+ clientLock.lock();
+ try {
+ TScanCloseParams closeParams = new TScanCloseParams();
+ closeParams.setContextId(contextId);
+ client.closeScanner(closeParams);
+ } finally {
+ clientLock.unlock();
+ }
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
new file mode 100644
index 0000000..5b56342
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
+import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+
+public class DorisConfigOptions {
+
+ public static final String IDENTIFIER = "doris";
+ // common option
+ public static final ConfigOption<String> FENODES =
ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris
fe http address.");
+ public static final ConfigOption<String> TABLE_IDENTIFIER =
ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the
jdbc table name.");
+ public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the
jdbc user name.");
+ public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the
jdbc password.");
+
+ // source config options
+ public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
+ .key("doris.read.field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("List of column names in the Doris table,
separated by commas");
+ public static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
+ .key("doris.filter.query")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Filter expression of the query, which is
transparently transmitted to Doris. Doris uses this expression to complete
source-side data filtering");
+ public static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
+ .key("doris.request.tablet.size")
+ .intType()
+ .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
+ .withDescription("");
+ public static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS
= ConfigOptions
+ .key("doris.request.connect.timeout.ms")
+ .intType()
+ .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+ .withDescription("");
+ public static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
ConfigOptions
+ .key("doris.request.read.timeout.ms")
+ .intType()
+ .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+ .withDescription("");
+ public static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
ConfigOptions
+ .key("doris.request.query.timeout.s")
+ .intType()
+ .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
+ .withDescription("");
+ public static final ConfigOption<Integer> DORIS_REQUEST_RETRIES =
ConfigOptions
+ .key("doris.request.retries")
+ .intType()
+ .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
+ .withDescription("");
+ public static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
ConfigOptions
+ .key("doris.deserialize.arrow.async")
+ .booleanType()
+ .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
+ .withDescription("");
+ public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
ConfigOptions
+ .key("doris.request.retriesdoris.deserialize.queue.size")
+ .intType()
+ .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
+ .withDescription("");
+ public static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
+ .key("doris.batch.size")
+ .intType()
+ .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+ .withDescription("");
+ public static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
+ .key("doris.exec.mem.limit")
+ .longType()
+ .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
+ .withDescription("");
+ public static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
ConfigOptions
+ .key("source.use-old-api")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to read data using the new interface
defined according to the FLIP-27 specification,default false");
+
+ // sink config options
+ public static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
+ .key("sink.enable-2pc")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("enable 2PC while loading");
+
+ public static final ConfigOption<Integer> SINK_CHECK_INTERVAL =
ConfigOptions
+ .key("sink.check-interval")
+ .intType()
+ .defaultValue(10000)
+ .withDescription("check exception with the interval while
loading");
+ public static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
+ .key("sink.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription("the max retry times if writing records to
database failed.");
+ public static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions
+ .key("sink.buffer-size")
+ .intType()
+ .defaultValue(256 * 1024)
+ .withDescription("the buffer size to cache data for stream load.");
+ public static final ConfigOption<Integer> SINK_BUFFER_COUNT = ConfigOptions
+ .key("sink.buffer-count")
+ .intType()
+ .defaultValue(3)
+ .withDescription("the buffer count to cache data for stream
load.");
+ public static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
+ .key("sink.label-prefix")
+ .stringType()
+ .defaultValue("")
+ .withDescription("the unique label prefix.");
+ public static final ConfigOption<Boolean> SINK_ENABLE_DELETE =
ConfigOptions
+ .key("sink.enable-delete")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("whether to enable the delete function");
+
+ // Prefix for Doris StreamLoad specific properties.
+ public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
+
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index fb44359..81ee23c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -20,7 +20,6 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -31,21 +30,37 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;
-import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
-import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
-import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
-import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
-import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
-import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
-import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
-import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
-import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_EXEC_MEM_LIMIT;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_FILTER_QUERY;
+import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_READ_FIELD;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_QUERY_TIMEOUT_S;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_READ_TIMEOUT_MS;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETRIES;
+import static
org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
+import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
+import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_CHECK_INTERVAL;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
+import static
org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
+import static
org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
+import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;
+
/**
* The {@link DorisDynamicTableFactory} translates the catalog table to a
table source.
@@ -55,121 +70,9 @@ import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_
*/
public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory, DynamicTableSinkFactory {
- public static final ConfigOption<String> FENODES =
ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris
fe http address.");
- public static final ConfigOption<String> TABLE_IDENTIFIER =
ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the
jdbc table name.");
- public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the
jdbc user name.");
- public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the
jdbc password.");
- // Prefix for Doris StreamLoad specific properties.
- public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
- // doris options
- private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
- .key("doris.read.field")
- .stringType()
- .noDefaultValue()
- .withDescription("List of column names in the Doris table,
separated by commas");
- private static final ConfigOption<String> DORIS_FILTER_QUERY =
ConfigOptions
- .key("doris.filter.query")
- .stringType()
- .noDefaultValue()
- .withDescription("Filter expression of the query, which is
transparently transmitted to Doris. Doris uses this expression to complete
source-side data filtering");
- private static final ConfigOption<Integer> DORIS_TABLET_SIZE =
ConfigOptions
- .key("doris.request.tablet.size")
- .intType()
- .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
- .withDescription("");
- private static final ConfigOption<Integer>
DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
- .key("doris.request.connect.timeout.ms")
- .intType()
- .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
- .withDescription("");
- private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
ConfigOptions
- .key("doris.request.read.timeout.ms")
- .intType()
- .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
- .withDescription("");
- private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
ConfigOptions
- .key("doris.request.query.timeout.s")
- .intType()
- .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
- .withDescription("");
- private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES =
ConfigOptions
- .key("doris.request.retries")
- .intType()
- .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
- .withDescription("");
- private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
ConfigOptions
- .key("doris.deserialize.arrow.async")
- .booleanType()
- .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
- .withDescription("");
- private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
ConfigOptions
- .key("doris.request.retriesdoris.deserialize.queue.size")
- .intType()
- .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
- .withDescription("");
- private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
- .key("doris.batch.size")
- .intType()
- .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
- .withDescription("");
- private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT =
ConfigOptions
- .key("doris.exec.mem.limit")
- .longType()
- .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
- .withDescription("");
- // flink write config options
- private static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
- .key("sink.enable-2pc")
- .booleanType()
- .defaultValue(true)
- .withDescription("enable 2PC while loading");
-
- private static final ConfigOption<Integer> SINK_CHECK_INTERVAL =
ConfigOptions
- .key("sink.check-interval")
- .intType()
- .defaultValue(10000)
- .withDescription("check exception with the interval while
loading");
- private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
- .key("sink.max-retries")
- .intType()
- .defaultValue(3)
- .withDescription("the max retry times if writing records to
database failed.");
- private static final ConfigOption<Integer> SINK_BUFFER_SIZE = ConfigOptions
- .key("sink.buffer-size")
- .intType()
- .defaultValue(256 * 1024)
- .withDescription("the buffer size to cache data for stream load.");
- private static final ConfigOption<Integer> SINK_BUFFER_COUNT =
ConfigOptions
- .key("sink.buffer-count")
- .intType()
- .defaultValue(3)
- .withDescription("the buffer count to cache data for stream
load.");
- private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
- .key("sink.label-prefix")
- .stringType()
- .defaultValue("")
- .withDescription("the unique label prefix.");
- private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
ConfigOptions
- .key("sink.batch.interval")
- .durationType()
- .defaultValue(Duration.ofSeconds(1))
- .withDescription("the flush interval mills, over this time,
asynchronous threads will flush data. The " +
- "default value is 1s.");
- private static final ConfigOption<Boolean> SINK_ENABLE_DELETE =
ConfigOptions
- .key("sink.enable-delete")
- .booleanType()
- .defaultValue(true)
- .withDescription("whether to enable the delete function");
-
- private static final ConfigOption<Boolean> SOURCE_USE_OLD_API =
ConfigOptions
- .key("source.use-old-api")
- .booleanType()
- .defaultValue(false)
- .withDescription("Whether to read data using the new interface
defined according to the FLIP-27 specification,default false");
-
@Override
public String factoryIdentifier() {
- return "doris"; // used for matching to `connector = '...'`
+ return IDENTIFIER; // used for matching to `connector = '...'`
}
@Override
@@ -203,7 +106,6 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
options.add(SINK_CHECK_INTERVAL);
options.add(SINK_ENABLE_2PC);
options.add(SINK_MAX_RETRIES);
- options.add(SINK_BUFFER_FLUSH_INTERVAL);
options.add(SINK_ENABLE_DELETE);
options.add(SINK_LABEL_PREFIX);
options.add(SINK_BUFFER_SIZE);
diff --git
a/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index e625cc7..5863c8b 100644
---
a/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/flink-doris-connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,4 +15,5 @@
# specific language governing permissions and limitations
# under the License.
-org.apache.doris.flink.table.DorisDynamicTableFactory
\ No newline at end of file
+org.apache.doris.flink.table.DorisDynamicTableFactory
+org.apache.doris.flink.catalog.DorisCatalogFactory
\ No newline at end of file
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
new file mode 100644
index 0000000..e1452d3
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogTest.java
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Class for unit tests to run on catalogs. */
+@Ignore
+public class CatalogTest {
+ private static final String TEST_CATALOG_NAME = "doris_catalog";
+ private static final String TEST_FENODES = "127.0.0.1:8030";
+ private static final String TEST_JDBCURL =
"jdbc:mysql://127.0.0.1.78:9030";
+ private static final String TEST_USERNAME = "root";
+ private static final String TEST_PWD = "";
+ private static final String TEST_DB = "test";
+ private static final String TEST_TABLE = "t_all_types";
+ private static final String TEST_TABLE_SINK = "t_all_types_sink";
+ private static final String TEST_TABLE_SINK_GROUPBY =
"t_all_types_sink_groupby";
+
+ protected static final Schema TABLE_SCHEMA =
+ Schema.newBuilder()
+ .column("id", DataTypes.STRING())
+ .column("c_boolean", DataTypes.BOOLEAN())
+ .column("c_char", DataTypes.CHAR(1))
+ .column("c_date", DataTypes.DATE())
+ .column("c_datetime", DataTypes.TIMESTAMP(0))
+ .column("c_decimal", DataTypes.DECIMAL(10, 2))
+ .column("c_double", DataTypes.DOUBLE())
+ .column("c_float", DataTypes.FLOAT())
+ .column("c_int", DataTypes.INT())
+ .column("c_bigint", DataTypes.BIGINT())
+ .column("c_largeint", DataTypes.STRING())
+ .column("c_smallint", DataTypes.SMALLINT())
+ .column("c_string", DataTypes.STRING())
+ .column("c_tinyint", DataTypes.TINYINT())
+ .build();
+
+ private static final List<Row> ALL_TYPES_ROWS =
+ Lists.newArrayList(
+ Row.ofKind(
+ RowKind.INSERT,
+ "100001",
+ true,
+ "a",
+ Date.valueOf("2022-08-31").toLocalDate(),
+ Timestamp.valueOf("2022-08-31
11:12:13").toLocalDateTime(),
+ BigDecimal.valueOf(1.12).setScale(2),
+ 1.1234d,
+ 1.1f,
+ 1234567,
+ 1234567890L,
+ "123456790123456790",
+ Short.parseShort("10"),
+ "catalog",
+ Byte.parseByte("1")),
+ Row.ofKind(
+ RowKind.INSERT,
+ "100002",
+ true,
+ "a",
+ Date.valueOf("2022-08-31").toLocalDate(),
+ Timestamp.valueOf("2022-08-31
11:12:13").toLocalDateTime(),
+ BigDecimal.valueOf(1.12).setScale(2),
+ 1.1234d,
+ 1.1f,
+ 1234567,
+ 1234567890L,
+ "123456790123456790",
+ Short.parseShort("10"),
+ "catalog",
+ Byte.parseByte("1")));
+
+ private DorisCatalog catalog;
+ private TableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ Map<String,String> props = new HashMap<>();
+ props.put("sink.enable-2pc","false");
+ catalog = new DorisCatalog(TEST_CATALOG_NAME, TEST_JDBCURL, TEST_DB,
TEST_USERNAME, TEST_PWD, props);
+ this.tEnv =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+ tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+ // Use doris catalog.
+ tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+ tEnv.useCatalog(TEST_CATALOG_NAME);
+ }
+
+ @Test
+ public void testQueryFenodes(){
+ String actual = catalog.queryFenodes();
+ assertEquals("127.0.0.1:8030", actual);
+ }
+
+ @Test
+ public void testListDatabases() {
+ List<String> actual = catalog.listDatabases();
+ assertEquals(Collections.singletonList(TEST_DB), actual);
+ }
+
+ @Test
+ public void testDbExists() throws Exception {
+ String databaseNotExist = "nonexistent";
+ assertFalse(catalog.databaseExists(databaseNotExist));
+ assertTrue(catalog.databaseExists(TEST_DB));
+ }
+
+ @Test
+ public void testListTables() throws DatabaseNotExistException {
+ List<String> actual = catalog.listTables(TEST_DB);
+ assertEquals(
+ Arrays.asList(
+ TEST_TABLE,
+ TEST_TABLE_SINK,
+ TEST_TABLE_SINK_GROUPBY),
+ actual);
+ }
+
+ @Test
+ public void testTableExists() {
+ String tableNotExist = "nonexist";
+ assertFalse(catalog.tableExists(new ObjectPath(TEST_DB,
tableNotExist)));
+ }
+
+ @Test
+ public void testGetTable() throws TableNotExistException {
+ CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB,
TEST_TABLE));
+ System.out.println(table);
+ assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema());
+ }
+
+ // ------ test select query. ------
+
+ @Test
+ public void testSelectField() {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select id from %s",
TEST_TABLE))
+ .execute()
+ .collect());
+ assertEquals(
+ Lists.newArrayList(Row.ofKind(RowKind.INSERT, "100001"),
Row.ofKind(RowKind.INSERT, "100002")),
+ results);
+ }
+
+ @Test
+ public void testWithoutCatalogDB() {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select * from %s",
TEST_TABLE))
+ .execute()
+ .collect());
+ assertEquals(ALL_TYPES_ROWS, results);
+ }
+
+ @Test
+ public void testWithoutCatalog() {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(
+ String.format(
+ "select * from `%s`.`%s`",
+ TEST_DB, TEST_TABLE))
+ .execute()
+ .collect());
+ assertEquals(ALL_TYPES_ROWS, results);
+ }
+
+ @Test
+ public void testFullPath() {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(
+ String.format(
+ "select * from %s.%s.`%s`",
+ TEST_CATALOG_NAME,
+ catalog.getDefaultDatabase(),
+ TEST_TABLE))
+ .execute()
+ .collect());
+ assertEquals(ALL_TYPES_ROWS, results);
+ }
+
+ @Test
+ public void testSelectToInsert() throws Exception {
+
+ String sql =
+ String.format(
+ "insert into `%s` select * from `%s`",
+ TEST_TABLE_SINK, TEST_TABLE);
+ tEnv.executeSql(sql).await();
+
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(String.format("select * from %s",
TEST_TABLE_SINK))
+ .execute()
+ .collect());
+ assertEquals(ALL_TYPES_ROWS, results);
+ }
+
+ @Test
+ public void testGroupByInsert() throws Exception {
+ // Changes primary key for the next record.
+ tEnv.executeSql(
+ String.format(
+ "insert into `%s` select `c_string`,
max(`id`) `id` from `%s` "
+ + "group by `c_string` ",
+ TEST_TABLE_SINK_GROUPBY, TEST_TABLE))
+ .await();
+
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv.sqlQuery(
+ String.format(
+ "select * from `%s`",
+ TEST_TABLE_SINK_GROUPBY))
+ .execute()
+ .collect());
+ assertEquals(Lists.newArrayList(Row.ofKind(RowKind.INSERT,
"catalog","100002")), results);
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
index a44b96d..4ab44bf 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.source.reader;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@@ -29,6 +30,7 @@ import static org.junit.Assert.assertEquals;
/**
* Unit tests for the {@link DorisSourceReader}.
*/
+@Ignore
public class DorisSourceReaderTest {
private static DorisSourceReader createReader(TestingReaderContext
context) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]