This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch 1.1.2-pick in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
commit 412caa1203657711aedf9529b18b3ae652c56b7f Author: wudi <[email protected]> AuthorDate: Mon Feb 9 22:30:08 2026 +0800 change thrift to 0.16 and add decimal type --- flink-doris-connector/pom.xml | 55 +-- .../apache/doris/flink/backend/BackendClient.java | 43 ++- .../flink/exception/DorisInternalException.java | 2 +- .../org/apache/doris/flink/rest/SchemaUtils.java | 2 +- .../apache/doris/flink/serialization/RowBatch.java | 13 +- .../doris/flink/datastream/ScalaValueReader.scala | 2 +- .../main/thrift/doris/DorisExternalService.thrift | 122 ------- .../src/main/thrift/doris/Status.thrift | 66 ---- .../src/main/thrift/doris/Types.thrift | 376 --------------------- .../doris/flink/serialization/TestRowBatch.java | 6 +- 10 files changed, 69 insertions(+), 618 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 4e98842d..0847d4a0 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -26,8 +26,8 @@ under the License. <version>23</version> </parent> <groupId>org.apache.doris</groupId> - <artifactId>flink-doris-connector-${flink.minor.version}_${scala.version}</artifactId> - <version>1.0.0-SNAPSHOT</version> + <artifactId>flink-doris-connector-1.14_2.12</artifactId> + <version>1.1.2</version> <name>Flink Doris Connector</name> <url>https://doris.apache.org/</url> <licenses> @@ -62,17 +62,17 @@ under the License. 
</mailingList> </mailingLists> <properties> - <scala.version>${env.scala.version}</scala.version> - <flink.version>${env.flink.version}</flink.version> - <flink.minor.version>${env.flink.minor.version}</flink.minor.version> - <libthrift.version>0.13.0</libthrift.version> + <scala.version>2.12</scala.version> + <flink.version>1.14.6</flink.version> + <flink.minor.version>1.14</flink.minor.version> + <libthrift.version>0.16.0</libthrift.version> <arrow.version>5.0.0</arrow.version> <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version> <maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version> <maven-source-plugin.version>3.2.1</maven-source-plugin.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <log4j2.version>2.17.2</log4j2.version> - <thrift.binary>${env.THRIFT_BIN}</thrift.binary> +<!-- <thrift.binary>/opt/homebrew/Cellar/[email protected]/0.13.0/bin/thrift</thrift.binary>--> <project.scm.id>github</project.scm.id> </properties> <profiles> @@ -145,6 +145,11 @@ under the License. </profiles> <dependencies> + <dependency> + <groupId>org.apache.doris</groupId> + <artifactId>thrift-service</artifactId> + <version>1.0.1</version> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> @@ -319,24 +324,24 @@ under the License. 
</execution> </executions> </plugin> - <plugin> - <groupId>org.apache.thrift.tools</groupId> - <artifactId>maven-thrift-plugin</artifactId> - <version>0.1.11</version> - <configuration> - <thriftExecutable>${thrift.binary}</thriftExecutable> - <generator>java:fullcamel</generator> - </configuration> - <executions> - <execution> - <id>thrift-sources</id> - <phase>generate-sources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - </executions> - </plugin> +<!-- <plugin>--> +<!-- <groupId>org.apache.thrift.tools</groupId>--> +<!-- <artifactId>maven-thrift-plugin</artifactId>--> +<!-- <version>0.1.11</version>--> +<!-- <configuration>--> +<!-- <thriftExecutable>${thrift.binary}</thriftExecutable>--> +<!-- <generator>java:fullcamel</generator>--> +<!-- </configuration>--> +<!-- <executions>--> +<!-- <execution>--> +<!-- <id>thrift-sources</id>--> +<!-- <phase>generate-sources</phase>--> +<!-- <goals>--> +<!-- <goal>compile</goal>--> +<!-- </goals>--> +<!-- </execution>--> +<!-- </executions>--> +<!-- </plugin>--> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java index 9b8d955d..271f766b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java @@ -24,14 +24,15 @@ import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.DorisInternalException; import org.apache.doris.flink.serialization.Routing; import org.apache.doris.flink.util.ErrorMessages; -import org.apache.doris.thrift.TDorisExternalService; -import org.apache.doris.thrift.TScanBatchResult; -import org.apache.doris.thrift.TScanCloseParams; -import org.apache.doris.thrift.TScanCloseResult; -import 
org.apache.doris.thrift.TScanNextBatchParams; -import org.apache.doris.thrift.TScanOpenParams; -import org.apache.doris.thrift.TScanOpenResult; -import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.sdk.thrift.TDorisExternalService; +import org.apache.doris.sdk.thrift.TScanBatchResult; +import org.apache.doris.sdk.thrift.TScanCloseParams; +import org.apache.doris.sdk.thrift.TScanCloseResult; +import org.apache.doris.sdk.thrift.TScanNextBatchParams; +import org.apache.doris.sdk.thrift.TScanOpenParams; +import org.apache.doris.sdk.thrift.TScanOpenResult; +import org.apache.doris.sdk.thrift.TStatusCode; +import org.apache.thrift.TConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; @@ -72,25 +73,31 @@ public class BackendClient { TException ex = null; for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { logger.debug("Attempt {} to connect {}.", attempt, routing); - TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); - transport = new TSocket(routing.getHost(), routing.getPort(), socketTimeout, connectTimeout); - TProtocol protocol = factory.getProtocol(transport); - client = new TDorisExternalService.Client(protocol); - if (isConnected) { - logger.info("Success connect to {}.", routing); - return; - } try { - logger.trace("Connect status before open transport to {} is '{}'.", routing, isConnected); + TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); + transport = + new TSocket( + new TConfiguration(), + routing.getHost(), + routing.getPort(), + socketTimeout, + connectTimeout); + TProtocol protocol = factory.getProtocol(transport); + client = new TDorisExternalService.Client(protocol); + logger.trace( + "Connect status before open transport to {} is '{}'.", + routing, + isConnected); if (!transport.isOpen()) { transport.open(); isConnected = true; + logger.info("Success connect to {}.", routing); + break; } } catch 
(TTransportException e) { logger.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, routing, e); ex = e; } - } if (!isConnected) { logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java index eadd860d..646a069e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java @@ -17,7 +17,7 @@ package org.apache.doris.flink.exception; -import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.sdk.thrift.TStatusCode; import java.util.List; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java index 5c645562..c8372645 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java @@ -18,7 +18,7 @@ package org.apache.doris.flink.rest; import org.apache.doris.flink.rest.models.Field; import org.apache.doris.flink.rest.models.Schema; -import org.apache.doris.thrift.TScanColumnDesc; +import org.apache.doris.sdk.thrift.TScanColumnDesc; import java.util.List; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index 4dd6732e..a3c4d673 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -17,6 +17,9 @@ package org.apache.doris.flink.serialization; +import org.apache.doris.flink.exception.DorisException; 
+import org.apache.doris.flink.rest.models.Schema; +import org.apache.doris.sdk.thrift.TScanBatchResult; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; @@ -32,13 +35,9 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; -import org.apache.doris.flink.exception.DorisException; -import org.apache.doris.flink.rest.models.Schema; -import org.apache.doris.thrift.TScanBatchResult; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.math.BigDecimal; @@ -104,7 +103,7 @@ public class RowBatch { this.root = arrowStreamReader.getVectorSchemaRoot(); while (arrowStreamReader.loadNextBatch()) { fieldVectors = root.getFieldVectors(); - if (fieldVectors.size() != schema.size()) { + if (fieldVectors.size() > schema.size()) { logger.error("Schema size '{}' is not equal to arrow field size '{}'.", fieldVectors.size(), schema.size()); throw new DorisException("Load Doris data failed, schema size of fetch data is wrong."); @@ -236,6 +235,10 @@ public class RowBatch { break; case "DECIMAL": case "DECIMALV2": + case "DECIMAL32": + case "DECIMAL64": + case "DECIMAL128I": + case "DECIMAL128": Preconditions.checkArgument(mt.equals(Types.MinorType.DECIMAL), typeMismatchMessage(currentType, mt)); DecimalVector decimalVector = (DecimalVector) curFieldVector; diff --git a/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala b/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala index 06df2ef4..9738aaf3 100644 --- a/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala +++ 
b/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala @@ -28,7 +28,7 @@ import org.apache.doris.flink.rest.models.Schema import org.apache.doris.flink.serialization.{Routing, RowBatch} import org.apache.doris.flink.util.ErrorMessages import org.apache.doris.flink.util.ErrorMessages._ -import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult} +import org.apache.doris.sdk.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult} import org.apache.log4j.Logger import scala.collection.JavaConversions._ diff --git a/flink-doris-connector/src/main/thrift/doris/DorisExternalService.thrift b/flink-doris-connector/src/main/thrift/doris/DorisExternalService.thrift deleted file mode 100644 index c1698748..00000000 --- a/flink-doris-connector/src/main/thrift/doris/DorisExternalService.thrift +++ /dev/null @@ -1,122 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -namespace java org.apache.doris.thrift -namespace cpp doris - -include "Types.thrift" -include "Status.thrift" - - -// Parameters to open(). 
-struct TScanOpenParams { - - 1: required string cluster - - 2: required string database - - 3: required string table - - // tablets to scan - 4: required list<i64> tablet_ids - - // base64 encoded binary plan fragment - 5: required string opaqued_query_plan - - // A string specified for the table that is passed to the external data source. - // Always set, may be an empty string. - 6: optional i32 batch_size - - // reserved params for use - 7: optional map<string,string> properties - - // The query limit, if specified. - 8: optional i64 limit - - // The authenticated user name. Always set. - // maybe usefullless - 9: optional string user - - 10: optional string passwd - // max keep alive time min - 11: optional i16 keep_alive_min - - 12: optional i32 query_timeout - - // memory limit for a single query - 13: optional i64 mem_limit -} - -struct TScanColumnDesc { - // The column name - 1: optional string name - // The column type. Always set. - 2: optional Types.TPrimitiveType type -} - -// Returned by open(). -struct TScanOpenResult { - 1: required Status.TStatus status - // An opaque context_id used in subsequent getNext()/close() calls. Required. - 2: optional string context_id - // selected fields - 3: optional list<TScanColumnDesc> selected_columns - -} - -// Parameters to getNext() -struct TScanNextBatchParams { - // The opaque handle returned by the previous open() call. Always set. - 1: optional string context_id // doris olap engine context id - 2: optional i64 offset // doris should check the offset to prevent duplicate rpc calls -} - -// Returned by getNext(). -struct TScanBatchResult { - 1: required Status.TStatus status - - // If true, reached the end of the result stream; subsequent calls to - // getNext() won’t return any more results. Required. - 2: optional bool eos - - // A batch of rows of arrow format to return, if any exist. The number of rows in the batch - // should be less than or equal to the batch_size specified in TOpenParams. 
- 3: optional binary rows -} - -// Parameters to close() -struct TScanCloseParams { - // The opaque handle returned by the previous open() call. Always set. - 1: optional string context_id -} - -// Returned by close(). -struct TScanCloseResult { - 1: required Status.TStatus status -} - -// scan service expose ability of scanning data ability to other compute system -service TDorisExternalService { - // doris will build a scan context for this session, context_id returned if success - TScanOpenResult open_scanner(1: TScanOpenParams params); - - // return the batch_size of data - TScanBatchResult get_next(1: TScanNextBatchParams params); - - // release the context resource associated with the context_id - TScanCloseResult close_scanner(1: TScanCloseParams params); -} diff --git a/flink-doris-connector/src/main/thrift/doris/Status.thrift b/flink-doris-connector/src/main/thrift/doris/Status.thrift deleted file mode 100644 index 2966a8a5..00000000 --- a/flink-doris-connector/src/main/thrift/doris/Status.thrift +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -namespace cpp doris -namespace java org.apache.doris.thrift - -enum TStatusCode { - OK, - CANCELLED, - ANALYSIS_ERROR, - NOT_IMPLEMENTED_ERROR, - RUNTIME_ERROR, - MEM_LIMIT_EXCEEDED, - INTERNAL_ERROR, - THRIFT_RPC_ERROR, - TIMEOUT, - KUDU_NOT_ENABLED, // Deprecated - KUDU_NOT_SUPPORTED_ON_OS, // Deprecated - MEM_ALLOC_FAILED, - BUFFER_ALLOCATION_FAILED, - MINIMUM_RESERVATION_UNAVAILABLE, - PUBLISH_TIMEOUT, - LABEL_ALREADY_EXISTS, - ES_INTERNAL_ERROR, - ES_INDEX_NOT_FOUND, - ES_SHARD_NOT_FOUND, - ES_INVALID_CONTEXTID, - ES_INVALID_OFFSET, - ES_REQUEST_ERROR, - - // end of file - END_OF_FILE = 30, - NOT_FOUND = 31, - CORRUPTION = 32, - INVALID_ARGUMENT = 33, - IO_ERROR = 34, - ALREADY_EXIST = 35, - NETWORK_ERROR = 36, - ILLEGAL_STATE = 37, - NOT_AUTHORIZED = 38, - ABORTED = 39, - REMOTE_ERROR = 40, - SERVICE_UNAVAILABLE = 41, - UNINITIALIZED = 42, - CONFIGURATION_ERROR = 43, - INCOMPLETE = 44 -} - -struct TStatus { - 1: required TStatusCode status_code - 2: optional list<string> error_msgs -} diff --git a/flink-doris-connector/src/main/thrift/doris/Types.thrift b/flink-doris-connector/src/main/thrift/doris/Types.thrift deleted file mode 100644 index 44ce6062..00000000 --- a/flink-doris-connector/src/main/thrift/doris/Types.thrift +++ /dev/null @@ -1,376 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -namespace cpp doris -namespace java org.apache.doris.thrift - - -typedef i64 TTimestamp -typedef i32 TPlanNodeId -typedef i32 TTupleId -typedef i32 TSlotId -typedef i64 TTableId -typedef i64 TTabletId -typedef i64 TVersion -typedef i64 TVersionHash -typedef i32 TSchemaHash -typedef i32 TPort -typedef i64 TCount -typedef i64 TSize -typedef i32 TClusterId -typedef i64 TEpoch - -// add for real time load, partitionid is not defined previously, define it here -typedef i64 TTransactionId -typedef i64 TPartitionId - -enum TStorageType { - ROW, - COLUMN, -} - -enum TStorageMedium { - HDD, - SSD, -} - -enum TVarType { - SESSION, - GLOBAL -} - -enum TPrimitiveType { - INVALID_TYPE, - NULL_TYPE, - BOOLEAN, - TINYINT, - SMALLINT, - INT, - BIGINT, - FLOAT, - DOUBLE, - DATE, - DATETIME, - BINARY, - DECIMAL, - // CHAR(n). Currently only supported in UDAs - CHAR, - LARGEINT, - VARCHAR, - HLL, - DECIMALV2, - TIME, - OBJECT, - ARRAY, - MAP, - STRUCT, - STRING, - ALL -} - -enum TTypeNodeType { - SCALAR, - ARRAY, - MAP, - STRUCT -} - -struct TScalarType { - 1: required TPrimitiveType type - - // Only set if type == CHAR or type == VARCHAR - 2: optional i32 len - - // Only set for DECIMAL - 3: optional i32 precision - 4: optional i32 scale -} - -// Represents a field in a STRUCT type. -// TODO: Model column stats for struct fields. -struct TStructField { - 1: required string name - 2: optional string comment -} - -struct TTypeNode { - 1: required TTypeNodeType type - - // only set for scalar types - 2: optional TScalarType scalar_type - - // only used for structs; has struct_fields.size() corresponding child types - 3: optional list<TStructField> struct_fields -} - -// A flattened representation of a tree of column types obtained by depth-first -// traversal. 
Complex types such as map, array and struct have child types corresponding -// to the map key/value, array item type, and struct fields, respectively. -// For scalar types the list contains only a single node. -// Note: We cannot rename this to TType because it conflicts with Thrift's internal TType -// and the generated Python thrift files will not work. -// Note: TTypeDesc in impala is TColumnType, but we already use TColumnType, so we name this -// to TTypeDesc. In future, we merge these two to one -struct TTypeDesc { - 1: list<TTypeNode> types -} - -enum TAggregationType { - SUM, - MAX, - MIN, - REPLACE, - HLL_UNION, - NONE -} - -enum TPushType { - LOAD, - DELETE, - LOAD_DELETE -} - -enum TTaskType { - CREATE, - DROP, - PUSH, - CLONE, - STORAGE_MEDIUM_MIGRATE, - ROLLUP, - SCHEMA_CHANGE, - CANCEL_DELETE, // Deprecated - MAKE_SNAPSHOT, - RELEASE_SNAPSHOT, - CHECK_CONSISTENCY, - UPLOAD, - DOWNLOAD, - CLEAR_REMOTE_FILE, - MOVE - REALTIME_PUSH, - PUBLISH_VERSION, - CLEAR_ALTER_TASK, - CLEAR_TRANSACTION_TASK, - RECOVER_TABLET, - STREAM_LOAD, - UPDATE_TABLET_META_INFO, - ALTER_TASK -} - -enum TStmtType { - QUERY, - DDL, // Data definition, e.g. CREATE TABLE (includes read-only functions e.g. SHOW) - DML, // Data modification e.g. INSERT - EXPLAIN // EXPLAIN -} - -// level of verboseness for "explain" output -// TODO: should this go somewhere else? -enum TExplainLevel { - NORMAL, - VERBOSE -} - -struct TColumnType { - 1: required TPrimitiveType type - // Only set if type == CHAR_ARRAY - 2: optional i32 len - 3: optional i32 index_len - 4: optional i32 precision - 5: optional i32 scale -} - -// A TNetworkAddress is the standard host, port representation of a -// network address. The hostname field must be resolvable to an IPv4 -// address. 
-struct TNetworkAddress { - 1: required string hostname - 2: required i32 port -} - -// Wire format for UniqueId -struct TUniqueId { - 1: required i64 hi - 2: required i64 lo -} - -enum QueryState { - CREATED, - INITIALIZED, - COMPILED, - RUNNING, - FINISHED, - EXCEPTION -} - -enum TFunctionType { - SCALAR, - AGGREGATE, -} - -enum TFunctionBinaryType { - // Palo builtin. We can either run this interpreted or via codegen - // depending on the query option. - BUILTIN, - - // Hive UDFs, loaded from *.jar - HIVE, - - // Native-interface, precompiled UDFs loaded from *.so - NATIVE, - - // Native-interface, precompiled to IR; loaded from *.ll - IR, -} - -// Represents a fully qualified function name. -struct TFunctionName { - // Name of the function's parent database. Not set if in global - // namespace (e.g. builtins) - 1: optional string db_name - - // Name of the function - 2: required string function_name -} - -struct TScalarFunction { - // Symbol for the function - 1: required string symbol - 2: optional string prepare_fn_symbol - 3: optional string close_fn_symbol -} - -struct TAggregateFunction { - 1: required TTypeDesc intermediate_type - 2: optional string update_fn_symbol - 3: optional string init_fn_symbol - 4: optional string serialize_fn_symbol - 5: optional string merge_fn_symbol - 6: optional string finalize_fn_symbol - 8: optional string get_value_fn_symbol - 9: optional string remove_fn_symbol - 10: optional bool is_analytic_only_fn = false -} - -// Represents a function in the Catalog. -struct TFunction { - // Fully qualified function name. - 1: required TFunctionName name - - // Type of the udf. e.g. hive, native, ir - 2: required TFunctionBinaryType binary_type - - // The types of the arguments to the function - 3: required list<TTypeDesc> arg_types - - // Return type for the function. - 4: required TTypeDesc ret_type - - // If true, this function takes var args. 
- 5: required bool has_var_args - - // Optional comment to attach to the function - 6: optional string comment - - 7: optional string signature - - // HDFS path for the function binary. This binary must exist at the time the - // function is created. - 8: optional string hdfs_location - - // One of these should be set. - 9: optional TScalarFunction scalar_fn - 10: optional TAggregateFunction aggregate_fn - - 11: optional i64 id - 12: optional string checksum -} - -enum TLoadJobState { - PENDING, - ETL, - LOADING, - FINISHED, - CANCELLED -} - -enum TEtlState { - RUNNING, - FINISHED, - CANCELLED, - UNKNOWN -} - -enum TTableType { - MYSQL_TABLE, - OLAP_TABLE, - SCHEMA_TABLE, - KUDU_TABLE, // Deprecated - BROKER_TABLE, - ES_TABLE -} - -enum TKeysType { - PRIMARY_KEYS, - DUP_KEYS, - UNIQUE_KEYS, - AGG_KEYS -} - -enum TPriority { - NORMAL, - HIGH -} - -struct TBackend { - 1: required string host - 2: required TPort be_port - 3: required TPort http_port -} - -struct TResourceInfo { - 1: required string user - 2: required string group -} - -enum TExportState { - RUNNING, - FINISHED, - CANCELLED, - UNKNOWN -} - -enum TFileType { - FILE_LOCAL, - FILE_BROKER, - FILE_STREAM, // file content is streaming in the buffer -} - -struct TTabletCommitInfo { - 1: required i64 tabletId - 2: required i64 backendId -} - -enum TLoadType { - MANUL_LOAD, - ROUTINE_LOAD, - MINI_LOAD -} - -enum TLoadSourceType { - RAW, - KAFKA, -} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java index 261acbe9..1ecb5a37 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java @@ -38,9 +38,9 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import 
org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.rest.models.Schema; -import org.apache.doris.thrift.TScanBatchResult; -import org.apache.doris.thrift.TStatus; -import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.sdk.thrift.TScanBatchResult; +import org.apache.doris.sdk.thrift.TStatus; +import org.apache.doris.sdk.thrift.TStatusCode; import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList; import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; import org.apache.flink.table.data.DecimalData; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
