This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 56fa38de1d [Enhancement](JDBC Catalog) refactor jdbc catalog insert logic (#19950)
56fa38de1d is described below
commit 56fa38de1da518009b1ad77b8e5b28430469bea8
Author: zy-kkk <[email protected]>
AuthorDate: Tue May 30 22:03:39 2023 +0800
    [Enhancement](JDBC Catalog) refactor jdbc catalog insert logic (#19950)
    This PR refactors how data is written to JDBC External Tables and the JDBC
    Catalog. It mainly includes the following tasks:
    1. Continue the work of @BePPPower's PR #18594: replace the logic that
    splices INSERT SQL strings with logic that reads column data from off-heap
    memory and writes it through the PreparedStatement set methods (a minimal
    sketch of this pattern follows this list).
    2. Add write support for the LARGEINT type, mainly by adapting
    java.math.BigInteger, whose value is transferred in binary form (see the
    byte-conversion sketch after this message).
    3. Remove the SQL-splicing logic from the write path of the JDBC External
    Table & JDBC Catalog.
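    A minimal, self-contained sketch of the PreparedStatement batch-write pattern from
    item 1. The JDBC URL, table name, and in-memory rows below are hypothetical stand-ins;
    the real code reads rows from off-heap VectorTable memory and receives the
    parameterized INSERT string from the FE through TJdbcTableSink.insert_sql:

        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.PreparedStatement;
        import java.sql.SQLException;

        public class BatchInsertSketch {
            public static void main(String[] args) throws SQLException {
                Object[][] rows = {{1, "a"}, {2, "b"}};              // hypothetical row data
                String sql = "INSERT INTO demo_table VALUES (?, ?)"; // parameterized, no value splicing
                try (Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://127.0.0.1:3306/db", "user", "pass");
                        PreparedStatement ps = conn.prepareStatement(sql)) {
                    for (Object[] row : rows) {
                        for (int col = 0; col < row.length; col++) {
                            ps.setObject(col + 1, row[col]); // JDBC parameter indexes are 1-based
                        }
                        ps.addBatch();                       // buffer this row
                    }
                    ps.executeBatch();                       // send the whole batch in one round trip
                    ps.clearBatch();
                }
            }
        }

    Compared with splicing literal values into an INSERT string, the driver handles quoting
    and escaping, and a whole block of rows is flushed with a single executeBatch() call.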
    TODO: binary types, such as BIT, BINARY, BLOB...
    Finally, special thanks to @BePPPower and @AshinGau for their work.
Co-authored-by: Tiewei Fang <[email protected]>
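    A standalone sketch of the LARGEINT binary handling from item 2. It mirrors the
    TypeNativeBytes.getBigIntegerBytes/getBigInteger logic added below, with a local byte
    reversal standing in for the existing convertByteOrder helper; padding the value into
    the fixed 16-byte LARGEINT slot is done by the off-heap column code and is omitted here:

        import java.math.BigInteger;
        import java.util.Arrays;

        public class BigIntegerBytesSketch {
            // BigInteger.toByteArray() is big-endian; reverse it for the little-endian native layout.
            static byte[] reverse(byte[] b) {
                byte[] r = new byte[b.length];
                for (int i = 0; i < b.length; i++) {
                    r[i] = b[b.length - 1 - i];
                }
                return r;
            }

            static byte[] toNativeBytes(BigInteger v) {
                byte[] bytes = v.toByteArray();
                // Drop the extra leading zero byte toByteArray() adds for non-negative values
                // whose highest magnitude bit is set.
                if (v.signum() >= 0 && bytes[0] == 0) {
                    bytes = Arrays.copyOfRange(bytes, 1, bytes.length);
                }
                return reverse(bytes);
            }

            static BigInteger fromNativeBytes(byte[] bytes) {
                byte[] bigEndian = reverse(bytes);
                // Re-add a zero sign byte if the top bit is set, so the bytes are read as non-negative.
                if ((bigEndian[0] & 0x80) != 0) {
                    byte[] extended = new byte[bigEndian.length + 1];
                    System.arraycopy(bigEndian, 0, extended, 1, bigEndian.length);
                    bigEndian = extended;
                }
                return new BigInteger(bigEndian);
            }

            public static void main(String[] args) {
                BigInteger v = new BigInteger("170141183460469231731687303715884105727"); // 2^127 - 1
                System.out.println(fromNativeBytes(toNativeBytes(v)).equals(v));          // prints true
            }
        }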
---
be/src/exec/odbc_connector.h | 6 +
be/src/exec/table_connector.cpp | 95 ++-------------
be/src/exec/table_connector.h | 20 +---
be/src/vec/exec/jni_connector.cpp | 3 +
be/src/vec/exec/vjdbc_connector.cpp | 16 +--
be/src/vec/exec/vjdbc_connector.h | 7 +-
be/src/vec/sink/vjdbc_table_sink.cpp | 4 +-
be/src/vec/sink/vodbc_table_sink.cpp | 2 +-
.../java/org/apache/doris/catalog/JdbcTable.java | 14 +++
.../org/apache/doris/planner/JdbcTableSink.java | 3 +
.../org/apache/doris/hudi/HudiColumnValue.java | 6 +
.../java/org/apache/doris/jni/MockJniScanner.java | 6 +
.../apache/doris/jni/utils/TypeNativeBytes.java | 23 ++++
.../java/org/apache/doris/jni/vec/ColumnType.java | 4 +
.../java/org/apache/doris/jni/vec/ColumnValue.java | 3 +
.../org/apache/doris/jni/vec/ScanPredicate.java | 8 ++
.../org/apache/doris/jni/vec/VectorColumn.java | 32 +++++
.../java/org/apache/doris/udf/JdbcExecutor.java | 131 ++++++++++++++++++++-
.../java/org/apache/doris/jni/JniScannerTest.java | 4 +-
gensrc/thrift/DataSinks.thrift | 1 +
20 files changed, 268 insertions(+), 120 deletions(-)
diff --git a/be/src/exec/odbc_connector.h b/be/src/exec/odbc_connector.h
index 90ef031a10..06bc97bba9 100644
--- a/be/src/exec/odbc_connector.h
+++ b/be/src/exec/odbc_connector.h
@@ -72,6 +72,12 @@ public:
     Status exec_write_sql(const std::u16string& insert_stmt,
                           const fmt::memory_buffer& insert_stmt_buffer) override;
+    Status exec_stmt_write(vectorized::Block* block,
+                           const vectorized::VExprContextSPtrs& _output_vexpr_ctxs,
+                           uint32_t* num_rows_sent) override {
+        return Status::OK();
+    }
+
// use in ODBC transaction
     Status begin_trans() override;  // should be call after connect and before query or init_to_write
     Status abort_trans() override;  // should be call after transaction abort
diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index 667618df82..9eb1fface1 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -98,25 +98,11 @@ std::u16string TableConnector::utf8_to_u16string(const char* first, const char*
 Status TableConnector::append(const std::string& table_name, vectorized::Block* block,
                               const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-                              uint32_t start_send_row, uint32_t* num_rows_sent,
+                              uint32_t start_send_row, uint32_t* num_rows_sent, bool is_odbc,
                               TOdbcTableType::type table_type) {
-    _insert_stmt_buffer.clear();
-    std::u16string insert_stmt;
-    if (table_type == TOdbcTableType::ORACLE) {
-        SCOPED_TIMER(_convert_tuple_timer);
-        oracle_type_append(table_name, block, output_vexpr_ctxs, start_send_row, num_rows_sent,
-                           table_type);
-        // Translate utf8 string to utf16 to use unicode encoding
-        insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(),
-                                        _insert_stmt_buffer.data() + _insert_stmt_buffer.size());
-    } else if (table_type == TOdbcTableType::SAP_HANA) {
-        SCOPED_TIMER(_convert_tuple_timer);
-        sap_hana_type_append(table_name, block, output_vexpr_ctxs, start_send_row, num_rows_sent,
-                             table_type);
-        // Translate utf8 string to utf16 to use unicode encoding
-        insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(),
-                                        _insert_stmt_buffer.data() + _insert_stmt_buffer.size());
-    } else {
+    if (is_odbc) {
+        _insert_stmt_buffer.clear();
+        std::u16string insert_stmt;
         SCOPED_TIMER(_convert_tuple_timer);
         fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);
@@ -147,74 +133,15 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block*
         // Translate utf8 string to utf16 to use unicode encoding
         insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(),
                                         _insert_stmt_buffer.data() + _insert_stmt_buffer.size());
-    }
-    RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer));
-    COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
-    return Status::OK();
-}
-
-Status TableConnector::oracle_type_append(const std::string& table_name, vectorized::Block* block,
-                                          const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-                                          uint32_t start_send_row, uint32_t* num_rows_sent,
-                                          TOdbcTableType::type table_type) {
-    fmt::format_to(_insert_stmt_buffer, "INSERT ALL ");
-    int num_rows = block->rows();
-    int num_columns = block->columns();
-    for (int i = start_send_row; i < num_rows; ++i) {
-        (*num_rows_sent)++;
-        fmt::format_to(_insert_stmt_buffer, "INTO {} VALUES (", table_name);
-        // Construct insert statement of odbc/jdbc table
-        for (int j = 0; j < num_columns; ++j) {
-            if (j != 0) {
-                fmt::format_to(_insert_stmt_buffer, "{}", ", ");
-            }
-            auto& column_ptr = block->get_by_position(j).column;
-            auto& type_ptr = block->get_by_position(j).type;
-            RETURN_IF_ERROR(convert_column_data(
-                    column_ptr, type_ptr, output_vexpr_ctxs[j]->root()->type(), i, table_type));
-        }
-
-        if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
-            fmt::format_to(_insert_stmt_buffer, "{}", ") ");
-        } else {
-            // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt
-            fmt::format_to(_insert_stmt_buffer, "{}", ") SELECT 1 FROM DUAL");
-            break;
-        }
-    }
-    return Status::OK();
-}
-Status TableConnector::sap_hana_type_append(const std::string& table_name, vectorized::Block* block,
-                                            const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-                                            uint32_t start_send_row, uint32_t* num_rows_sent,
-                                            TOdbcTableType::type table_type) {
-    fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} ", table_name);
-    int num_rows = block->rows();
-    int num_columns = block->columns();
-    for (int i = start_send_row; i < num_rows; ++i) {
-        (*num_rows_sent)++;
-        fmt::format_to(_insert_stmt_buffer, "SELECT ");
-        // Construct insert statement of odbc/jdbc table
-        for (int j = 0; j < num_columns; ++j) {
-            if (j != 0) {
-                fmt::format_to(_insert_stmt_buffer, "{}", ", ");
-            }
-            auto& column_ptr = block->get_by_position(j).column;
-            auto& type_ptr = block->get_by_position(j).type;
-            RETURN_IF_ERROR(convert_column_data(
-                    column_ptr, type_ptr, output_vexpr_ctxs[j]->root()->type(), i, table_type));
-        }
-
-        if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
-            fmt::format_to(_insert_stmt_buffer, "{}", " FROM dummy UNION ALL ");
-        } else {
-            // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt
-            fmt::format_to(_insert_stmt_buffer, "{}", " FROM dummy");
-            break;
-        }
+        RETURN_IF_ERROR(exec_write_sql(insert_stmt, _insert_stmt_buffer));
+        COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
+        return Status::OK();
+    } else {
+        RETURN_IF_ERROR(exec_stmt_write(block, output_vexpr_ctxs, num_rows_sent));
+        COUNTER_UPDATE(_sent_rows_counter, *num_rows_sent);
+        return Status::OK();
     }
-    return Status::OK();
 }
 Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_ptr,
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index d8c6e01075..d8376259b7 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -54,13 +54,18 @@ public:
     virtual Status abort_trans() = 0;  // should be call after transaction abort
     virtual Status finish_trans() = 0; // should be call after transaction commit
+    virtual Status exec_stmt_write(vectorized::Block* block,
+                                   const vectorized::VExprContextSPtrs& _output_vexpr_ctxs,
+                                   uint32_t* num_rows_sent) = 0;
+
     virtual Status exec_write_sql(const std::u16string& insert_stmt,
                                   const fmt::memory_buffer& _insert_stmt_buffer) = 0;
     //write data into table vectorized
     Status append(const std::string& table_name, vectorized::Block* block,
                   const vectorized::VExprContextSPtrs& _output_vexpr_ctxs, uint32_t start_send_row,
-                  uint32_t* num_rows_sent, TOdbcTableType::type table_type = TOdbcTableType::MYSQL);
+                  uint32_t* num_rows_sent, bool is_odbc,
+                  TOdbcTableType::type table_type = TOdbcTableType::MYSQL);
void init_profile(RuntimeProfile*);
@@ -88,19 +93,6 @@ protected:
RuntimeProfile::Counter* _result_send_timer = nullptr;
// number of sent rows
RuntimeProfile::Counter* _sent_rows_counter = nullptr;
-
-private:
-    // Because Oracle and SAP Hana database do not support
-    // insert into tables values (...),(...);
-    // Here we do something special for Oracle and SAP Hana.
-    Status oracle_type_append(const std::string& table_name, vectorized::Block* block,
-                              const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-                              uint32_t start_send_row, uint32_t* num_rows_sent,
-                              TOdbcTableType::type table_type);
-    Status sap_hana_type_append(const std::string& table_name, vectorized::Block* block,
-                                const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
-                                uint32_t start_send_row, uint32_t* num_rows_sent,
-                                TOdbcTableType::type table_type);
};
} // namespace doris
diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp
index ad984042c4..fcddbad134 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -50,6 +50,7 @@ namespace doris::vectorized {
M(TypeIndex::UInt32, UInt32) \
M(TypeIndex::Int64, Int64) \
M(TypeIndex::UInt64, UInt64) \
+ M(TypeIndex::Int128, Int128) \
M(TypeIndex::Float32, Float32) \
M(TypeIndex::Float64, Float64)
@@ -281,6 +282,8 @@ std::string JniConnector::get_hive_type(const TypeDescriptor& desc) {
return "int";
case TYPE_BIGINT:
return "bigint";
+ case TYPE_LARGEINT:
+ return "largeint";
case TYPE_FLOAT:
return "float";
case TYPE_DOUBLE:
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index 613c76f8f8..df1bba2d83 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -703,19 +703,8 @@ Status JdbcConnector::_cast_string_to_array(const SlotDescriptor* slot_desc, Blo
return Status::OK();
}
-Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt,
-                                     const fmt::memory_buffer& insert_stmt_buffer) {
-    SCOPED_TIMER(_result_send_timer);
-    JNIEnv* env = nullptr;
-    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
-    jstring query_sql = env->NewString((const jchar*)insert_stmt.c_str(), insert_stmt.size());
-    env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_write_id, query_sql);
-    env->DeleteLocalRef(query_sql);
-    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
-    return Status::OK();
-}
-
-Status JdbcConnector::exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs) {
+Status JdbcConnector::exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs,
+                                      uint32_t* num_rows_sent) {
SCOPED_TIMER(_result_send_timer);
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
@@ -766,6 +755,7 @@ Status JdbcConnector::exec_stmt_write(Block* block, const VExprContextSPtrs& out
hashmap_object);
env->DeleteLocalRef(hashmap_object);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+ *num_rows_sent = block->rows();
return Status::OK();
}
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index 38c71d0143..fc21078305 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -76,9 +76,12 @@ public:
Status query() override;
     Status exec_write_sql(const std::u16string& insert_stmt,
-                          const fmt::memory_buffer& insert_stmt_buffer) override;
+                          const fmt::memory_buffer& insert_stmt_buffer) override {
+        return Status::OK();
+    }
-    Status exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs);
+    Status exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs,
+                           uint32_t* num_rows_sent) override;
     Status get_next(bool* eos, std::vector<MutableColumnPtr>& columns, Block* block,
                     int batch_size);
diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp
index ce7bc3f43d..e2eab4d7b0 100644
--- a/be/src/vec/sink/vjdbc_table_sink.cpp
+++ b/be/src/vec/sink/vjdbc_table_sink.cpp
@@ -55,6 +55,7 @@ Status VJdbcTableSink::init(const TDataSink& t_sink) {
_jdbc_param.driver_checksum = t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
_jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name;
_jdbc_param.table_type = t_jdbc_sink.table_type;
+ _jdbc_param.query_string = t_jdbc_sink.insert_sql;
_table_name = t_jdbc_sink.jdbc_table.jdbc_table_name;
_use_transaction = t_jdbc_sink.use_transaction;
@@ -89,7 +90,8 @@ Status VJdbcTableSink::send(RuntimeState* state, Block* block, bool eos) {
uint32_t num_row_sent = 0;
while (start_send_row < output_block.rows()) {
         RETURN_IF_ERROR(_writer->append(_table_name, &output_block, _output_vexpr_ctxs,
-                                        start_send_row, &num_row_sent, _jdbc_param.table_type));
+                                        start_send_row, &num_row_sent, false,
+                                        _jdbc_param.table_type));
start_send_row += num_row_sent;
num_row_sent = 0;
}
diff --git a/be/src/vec/sink/vodbc_table_sink.cpp b/be/src/vec/sink/vodbc_table_sink.cpp
index 9bd445538f..b392e0f2c1 100644
--- a/be/src/vec/sink/vodbc_table_sink.cpp
+++ b/be/src/vec/sink/vodbc_table_sink.cpp
@@ -78,7 +78,7 @@ Status VOdbcTableSink::send(RuntimeState* state, Block* block, bool eos) {
uint32_t num_row_sent = 0;
while (start_send_row < output_block.rows()) {
         RETURN_IF_ERROR(_writer->append(_table_name, &output_block, _output_vexpr_ctxs,
-                                        start_send_row, &num_row_sent));
+                                        start_send_row, &num_row_sent, true));
start_send_row += num_row_sent;
num_row_sent = 0;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
index 8699b53315..5fa608bd80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java
@@ -98,6 +98,20 @@ public class JdbcTable extends Table {
super(id, name, type, schema);
}
+ public String getInsertSql() {
+ StringBuilder sb = new StringBuilder("INSERT INTO ");
+        sb.append(OdbcTable.databaseProperName(TABLE_TYPE_MAP.get(getTableTypeName()),
+                getExternalTableName()));
+ sb.append(" VALUES (");
+ for (int i = 0; i < getFullSchema().size(); ++i) {
+ if (i != 0) {
+ sb.append(", ");
+ }
+ sb.append("?");
+ }
+ sb.append(")");
+ return sb.toString();
+ }
+
public String getCheckSum() {
return checkSum;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java
index 5135145447..73b981b19a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcTableSink.java
@@ -44,6 +44,7 @@ public class JdbcTableSink extends DataSink {
private final String checkSum;
private final TOdbcTableType jdbcType;
private final boolean useTransaction;
+ private String insertSql;
public JdbcTableSink(JdbcTable jdbcTable) {
resourceName = jdbcTable.getResourceName();
@@ -57,6 +58,7 @@ public class JdbcTableSink extends DataSink {
driverUrl = jdbcTable.getDriverUrl();
checkSum = jdbcTable.getCheckSum();
dorisTableName = jdbcTable.getName();
+ insertSql = jdbcTable.getInsertSql();
}
@Override
@@ -84,6 +86,7 @@ public class JdbcTableSink extends DataSink {
jdbcTableSink.jdbc_table.setJdbcDriverClass(driverClass);
jdbcTableSink.jdbc_table.setJdbcDriverChecksum(checkSum);
jdbcTableSink.jdbc_table.setJdbcResourceName(resourceName);
+ jdbcTableSink.setInsertSql(insertSql);
jdbcTableSink.setUseTransaction(useTransaction);
jdbcTableSink.setTableType(jdbcType);
diff --git a/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
index fe7d6ac22b..3a5d4059b2 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
@@ -75,6 +76,11 @@ public class HudiColumnValue implements ColumnValue {
return (double) inspectObject();
}
+ @Override
+ public BigInteger getBigInteger() {
+ return null;
+ }
+
@Override
public BigDecimal getDecimal() {
return null;
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java
index c6097f9d75..afc957ec17 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java
@@ -25,6 +25,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -83,6 +84,11 @@ public class MockJniScanner extends JniScanner {
return (double) (j + i - 15) / (i + 1);
}
+ @Override
+ public BigInteger getBigInteger() {
+ return BigInteger.valueOf(getLong());
+ }
+
@Override
public BigDecimal getDecimal() {
return BigDecimal.valueOf(getDouble());
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/utils/TypeNativeBytes.java b/fe/java-udf/src/main/java/org/apache/doris/jni/utils/TypeNativeBytes.java
index 01edf9c4ca..666f968329 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/utils/TypeNativeBytes.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/utils/TypeNativeBytes.java
@@ -39,6 +39,29 @@ public class TypeNativeBytes {
return bytes;
}
+ public static byte[] getBigIntegerBytes(BigInteger v) {
+ byte[] bytes = v.toByteArray();
+        // If the BigInteger is not negative and the first byte is 0, remove the first byte
+ if (v.signum() >= 0 && bytes[0] == 0) {
+ bytes = Arrays.copyOfRange(bytes, 1, bytes.length);
+ }
+ // Convert the byte order if necessary
+ return convertByteOrder(bytes);
+ }
+
+ public static BigInteger getBigInteger(byte[] bytes) {
+ // Convert the byte order back if necessary
+ byte[] originalBytes = convertByteOrder(bytes);
+ // If the first byte has the sign bit set, add a 0 byte at the start
+ if ((originalBytes[0] & 0x80) != 0) {
+ byte[] extendedBytes = new byte[originalBytes.length + 1];
+ extendedBytes[0] = 0;
+            System.arraycopy(originalBytes, 0, extendedBytes, 1, originalBytes.length);
+ originalBytes = extendedBytes;
+ }
+ return new BigInteger(originalBytes);
+ }
+
public static byte[] getDecimalBytes(BigDecimal v, int scale, int size) {
BigDecimal retValue = v.setScale(scale, RoundingMode.HALF_EVEN);
BigInteger data = retValue.unscaledValue();
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnType.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnType.java
index b856c8cab9..5b1b301356 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnType.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnType.java
@@ -38,6 +38,7 @@ public class ColumnType {
SMALLINT(2),
INT(4),
BIGINT(8),
+ LARGEINT(16),
FLOAT(4),
DOUBLE(8),
DATEV2(4),
@@ -250,6 +251,9 @@ public class ColumnType {
case "bigint":
type = Type.BIGINT;
break;
+ case "largeint":
+ type = Type.LARGEINT;
+ break;
case "float":
type = Type.FLOAT;
break;
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java
index 1dfa8d4eb4..da76b9cf33 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java
@@ -18,6 +18,7 @@
package org.apache.doris.jni.vec;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
@@ -43,6 +44,8 @@ public interface ColumnValue {
public double getDouble();
+ public BigInteger getBigInteger();
+
public BigDecimal getDecimal();
public String getString();
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java
index 74f70bc14e..a44de9062a 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java
@@ -24,6 +24,7 @@ import org.apache.doris.jni.vec.ColumnType.Type;
import org.apache.commons.lang3.StringUtils;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
@@ -94,6 +95,8 @@ public class ScanPredicate {
return byteBuffer.getInt();
case BIGINT:
return byteBuffer.getLong();
+ case LARGEINT:
+                return TypeNativeBytes.getBigInteger(Arrays.copyOf(valueBytes, valueBytes.length));
case FLOAT:
return byteBuffer.getFloat();
case DOUBLE:
@@ -154,6 +157,11 @@ public class ScanPredicate {
return (double) inspectObject();
}
+ @Override
+ public BigInteger getBigInteger() {
+ return (BigInteger) inspectObject();
+ }
+
@Override
public BigDecimal getDecimal() {
return (BigDecimal) inspectObject();
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
index 70b11dc34a..8ce684ea95 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
@@ -22,6 +22,7 @@ import org.apache.doris.jni.utils.TypeNativeBytes;
import org.apache.doris.jni.vec.ColumnType.Type;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -269,6 +270,8 @@ public class VectorColumn {
return appendInt(0);
case BIGINT:
return appendLong(0);
+ case LARGEINT:
+ return appendBigInteger(BigInteger.ZERO);
case FLOAT:
return appendFloat(0);
case DOUBLE:
@@ -395,6 +398,29 @@ public class VectorColumn {
return OffHeap.getDouble(null, data + rowId * 8L);
}
+ public int appendBigInteger(BigInteger v) {
+ reserve(appendIndex + 1);
+ putBigInteger(appendIndex, v);
+ return appendIndex++;
+ }
+
+ private void putBigInteger(int rowId, BigInteger v) {
+ int typeSize = columnType.getTypeSize();
+ byte[] bytes = TypeNativeBytes.getBigIntegerBytes(v);
+        OffHeap.copyMemory(bytes, OffHeap.BYTE_ARRAY_OFFSET, null, data + (long) rowId * typeSize, typeSize);
+ }
+
+ public byte[] getBigIntegerBytes(int rowId) {
+ int typeSize = columnType.getTypeSize();
+ byte[] bytes = new byte[typeSize];
+        OffHeap.copyMemory(null, data + (long) rowId * typeSize, bytes, OffHeap.BYTE_ARRAY_OFFSET, typeSize);
+ return bytes;
+ }
+
+ public BigInteger getBigInteger(int rowId) {
+ return TypeNativeBytes.getBigInteger(getBigIntegerBytes(rowId));
+ }
+
public int appendDecimal(BigDecimal v) {
reserve(appendIndex + 1);
putDecimal(appendIndex, v);
@@ -546,6 +572,9 @@ public class VectorColumn {
case BIGINT:
appendLong(o.getLong());
break;
+ case LARGEINT:
+ appendBigInteger(o.getBigInteger());
+ break;
case FLOAT:
appendFloat(o.getFloat());
break;
@@ -601,6 +630,9 @@ public class VectorColumn {
case BIGINT:
sb.append(getLong(i));
break;
+ case LARGEINT:
+ sb.append(getBigInteger(i));
+ break;
case FLOAT:
sb.append(getFloat(i));
break;
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index ec47ff531f..ebeb68085b 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -18,6 +18,7 @@
package org.apache.doris.udf;
import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.VectorColumn;
import org.apache.doris.jni.vec.VectorTable;
import org.apache.doris.thrift.TJdbcExecutorCtorParams;
import org.apache.doris.thrift.TJdbcOperation;
@@ -52,6 +53,8 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -66,6 +69,7 @@ public class JdbcExecutor {
private static final Logger LOG = Logger.getLogger(JdbcExecutor.class);
    private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
private Connection conn = null;
+ private PreparedStatement preparedStatement = null;
private Statement stmt = null;
private ResultSet resultSet = null;
private ResultSetMetaData resultSetMetaData = null;
@@ -154,12 +158,11 @@ public class JdbcExecutor {
}
}
- public int write(Map<String, String> params) {
+ public int write(Map<String, String> params) throws UdfRuntimeException {
String[] requiredFields = params.get("required_fields").split(",");
String[] types = params.get("columns_types").split("#");
long metaAddress = Long.parseLong(params.get("meta_address"));
// Get sql string from configuration map
- // String sql = params.get("write_sql");
ColumnType[] columnTypes = new ColumnType[types.length];
for (int i = 0; i < types.length; i++) {
columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
@@ -167,9 +170,130 @@ public class JdbcExecutor {
        VectorTable batchTable = new VectorTable(columnTypes, requiredFields, metaAddress);
// todo: insert the batch table by PreparedStatement
// Can't release or close batchTable, it's released by c++
+ try {
+ insert(batchTable);
+ } catch (SQLException e) {
+ throw new UdfRuntimeException("JDBC executor sql has error: ", e);
+ }
return batchTable.getNumRows();
}
+ private int insert(VectorTable data) throws SQLException {
+ for (int i = 0; i < data.getNumRows(); ++i) {
+ for (int j = 0; j < data.getColumns().length; ++j) {
+ insertColumn(i, j, data.getColumns()[j]);
+ }
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ preparedStatement.clearBatch();
+ return data.getNumRows();
+ }
+
+    private void insertColumn(int rowIdx, int colIdx, VectorColumn column) throws SQLException {
+ int parameterIndex = colIdx + 1;
+ ColumnType.Type dorisType = column.getColumnTyp();
+ if (column.isNullAt(rowIdx)) {
+ insertNullColumn(parameterIndex, dorisType);
+ return;
+ }
+        switch (dorisType) {
+            case BOOLEAN:
+                preparedStatement.setBoolean(parameterIndex, column.getBoolean(rowIdx));
+                break;
+            case TINYINT:
+                preparedStatement.setByte(parameterIndex, column.getByte(rowIdx));
+                break;
+            case SMALLINT:
+                preparedStatement.setShort(parameterIndex, column.getShort(rowIdx));
+                break;
+            case INT:
+                preparedStatement.setInt(parameterIndex, column.getInt(rowIdx));
+                break;
+            case BIGINT:
+                preparedStatement.setLong(parameterIndex, column.getLong(rowIdx));
+                break;
+            case LARGEINT:
+                preparedStatement.setObject(parameterIndex, column.getBigInteger(rowIdx));
+                break;
+            case FLOAT:
+                preparedStatement.setFloat(parameterIndex, column.getFloat(rowIdx));
+                break;
+            case DOUBLE:
+                preparedStatement.setDouble(parameterIndex, column.getDouble(rowIdx));
+                break;
+            case DECIMALV2:
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+                preparedStatement.setBigDecimal(parameterIndex, column.getDecimal(rowIdx));
+                break;
+            case DATEV2:
+                preparedStatement.setDate(parameterIndex, Date.valueOf(column.getDate(rowIdx)));
+                break;
+            case DATETIMEV2:
+                preparedStatement.setTimestamp(parameterIndex, Timestamp.valueOf(column.getDateTime(rowIdx)));
+                break;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+            case BINARY:
+                preparedStatement.setString(parameterIndex, column.getStringWithOffset(rowIdx));
+                break;
+            default:
+                throw new RuntimeException("Unknown type value: " + dorisType);
+        }
+    }
+
+    private void insertNullColumn(int parameterIndex, ColumnType.Type dorisType) throws SQLException {
+ switch (dorisType) {
+ case BOOLEAN:
+ preparedStatement.setNull(parameterIndex, Types.BOOLEAN);
+ break;
+ case TINYINT:
+ preparedStatement.setNull(parameterIndex, Types.TINYINT);
+ break;
+ case SMALLINT:
+ preparedStatement.setNull(parameterIndex, Types.SMALLINT);
+ break;
+ case INT:
+ preparedStatement.setNull(parameterIndex, Types.INTEGER);
+ break;
+ case BIGINT:
+ preparedStatement.setNull(parameterIndex, Types.BIGINT);
+ break;
+ case LARGEINT:
+ preparedStatement.setNull(parameterIndex, Types.JAVA_OBJECT);
+ break;
+ case FLOAT:
+ preparedStatement.setNull(parameterIndex, Types.FLOAT);
+ break;
+ case DOUBLE:
+ preparedStatement.setNull(parameterIndex, Types.DOUBLE);
+ break;
+ case DECIMALV2:
+ case DECIMAL32:
+ case DECIMAL64:
+ case DECIMAL128:
+ preparedStatement.setNull(parameterIndex, Types.DECIMAL);
+ break;
+ case DATEV2:
+ preparedStatement.setNull(parameterIndex, Types.DATE);
+ break;
+ case DATETIMEV2:
+ preparedStatement.setNull(parameterIndex, Types.TIMESTAMP);
+ break;
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ case BINARY:
+ preparedStatement.setNull(parameterIndex, Types.VARCHAR);
+ break;
+ default:
+ throw new RuntimeException("Unknown type value: " + dorisType);
+ }
+ }
+
public List<String> getResultColumnTypeNames() {
return resultColumnTypeNames;
}
@@ -312,7 +436,8 @@ public class JdbcExecutor {
}
batchSizeNum = batchSize;
} else {
- stmt = conn.createStatement();
+ LOG.info("insert sql: " + sql);
+ preparedStatement = conn.prepareStatement(sql);
}
}
} catch (MalformedURLException e) {
diff --git a/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java b/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java
index 32f4cccd79..6be67ecba3 100644
--- a/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java
+++ b/fe/java-udf/src/test/java/org/apache/doris/jni/JniScannerTest.java
@@ -33,9 +33,9 @@ public class JniScannerTest {
        MockJniScanner scanner = new MockJniScanner(32, new HashMap<String, String>() {
{
put("mock_rows", "128");
- put("required_fields",
"boolean,tinyint,smallint,int,bigint,float,double,"
+ put("required_fields",
"boolean,tinyint,smallint,int,bigint,largeint,float,double,"
+
"date,timestamp,char,varchar,string,decimalv2,decimal64");
- put("columns_types",
"boolean#tinyint#smallint#int#bigint#float#double#"
+ put("columns_types",
"boolean#tinyint#smallint#int#bigint#largeint#float#double#"
+
"date#timestamp#char(10)#varchar(10)#string#decimalv2(12,4)#decimal64(10,3)");
}
});
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 23e21b6715..af5d4d26a3 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -205,6 +205,7 @@ struct TJdbcTableSink {
1: optional Descriptors.TJdbcTable jdbc_table
2: optional bool use_transaction
3: optional Types.TOdbcTableType table_type
+ 4: optional string insert_sql
}
struct TExportSink {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]