This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 2e20e385234 [improvement](jdbc catalog) remove useless jdbc catalog code (#34986) (#35418) 2e20e385234 is described below commit 2e20e385234228667dbf1c628efecef2373ab658 Author: zy-kkk <zhongy...@gmail.com> AuthorDate: Mon May 27 14:25:26 2024 +0800 [improvement](jdbc catalog) remove useless jdbc catalog code (#34986) (#35418) --- be/src/vec/exec/vjdbc_connector.cpp | 247 +---- be/src/vec/exec/vjdbc_connector.h | 13 - fe/be-java-extensions/java-common/pom.xml | 4 - .../apache/doris/common/jni/utils/UdfUtils.java | 57 - fe/be-java-extensions/jdbc-scanner/pom.xml | 17 - .../org/apache/doris/jdbc/BaseJdbcExecutor.java | 12 - .../apache/doris/jdbc/ClickHouseJdbcExecutor.java | 194 ++-- .../org/apache/doris/jdbc/DefaultJdbcExecutor.java | 1123 -------------------- .../org/apache/doris/jdbc/JdbcExecutorFactory.java | 2 +- fe/be-java-extensions/preload-extensions/pom.xml | 15 - .../org/apache/doris/catalog/JdbcResource.java | 4 - .../java/org/apache/doris/catalog/JdbcTable.java | 1 - .../doris/datasource/jdbc/source/JdbcScanNode.java | 23 - fe/pom.xml | 20 - gensrc/thrift/Types.thrift | 1 - .../jdbc/test_jdbc_query_mysql.groovy | 38 +- 16 files changed, 143 insertions(+), 1628 deletions(-) diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index bef99daf0bb..20660a4cbe6 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -20,7 +20,6 @@ #include <gen_cpp/Types_types.h> #include <algorithm> -#include <boost/iterator/iterator_facade.hpp> // IWYU pragma: no_include <bits/std_abs.h> #include <cmath> // IWYU pragma: keep #include <memory> @@ -32,7 +31,6 @@ #include "exec/table_connector.h" #include "gutil/strings/substitute.h" #include "jni.h" -#include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "runtime/types.h" @@ -46,14 +44,12 @@ #include 
"vec/exec/jni_connector.h" #include "vec/exprs/vexpr.h" #include "vec/functions/simple_function_factory.h" -#include "vec/io/reader_buffer.h" namespace doris::vectorized { const char* JDBC_EXECUTOR_FACTORY_CLASS = "org/apache/doris/jdbc/JdbcExecutorFactory"; const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V"; const char* JDBC_EXECUTOR_STMT_WRITE_SIGNATURE = "(Ljava/util/Map;)I"; const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z"; -const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;"; const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V"; const char* JDBC_EXECUTOR_TRANSACTION_SIGNATURE = "()V"; @@ -69,11 +65,6 @@ JdbcConnector::~JdbcConnector() { } } -#define GET_BASIC_JAVA_CLAZZ(JAVA_TYPE, CPP_TYPE) \ - RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JAVA_TYPE, &_executor_##CPP_TYPE##_clazz)); - -#define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz); - Status JdbcConnector::close(Status /*unused*/) { SCOPED_RAW_TIMER(&_jdbc_statistic._connector_close_timer); _closed = true; @@ -88,10 +79,6 @@ Status JdbcConnector::close(Status /*unused*/) { env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_close_id); env->DeleteGlobalRef(_executor_factory_clazz); env->DeleteGlobalRef(_executor_clazz); - DELETE_BASIC_JAVA_CLAZZ_REF(object) - DELETE_BASIC_JAVA_CLAZZ_REF(string) - DELETE_BASIC_JAVA_CLAZZ_REF(list) -#undef DELETE_BASIC_JAVA_CLAZZ_REF RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); env->DeleteGlobalRef(_executor_obj); return Status::OK(); @@ -128,9 +115,6 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { env->DeleteLocalRef(jtable_type); env->ReleaseStringUTFChars(executor_name, executor_name_str); env->DeleteLocalRef(executor_name); - GET_BASIC_JAVA_CLAZZ("java/util/List", list) - GET_BASIC_JAVA_CLAZZ("java/lang/Object", object) - GET_BASIC_JAVA_CLAZZ("java/lang/String", string) #undef GET_BASIC_JAVA_CLAZZ RETURN_IF_ERROR(_register_func_id(env)); @@ -244,9 +228,6 @@ Status 
JdbcConnector::query() { } LOG(INFO) << "JdbcConnector::query has exec success: " << _sql_str; - if (_conn_param.table_type != TOdbcTableType::NEBULA) { - RETURN_IF_ERROR(_check_column_type()); - } return Status::OK(); } @@ -386,12 +367,6 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { _executor_get_block_address_id)); RETURN_IF_ERROR( register_id(_executor_clazz, "getCurBlockRows", "()I", _executor_block_rows_id)); - RETURN_IF_ERROR(register_id(_executor_list_clazz, "get", "(I)Ljava/lang/Object;", - _executor_get_list_id)); - RETURN_IF_ERROR(register_id(_executor_string_clazz, "getBytes", "(Ljava/lang/String;)[B", - _get_bytes_id)); - RETURN_IF_ERROR( - register_id(_executor_object_clazz, "toString", "()Ljava/lang/String;", _to_string_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "openTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_begin_trans_id)); @@ -399,8 +374,6 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { _executor_finish_trans_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "rollbackTrans", JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id)); - RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames", - JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id)); RETURN_IF_ERROR( register_id(_executor_clazz, "testConnection", "()V", _executor_test_connection_id)); RETURN_IF_ERROR( @@ -408,206 +381,6 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { return Status::OK(); } -Status JdbcConnector::_check_column_type() { - SCOPED_RAW_TIMER(&_jdbc_statistic._check_type_timer); - JNIEnv* env = nullptr; - RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - jobject type_lists = - env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz, _executor_get_types_id); - auto column_size = _tuple_desc->slots().size(); - for (int column_index = 0, materialized_column_index = 0; column_index < column_size; - ++column_index) { - auto slot_desc = _tuple_desc->slots()[column_index]; - if 
(!slot_desc->is_materialized()) { - continue; - } - jobject column_type = - env->CallObjectMethod(type_lists, _executor_get_list_id, materialized_column_index); - - const std::string& type_str = _jobject_to_string(env, column_type); - RETURN_IF_ERROR(_check_type(slot_desc, type_str, column_index)); - env->DeleteLocalRef(column_type); - materialized_column_index++; - } - env->DeleteLocalRef(type_lists); - return JniUtil::GetJniExceptionMsg(env); -} - -/* type mapping: https://doris.apache.org/zh-CN/docs/dev/ecosystem/external-table/jdbc-of-doris?_highlight=jdbc - -Doris MYSQL PostgreSQL Oracle SQLServer - -BOOLEAN java.lang.Boolean java.lang.Boolean java.lang.Boolean -TINYINT java.lang.Integer java.lang.Short -SMALLINT java.lang.Integer java.lang.Integer java.math.BigDecimal java.lang.Short -INT java.lang.Integer java.lang.Integer java.math.BigDecimal java.lang.Integer -BIGINT java.lang.Long java.lang.Long java.lang.Long -LARGET java.math.BigInteger -DECIMAL java.math.BigDecimal java.math.BigDecimal java.math.BigDecimal java.math.BigDecimal -VARCHAR java.lang.String java.lang.String java.lang.String java.lang.String -DOUBLE java.lang.Double java.lang.Double java.lang.Double java.lang.Double -FLOAT java.lang.Float java.lang.Float java.lang.Float -DATE java.sql.Date java.sql.Date java.sql.Date -DATETIME java.sql.Timestamp java.sql.Timestamp java.sql.Timestamp java.sql.Timestamp - -NOTE: because oracle always use number(p,s) to create all numerical type, so it's java type maybe java.math.BigDecimal -*/ - -Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& type_str, - int column_index) { - const std::string error_msg = fmt::format( - "Fail to convert jdbc type of {} to doris type {} on column: {}. 
You need to " - "check this column type between external table and doris table.", - type_str, slot_desc->type().debug_string(), slot_desc->col_name()); - switch (slot_desc->type().type) { - case TYPE_BOOLEAN: { - if (type_str != "java.lang.Boolean" && type_str != "java.lang.Byte" && - type_str != "java.lang.Integer") { - return Status::InternalError(error_msg); - } - break; - } - case TYPE_TINYINT: - case TYPE_SMALLINT: - case TYPE_INT: { - if (type_str != "java.lang.Short" && type_str != "java.lang.Integer" && - type_str != "java.math.BigDecimal" && type_str != "java.lang.Byte" && - type_str != "com.clickhouse.data.value.UnsignedByte" && - type_str != "com.clickhouse.data.value.UnsignedShort" && type_str != "java.lang.Long") { - return Status::InternalError(error_msg); - } - break; - } - case TYPE_BIGINT: - case TYPE_LARGEINT: { - if (type_str != "java.lang.Long" && type_str != "java.math.BigDecimal" && - type_str != "java.math.BigInteger" && type_str != "java.lang.String" && - type_str != "com.clickhouse.data.value.UnsignedInteger" && - type_str != "com.clickhouse.data.value.UnsignedLong") { - return Status::InternalError(error_msg); - } - break; - } - case TYPE_FLOAT: { - if (type_str != "java.lang.Float" && type_str != "java.math.BigDecimal") { - return Status::InternalError(error_msg); - } - break; - } - case TYPE_DOUBLE: { - if (type_str != "java.lang.Double" && type_str != "java.math.BigDecimal" && - type_str != "java.lang.String") { - return Status::InternalError(error_msg); - } - break; - } - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_STRING: { - //now here break directly - break; - } - case TYPE_DATE: - case TYPE_DATEV2: - case TYPE_TIMEV2: - case TYPE_DATETIME: - case TYPE_DATETIMEV2: { - if (type_str != "java.sql.Timestamp" && type_str != "java.time.LocalDateTime" && - type_str != "java.sql.Date" && type_str != "java.time.LocalDate" && - type_str != "oracle.sql.TIMESTAMP" && type_str != "java.time.OffsetDateTime" && - type_str != 
"java.lang.String") { - return Status::InternalError(error_msg); - } - break; - } - case TYPE_DECIMALV2: - case TYPE_DECIMAL32: - case TYPE_DECIMAL64: - case TYPE_DECIMAL128I: - case TYPE_DECIMAL256: { - if (type_str != "java.math.BigDecimal") { - return Status::InternalError(error_msg); - } - break; - } - case TYPE_ARRAY: { - if (type_str != "java.sql.Array" && type_str != "java.lang.String" && - type_str != "java.lang.Object") { - return Status::InternalError(error_msg); - } - break; - } - case TYPE_JSONB: { - if (type_str != "java.lang.String" && type_str != "org.postgresql.util.PGobject") { - return Status::InternalError(error_msg); - } - - _map_column_idx_to_cast_idx_json[column_index] = _input_json_string_types.size(); - if (slot_desc->is_nullable()) { - _input_json_string_types.push_back(make_nullable(std::make_shared<DataTypeString>())); - } else { - _input_json_string_types.push_back(std::make_shared<DataTypeString>()); - } - str_json_cols.push_back( - _input_json_string_types[_map_column_idx_to_cast_idx_json[column_index]] - ->create_column()); - break; - } - case TYPE_HLL: { - if (type_str != "java.lang.String") { - return Status::InternalError(error_msg); - } - - _map_column_idx_to_cast_idx_hll[column_index] = _input_hll_string_types.size(); - if (slot_desc->is_nullable()) { - _input_hll_string_types.push_back(make_nullable(std::make_shared<DataTypeString>())); - } else { - _input_hll_string_types.push_back(std::make_shared<DataTypeString>()); - } - - str_hll_cols.push_back( - _input_hll_string_types[_map_column_idx_to_cast_idx_hll[column_index]] - ->create_column()); - break; - } - case TYPE_OBJECT: { - if (type_str != "java.lang.String") { - return Status::InternalError(error_msg); - } - - _map_column_idx_to_cast_idx_bitmap[column_index] = _input_bitmap_string_types.size(); - if (slot_desc->is_nullable()) { - _input_bitmap_string_types.push_back(make_nullable(std::make_shared<DataTypeString>())); - } else { - 
_input_bitmap_string_types.push_back(std::make_shared<DataTypeString>()); - } - - str_bitmap_cols.push_back( - _input_bitmap_string_types[_map_column_idx_to_cast_idx_bitmap[column_index]] - ->create_column()); - break; - } - default: { - return Status::InternalError(error_msg); - } - } - return Status::OK(); -} - -std::string JdbcConnector::_jobject_to_string(JNIEnv* env, jobject jobj) { - jobject jstr = env->CallObjectMethod(jobj, _to_string_id); - auto coding = env->NewStringUTF("UTF-8"); - const jbyteArray stringJbytes = (jbyteArray)env->CallObjectMethod(jstr, _get_bytes_id, coding); - size_t length = (size_t)env->GetArrayLength(stringJbytes); - jbyte* pBytes = env->GetByteArrayElements(stringJbytes, nullptr); - std::string str = std::string((char*)pBytes, length); - env->ReleaseByteArrayElements(stringJbytes, pBytes, JNI_ABORT); - env->DeleteLocalRef(stringJbytes); - env->DeleteLocalRef(jstr); - env->DeleteLocalRef(coding); - return str; -} - jobject JdbcConnector::_get_reader_params(Block* block, JNIEnv* env, size_t column_size) { std::ostringstream columns_nullable; std::ostringstream columns_replace_string; @@ -687,6 +460,13 @@ Status JdbcConnector::_cast_string_to_special(Block* block, JNIEnv* env, size_t Status JdbcConnector::_cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows) { + _map_column_idx_to_cast_idx_hll[column_index] = _input_hll_string_types.size(); + if (slot_desc->is_nullable()) { + _input_hll_string_types.push_back(make_nullable(std::make_shared<DataTypeString>())); + } else { + _input_hll_string_types.push_back(std::make_shared<DataTypeString>()); + } + DataTypePtr _target_data_type = slot_desc->get_data_type_ptr(); std::string _target_data_type_name = _target_data_type->get_name(); DataTypePtr _cast_param_data_type = _target_data_type; @@ -724,6 +504,13 @@ Status JdbcConnector::_cast_string_to_hll(const SlotDescriptor* slot_desc, Block Status JdbcConnector::_cast_string_to_bitmap(const 
SlotDescriptor* slot_desc, Block* block, int column_index, int rows) { + _map_column_idx_to_cast_idx_bitmap[column_index] = _input_bitmap_string_types.size(); + if (slot_desc->is_nullable()) { + _input_bitmap_string_types.push_back(make_nullable(std::make_shared<DataTypeString>())); + } else { + _input_bitmap_string_types.push_back(std::make_shared<DataTypeString>()); + } + DataTypePtr _target_data_type = slot_desc->get_data_type_ptr(); std::string _target_data_type_name = _target_data_type->get_name(); DataTypePtr _cast_param_data_type = _target_data_type; @@ -762,6 +549,12 @@ Status JdbcConnector::_cast_string_to_bitmap(const SlotDescriptor* slot_desc, Bl // Deprecated, this code is retained only for compatibility with query problems that may be encountered when upgrading the version that maps JSON to JSONB to this version, and will be deleted in subsequent versions. Status JdbcConnector::_cast_string_to_json(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows) { + _map_column_idx_to_cast_idx_json[column_index] = _input_json_string_types.size(); + if (slot_desc->is_nullable()) { + _input_json_string_types.push_back(make_nullable(std::make_shared<DataTypeString>())); + } else { + _input_json_string_types.push_back(std::make_shared<DataTypeString>()); + } DataTypePtr _target_data_type = slot_desc->get_data_type_ptr(); std::string _target_data_type_name = _target_data_type->get_name(); DataTypePtr _cast_param_data_type = _target_data_type; diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index e42097b3abd..b308cfdf392 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -121,9 +121,6 @@ protected: private: Status _register_func_id(JNIEnv* env); - Status _check_column_type(); - Status _check_type(SlotDescriptor*, const std::string& type_str, int column_index); - std::string _jobject_to_string(JNIEnv* env, jobject jobj); jobject _get_reader_params(Block* block, JNIEnv* env, 
size_t column_size); @@ -139,9 +136,6 @@ private: bool _closed = false; jclass _executor_factory_clazz; jclass _executor_clazz; - jclass _executor_list_clazz; - jclass _executor_object_clazz; - jclass _executor_string_clazz; jobject _executor_obj; jmethodID _executor_factory_ctor_id; jmethodID _executor_ctor_id; @@ -150,11 +144,7 @@ private: jmethodID _executor_has_next_id; jmethodID _executor_get_block_address_id; jmethodID _executor_block_rows_id; - jmethodID _executor_get_types_id; jmethodID _executor_close_id; - jmethodID _executor_get_list_id; - jmethodID _get_bytes_id; - jmethodID _to_string_id; jmethodID _executor_begin_trans_id; jmethodID _executor_finish_trans_id; jmethodID _executor_abort_trans_id; @@ -163,15 +153,12 @@ private: std::map<int, int> _map_column_idx_to_cast_idx_hll; std::vector<DataTypePtr> _input_hll_string_types; - std::vector<MutableColumnPtr> str_hll_cols; // for hll type to save data like string std::map<int, int> _map_column_idx_to_cast_idx_bitmap; std::vector<DataTypePtr> _input_bitmap_string_types; - std::vector<MutableColumnPtr> str_bitmap_cols; // for bitmap type to save data like string std::map<int, int> _map_column_idx_to_cast_idx_json; std::vector<DataTypePtr> _input_json_string_types; - std::vector<MutableColumnPtr> str_json_cols; // for json type to save data like string JdbcStatistic _jdbc_statistic; }; diff --git a/fe/be-java-extensions/java-common/pom.xml b/fe/be-java-extensions/java-common/pom.xml index 20ed0104fa5..f91b31065f0 100644 --- a/fe/be-java-extensions/java-common/pom.xml +++ b/fe/be-java-extensions/java-common/pom.xml @@ -34,10 +34,6 @@ under the License. 
</properties> <dependencies> - <dependency> - <groupId>com.vesoft</groupId> - <artifactId>client</artifactId> - </dependency> <dependency> <groupId>org.apache.doris</groupId> <artifactId>fe-common</artifactId> diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfUtils.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfUtils.java index 5ebafb57084..36b88a74825 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfUtils.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfUtils.java @@ -24,9 +24,6 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.Pair; import org.apache.doris.common.exception.InternalException; -import com.vesoft.nebula.client.graph.data.DateTimeWrapper; -import com.vesoft.nebula.client.graph.data.DateWrapper; -import com.vesoft.nebula.client.graph.data.ValueWrapper; import org.apache.log4j.Logger; import sun.misc.Unsafe; @@ -38,8 +35,6 @@ import java.net.URL; import java.net.URLClassLoader; import java.security.AccessController; import java.security.PrivilegedAction; -import java.time.LocalDate; -import java.time.LocalDateTime; import java.util.Set; public class UdfUtils { @@ -231,56 +226,4 @@ public class UdfUtils { } return bytes; } - - // only used by nebula-graph - // transfer to an object that can copy to the block - public static Object convertObject(ValueWrapper value) { - try { - if (value.isLong()) { - return value.asLong(); - } - if (value.isBoolean()) { - return value.asBoolean(); - } - if (value.isDouble()) { - return value.asDouble(); - } - if (value.isString()) { - return value.asString(); - } - if (value.isTime()) { - return value.asTime().toString(); - } - if (value.isDate()) { - DateWrapper date = value.asDate(); - return LocalDate.of(date.getYear(), date.getMonth(), date.getDay()); - } - if (value.isDateTime()) { - DateTimeWrapper dateTime 
= value.asDateTime(); - return LocalDateTime.of(dateTime.getYear(), dateTime.getMonth(), dateTime.getDay(), - dateTime.getHour(), dateTime.getMinute(), dateTime.getSecond(), dateTime.getMicrosec() * 1000); - } - if (value.isVertex()) { - return value.asNode().toString(); - } - if (value.isEdge()) { - return value.asRelationship().toString(); - } - if (value.isPath()) { - return value.asPath().toString(); - } - if (value.isList()) { - return value.asList().toString(); - } - if (value.isSet()) { - return value.asSet().toString(); - } - if (value.isMap()) { - return value.asMap().toString(); - } - return null; - } catch (Exception e) { - return null; - } - } } diff --git a/fe/be-java-extensions/jdbc-scanner/pom.xml b/fe/be-java-extensions/jdbc-scanner/pom.xml index a37b5e0f62f..bebf1c4ffc4 100644 --- a/fe/be-java-extensions/jdbc-scanner/pom.xml +++ b/fe/be-java-extensions/jdbc-scanner/pom.xml @@ -40,28 +40,11 @@ under the License. <artifactId>java-common</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>com.oracle.database.jdbc</groupId> - <artifactId>ojdbc8</artifactId> - <scope>provided</scope> - </dependency> <dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> <scope>provided</scope> </dependency> - <dependency> - <groupId>com.clickhouse</groupId> - <artifactId>clickhouse-jdbc</artifactId> - <classifier>all</classifier> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.oracle.ojdbc</groupId> - <artifactId>orai18n</artifactId> - <version>19.3.0.0</version> - <scope>provided</scope> - </dependency> </dependencies> <build> diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java index 6b64d6e0e5f..54ae3a31272 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java +++ 
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java @@ -26,7 +26,6 @@ import org.apache.doris.common.jni.vec.VectorColumn; import org.apache.doris.common.jni.vec.VectorTable; import org.apache.doris.thrift.TJdbcExecutorCtorParams; import org.apache.doris.thrift.TJdbcOperation; -import org.apache.doris.thrift.TOdbcTableType; import com.google.common.base.Preconditions; import com.zaxxer.hikari.HikariDataSource; @@ -57,14 +56,12 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor { private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory(); private HikariDataSource hikariDataSource = null; private final byte[] hikariDataSourceLock = new byte[0]; - private TOdbcTableType tableType; private JdbcDataSourceConfig config; private Connection conn = null; protected PreparedStatement preparedStatement = null; protected Statement stmt = null; protected ResultSet resultSet = null; protected ResultSetMetaData resultSetMetaData = null; - protected List<String> resultColumnTypeNames = null; protected List<Object[]> block = null; protected VectorTable outputTable = null; protected int batchSizeNum = 0; @@ -78,7 +75,6 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor { } catch (TException e) { throw new InternalException(e.getMessage()); } - tableType = request.table_type; this.config = new JdbcDataSourceConfig() .setCatalogId(request.catalog_id) .setJdbcUser(request.jdbc_user) @@ -175,11 +171,7 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor { resultSet = ((PreparedStatement) stmt).executeQuery(); resultSetMetaData = resultSet.getMetaData(); int columnCount = resultSetMetaData.getColumnCount(); - resultColumnTypeNames = new ArrayList<>(columnCount); block = new ArrayList<>(columnCount); - for (int i = 0; i < columnCount; ++i) { - resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); - } return columnCount; } catch (SQLException e) { throw 
new UdfRuntimeException("JDBC executor sql has error: ", e); @@ -283,10 +275,6 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor { } } - public List<String> getResultColumnTypeNames() { - return resultColumnTypeNames; - } - public int getCurBlockRows() { return curBlockRows; } diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/ClickHouseJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/ClickHouseJdbcExecutor.java index a73c7a0b25d..a3bb9b7b261 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/ClickHouseJdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/ClickHouseJdbcExecutor.java @@ -22,6 +22,8 @@ import org.apache.doris.common.jni.vec.ColumnType.Type; import org.apache.doris.common.jni.vec.ColumnValueConverter; import org.apache.doris.common.jni.vec.VectorTable; +import com.google.common.collect.Lists; + import java.lang.reflect.Array; import java.math.BigDecimal; import java.math.BigInteger; @@ -30,11 +32,7 @@ import java.sql.SQLException; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; public class ClickHouseJdbcExecutor extends BaseJdbcExecutor { @@ -89,7 +87,7 @@ public class ClickHouseJdbcExecutor extends BaseJdbcExecutor { case STRING: return resultSet.getObject(columnIndex + 1, String.class); case ARRAY: - return resultSet.getObject(columnIndex + 1); + return convertArrayToList(resultSet.getArray(columnIndex + 1).getArray()); default: throw new IllegalArgumentException("Unsupported column type: " + type.getType()); } @@ -99,114 +97,128 @@ public class ClickHouseJdbcExecutor extends BaseJdbcExecutor { protected ColumnValueConverter getOutputConverter(ColumnType columnType, String replaceString) { if 
(columnType.getType() == Type.ARRAY) { return createConverter( - (Object input) -> convertArray(input, columnType.getChildTypes().get(0)), + (Object input) -> convertArray((List<?>) input, columnType.getChildTypes().get(0)), List.class); } else { return null; } } - private <T, U> List<U> convertArray(T[] input, Function<T, U> converter) { - if (input == null) { - return Collections.emptyList(); + private List<Object> convertArrayToList(Object array) { + if (array == null) { + return null; } - return Arrays.stream(input) - .map(converter) - .collect(Collectors.toList()); - } - private List<?> convertArray(Object input, ColumnType childType) { - if (input == null) { - return Collections.emptyList(); + int length = Array.getLength(array); + List<Object> list = new ArrayList<>(length); + + for (int i = 0; i < length; i++) { + Object element = Array.get(array, i); + list.add(element); } - if (childType.isArray()) { - ColumnType subType = childType.getChildTypes().get(0); - Object[] array = (Object[]) input; - List<Object> convertedList = new ArrayList<>(); - for (Object subArray : array) { - convertedList.add(convertArray(subArray, subType)); - } - return convertedList; + + return list; + } + + private List<?> convertArray(List<?> array, ColumnType type) { + if (array == null) { + return null; } - if (input instanceof Object[]) { - Object[] arrayInput = (Object[]) input; - switch (childType.getType()) { - case SMALLINT: - return input instanceof Byte[] - ? convertArray((Byte[]) input, - byteValue -> byteValue != null ? (short) (byte) byteValue : null) - : convertArray((Number[]) arrayInput, - number -> number != null ? number.shortValue() : null); - case INT: - return input instanceof Short[] - ? convertArray((Short[]) input, - shortValue -> shortValue != null ? (int) (short) shortValue : null) - : convertArray((Number[]) arrayInput, number -> number != null ? number.intValue() : null); - case BIGINT: - return input instanceof Integer[] - ? 
convertArray((Integer[]) input, - intValue -> intValue != null ? (long) (int) intValue : null) - : convertArray((Number[]) arrayInput, number -> number != null ? number.longValue() : null); - case LARGEINT: - return input instanceof Long[] - ? convertArray((Long[]) input, - longValue -> longValue != null ? BigInteger.valueOf(longValue) : null) - : convertArray((Number[]) arrayInput, - number -> number != null ? BigInteger.valueOf(number.longValue()) : null); - case STRING: - if (input instanceof InetAddress[]) { - return convertArray((InetAddress[]) input, - inetAddress -> inetAddress != null ? inetAddress.getHostAddress() : null); + switch (type.getType()) { + case SMALLINT: { + List<Short> result = Lists.newArrayList(); + for (Object element : array) { + if (element == null) { + result.add(null); } else { - return convertArray(arrayInput, element -> element != null ? element.toString() : null); + if (element instanceof Byte) { + result.add(((Byte) element).shortValue()); + } else if (element instanceof Number) { + result.add(((Number) element).shortValue()); + } else { + throw new IllegalArgumentException("Unsupported element type: " + element.getClass()); + } } - default: - return Arrays.asList(arrayInput); + } + return result; } - } else { - return convertPrimitiveArray(input, childType); - } - } - - private List<?> convertPrimitiveArray(Object input, ColumnType childType) { - int length = Array.getLength(input); - List<Object> list = new ArrayList<>(length); - for (int i = 0; i < length; i++) { - Object element = Array.get(input, i); - switch (childType.getType()) { - case SMALLINT: - if (input instanceof byte[]) { - list.add((short) (byte) element); + case INT: { + List<Integer> result = Lists.newArrayList(); + for (Object element : array) { + if (element == null) { + result.add(null); } else { - list.add(element); + if (element instanceof Short) { + result.add(((Short) element).intValue()); + } else if (element instanceof Number) { + result.add(((Number) 
element).intValue()); + } else { + throw new IllegalArgumentException("Unsupported element type: " + element.getClass()); + } } - break; - case INT: - if (input instanceof short[]) { - list.add((int) (short) element); + } + return result; + } + case BIGINT: { + List<Long> result = Lists.newArrayList(); + for (Object element : array) { + if (element == null) { + result.add(null); } else { - list.add(element); + if (element instanceof Integer) { + result.add(((Integer) element).longValue()); + } else if (element instanceof Number) { + result.add(((Number) element).longValue()); + } else { + throw new IllegalArgumentException("Unsupported element type: " + element.getClass()); + } } - break; - case BIGINT: - if (input instanceof int[]) { - list.add((long) (int) element); + } + return result; + } + case LARGEINT: { + List<BigInteger> result = Lists.newArrayList(); + for (Object element : array) { + if (element == null) { + result.add(null); } else { - list.add(element); + if (element instanceof BigDecimal) { + result.add(((BigDecimal) element).toBigInteger()); + } else if (element instanceof Number) { + result.add(BigInteger.valueOf(((Number) element).longValue())); + } else { + throw new IllegalArgumentException("Unsupported element type: " + element.getClass()); + } } - break; - case LARGEINT: - if (input instanceof long[]) { - list.add(BigInteger.valueOf((long) element)); + } + return result; + } + case STRING: { + List<String> result = Lists.newArrayList(); + for (Object element : array) { + if (element == null) { + result.add(null); + } else if (element instanceof InetAddress) { + result.add(((InetAddress) element).getHostAddress()); } else { - list.add(element); + result.add(element.toString()); } - break; - default: - list.add(element); - break; + } + return result; } + case ARRAY: + List<List<?>> resultArray = Lists.newArrayList(); + for (Object element : array) { + if (element == null) { + resultArray.add(null); + } else { + resultArray.add( + 
Lists.newArrayList(convertArray((List<?>) element, type.getChildTypes().get(0)))); + } + } + return resultArray; + default: + return array; } - return list; } } diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java deleted file mode 100644 index 7d464e2369a..00000000000 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java +++ /dev/null @@ -1,1123 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.jdbc; - -import org.apache.doris.common.exception.InternalException; -import org.apache.doris.common.exception.UdfRuntimeException; -import org.apache.doris.common.jni.utils.UdfUtils; -import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ColumnValueConverter; -import org.apache.doris.common.jni.vec.VectorColumn; -import org.apache.doris.common.jni.vec.VectorTable; -import org.apache.doris.thrift.TJdbcExecutorCtorParams; -import org.apache.doris.thrift.TJdbcOperation; -import org.apache.doris.thrift.TOdbcTableType; - -import com.clickhouse.data.value.UnsignedByte; -import com.clickhouse.data.value.UnsignedInteger; -import com.clickhouse.data.value.UnsignedLong; -import com.clickhouse.data.value.UnsignedShort; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; -import com.vesoft.nebula.client.graph.data.ValueWrapper; -import com.zaxxer.hikari.HikariDataSource; -import org.apache.log4j.Logger; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; - -import java.io.FileNotFoundException; -import java.lang.reflect.Array; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.net.Inet4Address; -import java.net.Inet6Address; -import java.net.InetAddress; -import java.net.MalformedURLException; -import java.sql.Connection; -import java.sql.Date; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.OffsetDateTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; -import java.time.temporal.ChronoField; -import java.util.ArrayList; 
-import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.function.Function; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -public class DefaultJdbcExecutor { - private static final Logger LOG = Logger.getLogger(DefaultJdbcExecutor.class); - private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory(); - private Connection conn = null; - private PreparedStatement preparedStatement = null; - private Statement stmt = null; - private ResultSet resultSet = null; - private ResultSetMetaData resultSetMetaData = null; - private List<String> resultColumnTypeNames = null; - private List<Object[]> block = null; - private VectorTable outputTable = null; - private int batchSizeNum = 0; - private int curBlockRows = 0; - private static final byte[] emptyBytes = new byte[0]; - private HikariDataSource hikariDataSource = null; - private final byte[] hikariDataSourceLock = new byte[0]; - private TOdbcTableType tableType; - private JdbcDataSourceConfig config; - - public DefaultJdbcExecutor(byte[] thriftParams) throws Exception { - TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); - TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY); - try { - deserializer.deserialize(request, thriftParams); - } catch (TException e) { - throw new InternalException(e.getMessage()); - } - tableType = request.table_type; - this.config = new JdbcDataSourceConfig() - .setCatalogId(request.catalog_id) - .setJdbcUser(request.jdbc_user) - .setJdbcPassword(request.jdbc_password) - .setJdbcUrl(request.jdbc_url) - .setJdbcDriverUrl(request.driver_path) - .setJdbcDriverClass(request.jdbc_driver_class) - .setBatchSize(request.batch_size) - .setOp(request.op) - .setTableType(request.table_type) - .setConnectionPoolMinSize(request.connection_pool_min_size) - 
.setConnectionPoolMaxSize(request.connection_pool_max_size) - .setConnectionPoolMaxWaitTime(request.connection_pool_max_wait_time) - .setConnectionPoolMaxLifeTime(request.connection_pool_max_life_time) - .setConnectionPoolKeepAlive(request.connection_pool_keep_alive); - JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time); - init(config, request.statement); - } - - public void close() throws Exception { - try { - if (stmt != null) { - try { - stmt.cancel(); - } catch (SQLException e) { - LOG.error("Error cancelling statement", e); - } - } - - boolean shouldAbort = conn != null && resultSet != null - && (tableType == TOdbcTableType.MYSQL || tableType == TOdbcTableType.SQLSERVER); - boolean aborted = false; // Used to record whether the abort operation is performed - if (shouldAbort) { - aborted = abortReadConnection(conn, resultSet, tableType); - } - - // If no abort operation is performed, the resource needs to be closed manually - if (!aborted) { - closeResources(resultSet, stmt, conn); - } - } finally { - if (config.getConnectionPoolMinSize() == 0 && hikariDataSource != null) { - hikariDataSource.close(); - JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey()); - hikariDataSource = null; - } - } - } - - private void closeResources(AutoCloseable... closeables) { - for (AutoCloseable closeable : closeables) { - if (closeable != null) { - try { - if (closeable instanceof Connection) { - if (!((Connection) closeable).isClosed()) { - closeable.close(); - } - } else { - closeable.close(); - } - } catch (Exception e) { - LOG.error("Cannot close resource: ", e); - } - } - } - } - - public boolean abortReadConnection(Connection connection, ResultSet resultSet, TOdbcTableType tableType) - throws SQLException { - if (!resultSet.isAfterLast() && (tableType == TOdbcTableType.MYSQL || tableType == TOdbcTableType.SQLSERVER)) { - // Abort connection before closing. 
Without this, the MySQL/SQLServer driver - // attempts to drain the connection by reading all the results. - connection.abort(MoreExecutors.directExecutor()); - return true; - } - return false; - } - - public void cleanDataSource() { - if (hikariDataSource != null) { - hikariDataSource.close(); - JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey()); - hikariDataSource = null; - } - } - - public void testConnection() throws UdfRuntimeException { - try { - resultSet = ((PreparedStatement) stmt).executeQuery(); - if (!resultSet.next()) { - throw new UdfRuntimeException( - "Failed to test connection in BE: query executed but returned no results."); - } - } catch (SQLException e) { - throw new UdfRuntimeException("Failed to test connection in BE: ", e); - } - } - - public int read() throws UdfRuntimeException { - try { - resultSet = ((PreparedStatement) stmt).executeQuery(); - resultSetMetaData = resultSet.getMetaData(); - int columnCount = resultSetMetaData.getColumnCount(); - resultColumnTypeNames = new ArrayList<>(columnCount); - block = new ArrayList<>(columnCount); - if (isNebula()) { - for (int i = 0; i < columnCount; ++i) { - block.add((Object[]) Array.newInstance(Object.class, batchSizeNum)); - } - } else { - for (int i = 0; i < columnCount; ++i) { - resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); - block.add((Object[]) Array.newInstance(Object.class, batchSizeNum)); - } - } - return columnCount; - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor sql has error: ", e); - } - } - - public long getBlockAddress(int batchSize, Map<String, String> outputParams) - throws UdfRuntimeException { - try { - if (outputTable != null) { - outputTable.close(); - } - - String isNullableString = outputParams.get("is_nullable"); - String replaceString = outputParams.get("replace_string"); - - if (isNullableString == null || replaceString == null) { - throw new IllegalArgumentException( - "Output parameters 
'is_nullable' and 'replace_string' are required."); - } - - String[] nullableList = isNullableString.split(","); - String[] replaceStringList = replaceString.split(","); - curBlockRows = 0; - int columnCount = resultSetMetaData.getColumnCount(); - - do { - for (int i = 0; i < columnCount; ++i) { - boolean isBitmapOrHll = - replaceStringList[i].equals("bitmap") - || replaceStringList[i].equals("hll"); - block.get(i)[curBlockRows] = getColumnValue(tableType, i, isBitmapOrHll); - } - curBlockRows++; - } while (curBlockRows < batchSize && resultSet.next()); - - outputTable = VectorTable.createWritableTable(outputParams, curBlockRows); - - for (int i = 0; i < columnCount; ++i) { - Object[] columnData = block.get(i); - ColumnType type = outputTable.getColumnType(i); - Class<?> clz = findNonNullClass(columnData, type); - Object[] newColumn = (Object[]) Array.newInstance(clz, curBlockRows); - System.arraycopy(columnData, 0, newColumn, 0, curBlockRows); - boolean isNullable = Boolean.parseBoolean(nullableList[i]); - outputTable.appendData( - i, - newColumn, - getOutputConverter(type, clz, replaceStringList[i]), - isNullable); - } - } catch (Exception e) { - LOG.warn("jdbc get block address exception: ", e); - throw new UdfRuntimeException("jdbc get block address: ", e); - } - return outputTable.getMetaAddress(); - } - - public int write(Map<String, String> params) throws UdfRuntimeException { - VectorTable batchTable = VectorTable.createReadableTable(params); - // Can't release or close batchTable, it's released by c++ - try { - insert(batchTable); - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor sql has error: ", e); - } - return batchTable.getNumRows(); - } - - public void openTrans() throws UdfRuntimeException { - try { - if (conn != null) { - conn.setAutoCommit(false); - } - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor open transaction has error: ", e); - } - } - - public void commitTrans() throws 
UdfRuntimeException { - try { - if (conn != null) { - conn.commit(); - } - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor commit transaction has error: ", e); - } - } - - public void rollbackTrans() throws UdfRuntimeException { - try { - if (conn != null) { - conn.rollback(); - } - } catch (SQLException e) { - throw new UdfRuntimeException("JDBC executor rollback transaction has error: ", e); - } - } - - public List<String> getResultColumnTypeNames() { - return resultColumnTypeNames; - } - - public int getCurBlockRows() { - return curBlockRows; - } - - public boolean hasNext() throws UdfRuntimeException { - try { - if (resultSet == null) { - return false; - } - return resultSet.next(); - } catch (SQLException e) { - throw new UdfRuntimeException("resultSet to get next error: ", e); - } - } - - private void init(JdbcDataSourceConfig config, String sql) throws UdfRuntimeException { - ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader(); - String hikariDataSourceKey = config.createCacheKey(); - try { - if (isNebula()) { - batchSizeNum = config.getBatchSize(); - Class.forName(config.getJdbcDriverClass()); - conn = DriverManager.getConnection(config.getJdbcDriverClass(), config.getJdbcUser(), - config.getJdbcPassword()); - stmt = conn.prepareStatement(sql); - } else { - ClassLoader parent = getClass().getClassLoader(); - ClassLoader classLoader = UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent); - Thread.currentThread().setContextClassLoader(classLoader); - hikariDataSource = JdbcDataSource.getDataSource().getSource(hikariDataSourceKey); - if (hikariDataSource == null) { - synchronized (hikariDataSourceLock) { - hikariDataSource = JdbcDataSource.getDataSource().getSource(hikariDataSourceKey); - if (hikariDataSource == null) { - long start = System.currentTimeMillis(); - HikariDataSource ds = new HikariDataSource(); - ds.setDriverClassName(config.getJdbcDriverClass()); - ds.setJdbcUrl(config.getJdbcUrl()); - 
ds.setUsername(config.getJdbcUser()); - ds.setPassword(config.getJdbcPassword()); - ds.setMinimumIdle(config.getConnectionPoolMinSize()); // default 1 - ds.setMaximumPoolSize(config.getConnectionPoolMaxSize()); // default 10 - ds.setConnectionTimeout(config.getConnectionPoolMaxWaitTime()); // default 5000 - ds.setMaxLifetime(config.getConnectionPoolMaxLifeTime()); // default 30 min - ds.setIdleTimeout(config.getConnectionPoolMaxLifeTime() / 2L); // default 15 min - setValidationQuery(ds, config.getTableType()); - if (config.isConnectionPoolKeepAlive()) { - ds.setKeepaliveTime(config.getConnectionPoolMaxLifeTime() / 5L); // default 6 min - } - hikariDataSource = ds; - JdbcDataSource.getDataSource().putSource(hikariDataSourceKey, ds); - LOG.info("JdbcClient set" - + " ConnectionPoolMinSize = " + config.getConnectionPoolMinSize() - + ", ConnectionPoolMaxSize = " + config.getConnectionPoolMaxSize() - + ", ConnectionPoolMaxWaitTime = " + config.getConnectionPoolMaxWaitTime() - + ", ConnectionPoolMaxLifeTime = " + config.getConnectionPoolMaxLifeTime() - + ", ConnectionPoolKeepAlive = " + config.isConnectionPoolKeepAlive()); - LOG.info("init datasource [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + ( - System.currentTimeMillis() - start) + " ms"); - } - } - } - - long start = System.currentTimeMillis(); - conn = hikariDataSource.getConnection(); - LOG.info("get connection [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + ( - System.currentTimeMillis() - start) - + " ms"); - if (config.getOp() == TJdbcOperation.READ) { - conn.setAutoCommit(false); - Preconditions.checkArgument(sql != null); - stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - if (tableType == TOdbcTableType.MYSQL) { - stmt.setFetchSize(Integer.MIN_VALUE); - } else { - stmt.setFetchSize(config.getBatchSize()); - } - batchSizeNum = config.getBatchSize(); - } else { - LOG.info("insert sql: " + sql); - preparedStatement = 
conn.prepareStatement(sql); - } - } - } catch (MalformedURLException e) { - throw new UdfRuntimeException("MalformedURLException to load class about " + config.getJdbcDriverUrl(), e); - } catch (SQLException e) { - throw new UdfRuntimeException("Initialize datasource failed: ", e); - } catch (FileNotFoundException e) { - throw new UdfRuntimeException("FileNotFoundException failed: ", e); - } catch (Exception e) { - throw new UdfRuntimeException("Initialize datasource failed: ", e); - } finally { - Thread.currentThread().setContextClassLoader(oldClassLoader); - } - } - - private void setValidationQuery(HikariDataSource ds, TOdbcTableType tableType) { - if (tableType == TOdbcTableType.ORACLE || tableType == TOdbcTableType.OCEANBASE_ORACLE) { - ds.setConnectionTestQuery("SELECT 1 FROM dual"); - } else if (tableType == TOdbcTableType.SAP_HANA) { - ds.setConnectionTestQuery("SELECT 1 FROM DUMMY"); - } else { - ds.setConnectionTestQuery("SELECT 1"); - } - } - - public boolean isNebula() { - return tableType == TOdbcTableType.NEBULA; - } - - private Class<?> findNonNullClass(Object[] columnData, ColumnType type) { - for (Object data : columnData) { - if (data != null) { - return data.getClass(); - } - } - switch (type.getType()) { - case BOOLEAN: - return Boolean.class; - case TINYINT: - return Byte.class; - case SMALLINT: - return Short.class; - case INT: - return Integer.class; - case BIGINT: - return Long.class; - case LARGEINT: - return BigInteger.class; - case FLOAT: - return Float.class; - case DOUBLE: - return Double.class; - case DECIMALV2: - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - return BigDecimal.class; - case DATE: - case DATEV2: - return LocalDate.class; - case DATETIME: - case DATETIMEV2: - return LocalDateTime.class; - case CHAR: - case VARCHAR: - case STRING: - return String.class; - case ARRAY: - return List.class; - default: - throw new IllegalArgumentException( - "Unsupported column type: " + type.getType()); - } - } - - public Object 
getColumnValue(TOdbcTableType tableType, int columnIndex, boolean isBitmapOrHll) - throws SQLException { - Object result; - if (tableType == TOdbcTableType.NEBULA) { - result = UdfUtils.convertObject((ValueWrapper) resultSet.getObject(columnIndex + 1)); - } else { - result = - isBitmapOrHll - ? resultSet.getBytes(columnIndex + 1) - : resultSet.getObject(columnIndex + 1); - } - return result; - } - - /* - | Type | Java Array Type | - |---------------------------------------------|----------------------------| - | BOOLEAN | Boolean[] | - | TINYINT | Byte[] | - | SMALLINT | Short[] | - | INT | Integer[] | - | BIGINT | Long[] | - | LARGEINT | BigInteger[] | - | FLOAT | Float[] | - | DOUBLE | Double[] | - | DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128 | BigDecimal[] | - | DATE, DATEV2 | LocalDate[] | - | DATETIME, DATETIMEV2 | LocalDateTime[] | - | CHAR, VARCHAR, STRING | String[] | - | ARRAY | List<Object>[] | - | MAP | Map<Object, Object>[] | - | STRUCT | Map<String, Object>[] | - */ - - private ColumnValueConverter getOutputConverter( - ColumnType columnType, Class clz, String replaceString) { - switch (columnType.getType()) { - case BOOLEAN: - if (Integer.class.equals(clz)) { - return createConverter(input -> ((Integer) input) != 0, Boolean.class); - } - if (Byte.class.equals(clz)) { - return createConverter(input -> ((Byte) input) != 0, Boolean.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> - Boolean.parseBoolean( - String.valueOf(input).equals("1") ? 
"true" : "false"), - Boolean.class); - } - break; - case TINYINT: - if (Integer.class.equals(clz)) { - return createConverter(input -> ((Integer) input).byteValue(), Byte.class); - } - if (Short.class.equals(clz)) { - return createConverter(input -> ((Short) input).byteValue(), Byte.class); - } - if (Object.class.equals(clz)) { - return createConverter( - input -> (byte) Integer.parseInt(String.valueOf(input)), Byte.class); - } - if (BigDecimal.class.equals(clz)) { - return createConverter(input -> ((BigDecimal) input).byteValue(), Byte.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> Byte.parseByte(String.valueOf(input)), Byte.class); - } - break; - case SMALLINT: - if (Integer.class.equals(clz)) { - return createConverter(input -> ((Integer) input).shortValue(), Short.class); - } - if (BigDecimal.class.equals(clz)) { - return createConverter(input -> ((BigDecimal) input).shortValue(), Short.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> Short.parseShort(String.valueOf(input)), Short.class); - } - if (Byte.class.equals(clz)) { - return createConverter(input -> ((Byte) input).shortValue(), Short.class); - } - if (com.clickhouse.data.value.UnsignedByte.class.equals(clz)) { - return createConverter( - input -> ((UnsignedByte) input).shortValue(), Short.class); - } - break; - case INT: - if (Long.class.equals(clz)) { - return createConverter(input -> ((Long) input).intValue(), Integer.class); - } - if (BigDecimal.class.equals(clz)) { - return createConverter(input -> ((BigDecimal) input).intValue(), Integer.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> Integer.parseInt(String.valueOf(input)), Integer.class); - } - if (Short.class.equals(clz)) { - return createConverter(input -> ((Short) input).intValue(), Integer.class); - } - if (com.clickhouse.data.value.UnsignedShort.class.equals(clz)) { - return createConverter( - input -> ((UnsignedShort) input).intValue(), 
Integer.class); - } - break; - case BIGINT: - if (BigDecimal.class.equals(clz)) { - return createConverter(input -> ((BigDecimal) input).longValue(), Long.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> Long.parseLong(String.valueOf(input)), Long.class); - } - if (Integer.class.equals(clz)) { - return createConverter(input -> ((Integer) input).longValue(), Long.class); - } - if (com.clickhouse.data.value.UnsignedInteger.class.equals(clz)) { - return createConverter( - input -> ((UnsignedInteger) input).longValue(), Long.class); - } - break; - case LARGEINT: - if (BigDecimal.class.equals(clz)) { - return createConverter( - input -> ((BigDecimal) input).toBigInteger(), BigInteger.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> new BigInteger(String.valueOf(input)), BigInteger.class); - } - if (Long.class.equals(clz)) { - return createConverter( - input -> BigInteger.valueOf((Long) input), BigInteger.class); - } - if (com.clickhouse.data.value.UnsignedLong.class.equals(clz)) { - return createConverter( - input -> ((UnsignedLong) input).bigIntegerValue(), BigInteger.class); - } - break; - case DOUBLE: - if (BigDecimal.class.equals(clz)) { - return createConverter( - input -> ((BigDecimal) input).doubleValue(), Double.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> Double.parseDouble(String.valueOf(input)), Double.class); - } - break; - case FLOAT: - return createConverter( - input -> Float.parseFloat(String.valueOf(input)), Float.class); - case DECIMALV2: - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - return createConverter( - input -> new BigDecimal(String.valueOf(input)), BigDecimal.class); - case DATE: - case DATEV2: - if (Date.class.equals(clz)) { - return createConverter(input -> ((Date) input).toLocalDate(), LocalDate.class); - } - if (Timestamp.class.equals(clz)) { - return createConverter( - input -> ((Timestamp) 
input).toLocalDateTime().toLocalDate(), - LocalDate.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> LocalDate.parse(String.valueOf(input)), LocalDate.class); - } - break; - case DATETIME: - case DATETIMEV2: - if (Timestamp.class.equals(clz)) { - return createConverter( - input -> ((Timestamp) input).toLocalDateTime(), LocalDateTime.class); - } - if (OffsetDateTime.class.equals(clz)) { - return createConverter( - input -> ((OffsetDateTime) input).toLocalDateTime(), - LocalDateTime.class); - } - if (oracle.sql.TIMESTAMP.class.equals(clz)) { - return createConverter( - input -> { - try { - return ((oracle.sql.TIMESTAMP) input) - .timestampValue() - .toLocalDateTime(); - } catch (SQLException e) { - throw new RuntimeException(e); - } - }, - LocalDateTime.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> - LocalDateTime.parse( - String.valueOf(input), - getDateTimeFormatter(String.valueOf(input))), - LocalDateTime.class); - } - break; - case CHAR: - return createConverter( - input -> trimSpaces(tableType, input.toString()), String.class); - case VARCHAR: - case STRING: - if (byte[].class.equals(clz)) { - if (replaceString.equals("bitmap") || replaceString.equals("hll")) { - break; - } else { - return createConverter( - input -> byteArrayToHexString(tableType, (byte[]) input), - String.class); - } - } - if (Time.class.equals(clz)) { - return createConverter( - input -> timeToString((java.sql.Time) input), String.class); - } - if (oracle.sql.CLOB.class.equals(clz)) { - return createConverter( - input -> { - try { - oracle.sql.CLOB clob = (oracle.sql.CLOB) input; - return clob.getSubString(1, (int) clob.length()); - } catch (SQLException e) { - throw new RuntimeException(e); - } - }, - String.class); - } - if (java.net.Inet4Address.class.equals(clz)) { - return createConverter( - input -> ((InetAddress) input).getHostAddress(), String.class); - } - if (java.net.Inet6Address.class.equals(clz)) { - return 
createConverter( - input -> { - String inetAddress = ((InetAddress) input).getHostAddress(); - return simplifyIPv6Address(inetAddress); - }, - String.class); - } else { - return createConverter(Object::toString, String.class); - } - case ARRAY: - if (java.sql.Array.class.equals(clz)) { - return createConverter( - input -> { - try { - return Arrays.asList( - (Object[]) ((java.sql.Array) input).getArray()); - } catch (SQLException e) { - throw new RuntimeException(e); - } - }, - List.class); - } - if (String.class.equals(clz)) { - return createConverter( - input -> { - List<Object> list = parseArray(String.valueOf(input)); - return convertArray(list, columnType.getChildTypes().get(0)); - }, - List.class); - } - if (tableType == TOdbcTableType.CLICKHOUSE) { - return createConverter( - input -> { - List<Object> list = convertClickHouseArray(input); - return convertArray(list, columnType.getChildTypes().get(0)); - }, - List.class); - } - break; - default: - throw new IllegalArgumentException( - "Unsupported column type: " + columnType.getType()); - } - return null; - } - - private ColumnValueConverter createConverter( - Function<Object, ?> converterFunction, Class<?> type) { - return (Object[] columnData) -> { - Object[] result = (Object[]) Array.newInstance(type, columnData.length); - for (int i = 0; i < columnData.length; i++) { - result[i] = columnData[i] != null ? 
converterFunction.apply(columnData[i]) : null; - } - return result; - }; - } - - private String byteArrayToHexString(TOdbcTableType tableType, byte[] columnData) { - if (tableType == TOdbcTableType.MYSQL || tableType == TOdbcTableType.OCEANBASE) { - return mysqlByteArrayToHexString(columnData); - } else if (tableType == TOdbcTableType.POSTGRESQL) { - return pgByteArrayToHexString(columnData); - } else { - return defaultByteArrayToHexString(columnData); - } - } - - private String mysqlByteArrayToHexString(byte[] bytes) { - StringBuilder hexString = new StringBuilder("0x"); - for (byte b : bytes) { - String hex = Integer.toHexString(0xFF & b); - if (hex.length() == 1) { - hexString.append('0'); - } - hexString.append(hex.toUpperCase()); - } - return hexString.toString(); - } - - private static String pgByteArrayToHexString(byte[] bytes) { - StringBuilder hexString = new StringBuilder("\\x"); - for (byte b : bytes) { - hexString.append(String.format("%02x", b & 0xff)); - } - return hexString.toString(); - } - - private String defaultByteArrayToHexString(byte[] bytes) { - StringBuilder hexString = new StringBuilder(); - for (byte b : bytes) { - String hex = Integer.toHexString(0xFF & b); - if (hex.length() == 1) { - hexString.append('0'); - } - hexString.append(hex.toUpperCase()); - } - return hexString.toString(); - } - - private String trimSpaces(TOdbcTableType tableType, String str) { - if (tableType == TOdbcTableType.POSTGRESQL || tableType == TOdbcTableType.ORACLE) { - int end = str.length() - 1; - while (end >= 0 && str.charAt(end) == ' ') { - end--; - } - return str.substring(0, end + 1); - } else { - return str; - } - } - - public String timeToString(java.sql.Time time) { - long milliseconds = time.getTime() % 1000L; - if (milliseconds > 0) { - return String.format("%s.%03d", time, milliseconds); - } else { - return time.toString(); - } - } - - private List<Object> convertArray(List<Object> list, ColumnType childType) { - Class<?> clz = Object.class; - for 
(Object data : list) { - if (data != null) { - clz = data.getClass(); - break; - } - } - List<Object> convertedList = new ArrayList<>(list.size()); - ColumnValueConverter converter = getOutputConverter(childType, clz, "not_replace"); - for (Object element : list) { - if (childType.isComplexType()) { - convertedList.add(convertArray((List<Object>) element, childType)); - } else { - if (converter != null) { - convertedList.add(converter.convert(new Object[] {element})[0]); - } else { - convertedList.add(element); - } - } - } - return convertedList; - } - - private static String simplifyIPv6Address(String address) { - // Replace longest sequence of zeros with "::" - String[] parts = address.split(":"); - int longestSeqStart = -1; - int longestSeqLen = 0; - int curSeqStart = -1; - int curSeqLen = 0; - for (int i = 0; i < parts.length; i++) { - if (parts[i].equals("0")) { - if (curSeqStart == -1) { - curSeqStart = i; - } - curSeqLen++; - if (curSeqLen > longestSeqLen) { - longestSeqStart = curSeqStart; - longestSeqLen = curSeqLen; - } - } else { - curSeqStart = -1; - curSeqLen = 0; - } - } - if (longestSeqLen <= 1) { - return address; // No sequences of zeros to replace - } - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < longestSeqStart; i++) { - sb.append(parts[i]).append(':'); - } - sb.append(':'); - for (int i = longestSeqStart + longestSeqLen; i < parts.length; i++) { - sb.append(parts[i]); - if (i < parts.length - 1) { - sb.append(':'); - } - } - return sb.toString(); - } - - private static final Pattern MILLIS_PATTERN = Pattern.compile("(\\.\\d+)"); - - public static DateTimeFormatter getDateTimeFormatter(String dateTimeString) { - Matcher matcher = MILLIS_PATTERN.matcher(dateTimeString); - int fractionDigits = 0; - if (matcher.find()) { - fractionDigits = matcher.group(1).length() - 1; // Subtract 1 to exclude the dot - } - fractionDigits = Math.min(fractionDigits, 6); // Limit the fraction digits to 6 - - return new DateTimeFormatterBuilder() - 
.appendPattern("yyyy-MM-dd HH:mm:ss") - .appendFraction(ChronoField.MILLI_OF_SECOND, fractionDigits, fractionDigits, true) - .toFormatter(); - } - - private static final Map<Class<?>, Function<Object, List<Object>>> CK_ARRAY_CONVERTERS = - new HashMap<>(); - - static { - CK_ARRAY_CONVERTERS.put(String[].class, res -> Arrays.asList((String[]) res)); - CK_ARRAY_CONVERTERS.put(boolean[].class, res -> toList((boolean[]) res)); - CK_ARRAY_CONVERTERS.put(Boolean[].class, res -> Arrays.asList((Boolean[]) res)); - CK_ARRAY_CONVERTERS.put(byte[].class, res -> toList((byte[]) res)); - CK_ARRAY_CONVERTERS.put(Byte[].class, res -> Arrays.asList((Byte[]) res)); - CK_ARRAY_CONVERTERS.put(LocalDate[].class, res -> Arrays.asList((LocalDate[]) res)); - CK_ARRAY_CONVERTERS.put(LocalDateTime[].class, res -> Arrays.asList((LocalDateTime[]) res)); - CK_ARRAY_CONVERTERS.put(float[].class, res -> toList((float[]) res)); - CK_ARRAY_CONVERTERS.put(Float[].class, res -> Arrays.asList((Float[]) res)); - CK_ARRAY_CONVERTERS.put(double[].class, res -> toList((double[]) res)); - CK_ARRAY_CONVERTERS.put(Double[].class, res -> Arrays.asList((Double[]) res)); - CK_ARRAY_CONVERTERS.put(short[].class, res -> toList((short[]) res)); - CK_ARRAY_CONVERTERS.put(Short[].class, res -> Arrays.asList((Short[]) res)); - CK_ARRAY_CONVERTERS.put(int[].class, res -> toList((int[]) res)); - CK_ARRAY_CONVERTERS.put(Integer[].class, res -> Arrays.asList((Integer[]) res)); - CK_ARRAY_CONVERTERS.put(long[].class, res -> toList((long[]) res)); - CK_ARRAY_CONVERTERS.put(Long[].class, res -> Arrays.asList((Long[]) res)); - CK_ARRAY_CONVERTERS.put(BigInteger[].class, res -> Arrays.asList((BigInteger[]) res)); - CK_ARRAY_CONVERTERS.put(BigDecimal[].class, res -> Arrays.asList((BigDecimal[]) res)); - CK_ARRAY_CONVERTERS.put( - Inet4Address[].class, - res -> - Arrays.stream((Inet4Address[]) res) - .map(addr -> addr == null ? 
null : addr.getHostAddress()) - .collect(Collectors.toList())); - CK_ARRAY_CONVERTERS.put( - Inet6Address[].class, - res -> - Arrays.stream((Inet6Address[]) res) - .map(addr -> addr == null ? null : simplifyIPv6Address(addr.getHostAddress())) - .collect(Collectors.toList())); - CK_ARRAY_CONVERTERS.put(UUID[].class, res -> Arrays.asList((UUID[]) res)); - CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedByte[].class, - res -> Arrays.asList((com.clickhouse.data.value.UnsignedByte[]) res)); - CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedShort[].class, - res -> Arrays.asList((com.clickhouse.data.value.UnsignedShort[]) res)); - CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedInteger[].class, - res -> Arrays.asList((com.clickhouse.data.value.UnsignedInteger[]) res)); - CK_ARRAY_CONVERTERS.put(com.clickhouse.data.value.UnsignedLong[].class, - res -> Arrays.asList((com.clickhouse.data.value.UnsignedLong[]) res)); - } - - public static List<Object> convertClickHouseArray(Object obj) { - Function<Object, List<Object>> converter = CK_ARRAY_CONVERTERS.get(obj.getClass()); - return converter != null ? 
converter.apply(obj) : Collections.singletonList(obj); - } - - private static <T> List<Object> toList(T array) { - if (array instanceof Object[]) { - return Arrays.asList((Object[]) array); - } - int length = Array.getLength(array); - List<Object> list = new ArrayList<>(length); - for (int i = 0; i < length; i++) { - list.add(Array.get(array, i)); - } - return list; - } - - private static final Pattern ARRAY_PATTERN = Pattern.compile("\"([^\"]*)\"|([^,]+)"); - - private static List<Object> parseArray(String input) { - String trimmedInput = input.substring(1, input.length() - 1); - List<Object> list = new ArrayList<>(); - Matcher matcher = ARRAY_PATTERN.matcher(trimmedInput); - while (matcher.find()) { - if (matcher.group(1) != null) { - list.add(matcher.group(1)); - } else { - list.add(matcher.group(2)); - } - } - return list; - } - - private int insert(VectorTable data) throws SQLException { - for (int i = 0; i < data.getNumRows(); ++i) { - for (int j = 0; j < data.getColumns().length; ++j) { - insertColumn(i, j, data.getColumns()[j]); - } - preparedStatement.addBatch(); - } - preparedStatement.executeBatch(); - preparedStatement.clearBatch(); - return data.getNumRows(); - } - - private void insertColumn(int rowIdx, int colIdx, VectorColumn column) throws SQLException { - int parameterIndex = colIdx + 1; - ColumnType.Type dorisType = column.getColumnTyp(); - if (column.isNullAt(rowIdx)) { - insertNullColumn(parameterIndex, dorisType); - return; - } - switch (dorisType) { - case BOOLEAN: - preparedStatement.setBoolean(parameterIndex, column.getBoolean(rowIdx)); - break; - case TINYINT: - preparedStatement.setByte(parameterIndex, column.getByte(rowIdx)); - break; - case SMALLINT: - preparedStatement.setShort(parameterIndex, column.getShort(rowIdx)); - break; - case INT: - preparedStatement.setInt(parameterIndex, column.getInt(rowIdx)); - break; - case BIGINT: - preparedStatement.setLong(parameterIndex, column.getLong(rowIdx)); - break; - case LARGEINT: - 
preparedStatement.setObject(parameterIndex, column.getBigInteger(rowIdx)); - break; - case FLOAT: - preparedStatement.setFloat(parameterIndex, column.getFloat(rowIdx)); - break; - case DOUBLE: - preparedStatement.setDouble(parameterIndex, column.getDouble(rowIdx)); - break; - case DECIMALV2: - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - preparedStatement.setBigDecimal(parameterIndex, column.getDecimal(rowIdx)); - break; - case DATEV2: - preparedStatement.setDate(parameterIndex, Date.valueOf(column.getDate(rowIdx))); - break; - case DATETIMEV2: - preparedStatement.setTimestamp( - parameterIndex, Timestamp.valueOf(column.getDateTime(rowIdx))); - break; - case CHAR: - case VARCHAR: - case STRING: - case BINARY: - preparedStatement.setString(parameterIndex, column.getStringWithOffset(rowIdx)); - break; - default: - throw new RuntimeException("Unknown type value: " + dorisType); - } - } - - private void insertNullColumn(int parameterIndex, ColumnType.Type dorisType) - throws SQLException { - switch (dorisType) { - case BOOLEAN: - preparedStatement.setNull(parameterIndex, Types.BOOLEAN); - break; - case TINYINT: - preparedStatement.setNull(parameterIndex, Types.TINYINT); - break; - case SMALLINT: - preparedStatement.setNull(parameterIndex, Types.SMALLINT); - break; - case INT: - preparedStatement.setNull(parameterIndex, Types.INTEGER); - break; - case BIGINT: - preparedStatement.setNull(parameterIndex, Types.BIGINT); - break; - case LARGEINT: - preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT); - break; - case FLOAT: - preparedStatement.setNull(parameterIndex, Types.FLOAT); - break; - case DOUBLE: - preparedStatement.setNull(parameterIndex, Types.DOUBLE); - break; - case DECIMALV2: - case DECIMAL32: - case DECIMAL64: - case DECIMAL128: - preparedStatement.setNull(parameterIndex, Types.DECIMAL); - break; - case DATEV2: - preparedStatement.setNull(parameterIndex, Types.DATE); - break; - case DATETIMEV2: - preparedStatement.setNull(parameterIndex, 
Types.TIMESTAMP); - break; - case CHAR: - case VARCHAR: - case STRING: - case BINARY: - preparedStatement.setNull(parameterIndex, Types.VARCHAR); - break; - default: - throw new RuntimeException("Unknown type value: " + dorisType); - } - } -} diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java index 5f60745fc4d..b4787bebabb 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutorFactory.java @@ -42,7 +42,7 @@ public class JdbcExecutorFactory { case PRESTO: return "org/apache/doris/jdbc/TrinoJdbcExecutor"; default: - return "org/apache/doris/jdbc/DefaultJdbcExecutor"; + throw new IllegalArgumentException("Unsupported jdbc type: " + type); } } } diff --git a/fe/be-java-extensions/preload-extensions/pom.xml b/fe/be-java-extensions/preload-extensions/pom.xml index 3627b912d36..8cc11473fdd 100644 --- a/fe/be-java-extensions/preload-extensions/pom.xml +++ b/fe/be-java-extensions/preload-extensions/pom.xml @@ -199,25 +199,10 @@ under the License. 
</exclusions> </dependency> <!-- For JDBC Scanner PreLoad--> - <dependency> - <groupId>com.oracle.database.jdbc</groupId> - <artifactId>ojdbc8</artifactId> - </dependency> <dependency> <groupId>com.zaxxer</groupId> <artifactId>HikariCP</artifactId> </dependency> - <dependency> - <groupId>com.clickhouse</groupId> - <artifactId>clickhouse-jdbc</artifactId> - <classifier>all</classifier> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>com.oracle.ojdbc</groupId> - <artifactId>orai18n</artifactId> - <version>19.3.0.0</version> - </dependency> <dependency> <groupId>org.apache.doris</groupId> <artifactId>hive-catalog-shade</artifactId> diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index 95522503691..c411c6d1143 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -65,7 +65,6 @@ import java.util.Map; public class JdbcResource extends Resource { private static final Logger LOG = LogManager.getLogger(JdbcResource.class); - public static final String JDBC_NEBULA = "jdbc:nebula"; public static final String JDBC_MYSQL = "jdbc:mysql"; public static final String JDBC_MARIADB = "jdbc:mariadb"; public static final String JDBC_POSTGRESQL = "jdbc:postgresql"; @@ -78,7 +77,6 @@ public class JdbcResource extends Resource { public static final String JDBC_OCEANBASE = "jdbc:oceanbase"; public static final String JDBC_DB2 = "jdbc:db2"; - public static final String NEBULA = "NEBULA"; public static final String MYSQL = "MYSQL"; public static final String POSTGRESQL = "POSTGRESQL"; public static final String ORACLE = "ORACLE"; @@ -323,8 +321,6 @@ public class JdbcResource extends Resource { return PRESTO; } else if (url.startsWith(JDBC_OCEANBASE)) { return OCEANBASE; - } else if (url.startsWith(JDBC_NEBULA)) { - return NEBULA; } else if (url.startsWith(JDBC_DB2)) { 
return DB2; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java index 4d3ded2540e..d81b82fd3eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java @@ -92,7 +92,6 @@ public class JdbcTable extends Table { static { Map<String, TOdbcTableType> tempMap = new CaseInsensitiveMap(); - tempMap.put("nebula", TOdbcTableType.NEBULA); tempMap.put("mysql", TOdbcTableType.MYSQL); tempMap.put("postgresql", TOdbcTableType.POSTGRESQL); tempMap.put("sqlserver", TOdbcTableType.SQLSERVER); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java index 58ab0f9d226..0d292100fe0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java @@ -98,7 +98,6 @@ public class JdbcScanNode extends ExternalScanNode { @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); - getGraphQueryString(); } /** @@ -112,25 +111,6 @@ public class JdbcScanNode extends ExternalScanNode { cardinality = (long) statsDeriveResult.getRowCount(); } - private boolean isNebula() { - return jdbcType == TOdbcTableType.NEBULA; - } - - private void getGraphQueryString() { - if (!isNebula()) { - return; - } - for (Expr expr : conjuncts) { - FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr; - if ("g".equals(functionCallExpr.getFnName().getFunction())) { - graphQueryString = functionCallExpr.getChild(0).getStringValue(); - break; - } - } - // clean conjusts cause graph sannnode no need conjuncts - conjuncts = Lists.newArrayList(); - } - private void createJdbcFilters() { if (conjuncts.isEmpty()) { return; @@ -194,9 +174,6 @@ public class 
JdbcScanNode extends ExternalScanNode { } private String getJdbcQueryStr() { - if (isNebula()) { - return graphQueryString; - } StringBuilder sql = new StringBuilder("SELECT "); // Oracle use the where clause to do top n diff --git a/fe/pom.xml b/fe/pom.xml index ba0679370ba..5dcdabfb331 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -259,7 +259,6 @@ under the License. <json-simple.version>1.1.1</json-simple.version> <junit.version>5.8.2</junit.version> <hikaricp.version>4.0.3</hikaricp.version> - <clickhouse.version>0.4.6</clickhouse.version> <thrift.version>0.16.0</thrift.version> <tomcat-embed-core.version>8.5.86</tomcat-embed-core.version> <log4j2.version>2.18.0</log4j2.version> @@ -335,7 +334,6 @@ under the License. <project.scm.id>github</project.scm.id> <spring.version>2.7.13</spring.version> <orc.version>1.8.4</orc.version> - <ojdbc8.version>12.2.0.1</ojdbc8.version> <zookeeper.version>3.9.1</zookeeper.version> <velocity-engine-core.version>2.3</velocity-engine-core.version> <ranger-plugins-common.version>2.4.0</ranger-plugins-common.version> @@ -345,7 +343,6 @@ under the License. <jettison.version>1.5.4</jettison.version> <jetty.version>9.4.53.v20231009</jetty.version> <immutables.version>2.9.3</immutables.version> - <vesoft.client.version>3.0.0</vesoft.client.version> <!--todo waiting release--> <quartz.version>2.3.2</quartz.version> <!-- paimon --> @@ -741,11 +738,6 @@ under the License. <artifactId>json-simple</artifactId> <version>${json-simple.version}</version> </dependency> - <dependency> - <groupId>com.oracle.database.jdbc</groupId> - <artifactId>ojdbc8</artifactId> - <version>${ojdbc8.version}</version> - </dependency> <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-engine --> <dependency> <groupId>org.junit.jupiter</groupId> @@ -1536,13 +1528,6 @@ under the License. 
<artifactId>HikariCP</artifactId> <version>${hikaricp.version}</version> </dependency> - <dependency> - <groupId>com.clickhouse</groupId> - <artifactId>clickhouse-jdbc</artifactId> - <version>${clickhouse.version}</version> - <scope>provided</scope> - <classifier>all</classifier> - </dependency> <!-- https://mvnrepository.com/artifact/joda-time/joda-time --> <dependency> <groupId>joda-time</groupId> @@ -1555,11 +1540,6 @@ under the License. <artifactId>tomcat-embed-core</artifactId> <version>${tomcat-embed-core.version}</version> </dependency> - <dependency> - <groupId>com.vesoft</groupId> - <artifactId>client</artifactId> - <version>${vesoft.client.version}</version> - </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index d8953e7dd7a..dbab5c61caa 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -399,7 +399,6 @@ enum TOdbcTableType { PRESTO, OCEANBASE, OCEANBASE_ORACLE, - NEBULA, DB2 } diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy index 7964b7e9370..51dcd1436e5 100644 --- a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy +++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy @@ -949,25 +949,25 @@ suite("test_jdbc_query_mysql", "p0,external,mysql,external_docker,external_docke } sql """alter resource $jdbcResourceMysql57 properties("password" = "123456")""" - // test for type check - sql """ drop table if exists ${exMysqlTypeTable} """ - sql """ - CREATE EXTERNAL TABLE ${exMysqlTypeTable} ( - `id` bigint NOT NULL, - `count_value` varchar(100) NULL - ) ENGINE=JDBC - COMMENT "JDBC Mysql 外部表" - PROPERTIES ( - "resource" = "$jdbcResourceMysql57", - "table" = "ex_tb2", - "table_type"="mysql" - ); - """ - - test { - sql """select * from ${exMysqlTypeTable} order by 
id""" - exception "Fail to convert jdbc type of java.lang.Integer to doris type BIGINT on column: id" - } +// // test for type check +// sql """ drop table if exists ${exMysqlTypeTable} """ +// sql """ +// CREATE EXTERNAL TABLE ${exMysqlTypeTable} ( +// `id` bigint NOT NULL, +// `count_value` varchar(100) NULL +// ) ENGINE=JDBC +// COMMENT "JDBC Mysql 外部表" +// PROPERTIES ( +// "resource" = "$jdbcResourceMysql57", +// "table" = "ex_tb2", +// "table_type"="mysql" +// ); +// """ +// +// test { +// sql """select * from ${exMysqlTypeTable} order by id""" +// exception "Fail to convert jdbc type of java.lang.Integer to doris type BIGINT on column: id" +// } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org