This is an automated email from the ASF dual-hosted git repository.

hubgeter pushed a commit to branch mc-test-branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit db0da3215985ef6878b394ed724cacfabe1c145d
Author: daidai <[email protected]>
AuthorDate: Wed Apr 29 12:13:10 2026 +0800

    [improvement](maxcompute) Simplify FE block ID requests for MaxCompute writes (#62925)
    
    Related PR: #62578
    
    1. PR #62578 moved MaxCompute write block ID allocation from BE-local
    counters to FE. This change simplifies that request path.
      Instead of calling FE through the BE JNI C++ bridge:
    
      MaxCompute connector Java -> BE JNI C++ -> FE
    
      the MaxCompute connector now requests FE directly through thrift:
    
      MaxCompute connector Java -> FE
    
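    A new MaxComputeFeClient is added under the MaxCompute connector to
    handle FE thrift requests (connection reuse, retries, and NOT_MASTER
    redirects), and the now-unused BE JNI native `requestMaxComputeBlockId`
    methods are removed.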
    
    2. Removes the hardcoded `MAX_BLOCK_COUNT` variable from
      `fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java`
      and moves it to the FE config `max_compute_write_max_block_count`.
      The default value is still 20000, so the existing behavior is preserved.
---
 .../sink/writer/maxcompute/vmc_table_writer.cpp    |   6 +
 be/src/util/jni-util.cpp                           |   4 -
 be/src/util/jni_native_method.cpp                  | 129 --------
 be/src/util/jni_native_method.h                    |   4 -
 .../doris/common/jni/utils/JNINativeMethod.java    |   4 -
 .../max-compute-connector/pom.xml                  |   6 +
 .../doris/maxcompute/MaxComputeFeClient.java       | 326 +++++++++++++++++++++
 .../doris/maxcompute/MaxComputeJniWriter.java      |  11 +-
 .../doris/maxcompute/MaxComputeFeClientTest.java   | 177 +++++++++++
 .../main/java/org/apache/doris/common/Config.java  |   7 +
 .../doris/datasource/maxcompute/MCTransaction.java |   7 +-
 11 files changed, 533 insertions(+), 148 deletions(-)

diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp 
b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
index a53bf5edada..f4a984dac05 100644
--- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
+++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp
@@ -22,6 +22,7 @@
 #include "exprs/vexpr.h"
 #include "exprs/vexpr_context.h"
 #include "format/transformer/vjni_format_transformer.h"
+#include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 #include "util/uid_util.h"
 
@@ -99,6 +100,7 @@ Status VMCTableWriter::open(RuntimeState* state, 
RuntimeProfile* profile) {
 
 std::map<std::string, std::string> VMCTableWriter::_build_base_writer_params() 
{
     auto params = _mc_sink.properties;
+    const auto& master_fe_addr = 
_state->exec_env()->cluster_info()->master_fe_addr;
     if (_mc_sink.__isset.endpoint) params["endpoint"] = _mc_sink.endpoint;
     if (_mc_sink.__isset.project) params["project"] = _mc_sink.project;
     if (_mc_sink.__isset.table_name) params["table"] = _mc_sink.table_name;
@@ -118,6 +120,10 @@ std::map<std::string, std::string> 
VMCTableWriter::_build_base_writer_params() {
     if (_mc_sink.__isset.retry_count) {
         params["retry_count"] = std::to_string(_mc_sink.retry_count);
     }
+    params["fe_host"] = master_fe_addr.hostname;
+    params["fe_port"] = std::to_string(master_fe_addr.port);
+    params["fe_rpc_timeout_ms"] = 
std::to_string(config::thrift_rpc_timeout_ms);
+    params["fe_thrift_server_type"] = config::thrift_server_type_of_fe;
     return params;
 }
 
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 6aad1010903..9f5f3999c17 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -406,8 +406,6 @@ Status Util::_init_register_natives() {
     static char memory_alloc_batch_sign[] = "([I)[J";
     static char memory_free_batch_name[] = "memoryTrackerFreeBatch";
     static char memory_free_batch_sign[] = "([J)V";
-    static char request_mc_block_id_name[] = "requestMaxComputeBlockId";
-    static char request_mc_block_id_sign[] = "(JLjava/lang/String;)J";
     static JNINativeMethod java_native_methods[] = {
             {memory_alloc_name, memory_alloc_sign, 
(void*)&JavaNativeMethods::memoryMalloc},
             {memory_free_name, memory_free_sign, 
(void*)&JavaNativeMethods::memoryFree},
@@ -415,8 +413,6 @@ Status Util::_init_register_natives() {
              (void*)&JavaNativeMethods::memoryMallocBatch},
             {memory_free_batch_name, memory_free_batch_sign,
              (void*)&JavaNativeMethods::memoryFreeBatch},
-            {request_mc_block_id_name, request_mc_block_id_sign,
-             (void*)&JavaNativeMethods::requestMaxComputeBlockId},
     };
 
     int res = env->RegisterNatives(local_jni_native_exc_cl, 
java_native_methods,
diff --git a/be/src/util/jni_native_method.cpp 
b/be/src/util/jni_native_method.cpp
index 6942095b376..549405c766d 100644
--- a/be/src/util/jni_native_method.cpp
+++ b/be/src/util/jni_native_method.cpp
@@ -17,119 +17,14 @@
 
 #include "util/jni_native_method.h"
 
-#include <gen_cpp/FrontendService.h>
-#include <glog/logging.h>
-
-#include <chrono>
 #include <cstdlib>
-#include <thread>
 #include <vector>
 
-#include "common/status.h"
 #include "jni.h"
-#include "runtime/exec_env.h"
-#include "util/client_cache.h"
 #include "util/defer_op.h"
-#include "util/thrift_rpc_helper.h"
 
 namespace doris {
 
-namespace {
-
-void throw_java_runtime_exception(JNIEnv* env, const std::string& message) {
-    jclass exception_cl = env->FindClass("java/lang/IllegalStateException");
-    if (exception_cl != nullptr) {
-        env->ThrowNew(exception_cl, message.c_str());
-        env->DeleteLocalRef(exception_cl);
-    }
-}
-
-Result<int64_t> request_maxcompute_block_id_from_fe(int64_t txn_id,
-                                                    const std::string& 
write_session_id) {
-    if (txn_id <= 0) {
-        return ResultError(Status::InvalidArgument(
-                "invalid MaxCompute txn_id for block_id allocation: {}", 
txn_id));
-    }
-    if (write_session_id.empty()) {
-        return ResultError(Status::InvalidArgument(
-                "empty MaxCompute write_session_id for block_id allocation"));
-    }
-
-    constexpr uint32_t FETCH_BLOCK_ID_MAX_RETRY_TIMES = 3;
-    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
-    for (uint32_t retry_times = 0; retry_times < 
FETCH_BLOCK_ID_MAX_RETRY_TIMES; retry_times++) {
-        TMaxComputeBlockIdRequest request;
-        TMaxComputeBlockIdResult result;
-        request.__set_txn_id(txn_id);
-        request.__set_write_session_id(write_session_id);
-        request.__set_length(1);
-
-        Status rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
-                master_addr.hostname, master_addr.port,
-                [&request, &result](FrontendServiceConnection& client) {
-                    client->getMaxComputeBlockIdRange(result, request);
-                });
-
-        if (!rpc_status.ok()) {
-            LOG(WARNING) << "Failed to allocate MaxCompute block_id, rpc 
failure, retry_time="
-                         << retry_times << ", txn_id=" << txn_id
-                         << ", write_session_id=" << write_session_id << ", 
status=" << rpc_status;
-            std::this_thread::sleep_for(std::chrono::milliseconds(10));
-            continue;
-        }
-
-        if (!result.__isset.status) {
-            return ResultError(Status::RpcError(
-                    "failed to allocate MaxCompute block_id from FE, missing 
status in response, "
-                    "txn_id={}, write_session_id={}",
-                    txn_id, write_session_id));
-        }
-
-        Status fe_status = Status::create<false>(result.status);
-        if (fe_status.is<ErrorCode::NOT_MASTER>()) {
-            if (!result.__isset.master_address) {
-                return ResultError(Status::RpcError(
-                        "failed to allocate MaxCompute block_id from FE, 
missing master address "
-                        "in NOT_MASTER response, txn_id={}, 
write_session_id={}",
-                        txn_id, write_session_id));
-            }
-            LOG(WARNING) << "Failed to allocate MaxCompute block_id, requested 
non-master FE@"
-                         << master_addr.hostname << ":" << master_addr.port << 
", switch to FE@"
-                         << result.master_address.hostname << ":" << 
result.master_address.port
-                         << ", retry_time=" << retry_times << ", txn_id=" << 
txn_id
-                         << ", write_session_id=" << write_session_id;
-            master_addr = result.master_address;
-            std::this_thread::sleep_for(std::chrono::milliseconds(10));
-            continue;
-        }
-
-        if (!fe_status.ok()) {
-            LOG(WARNING) << "Failed to allocate MaxCompute block_id, FE 
returned error, retry_time="
-                         << retry_times << ", txn_id=" << txn_id
-                         << ", write_session_id=" << write_session_id << ", 
status=" << fe_status;
-            return ResultError(std::move(fe_status));
-        }
-
-        if (result.length != 1) {
-            return ResultError(Status::RpcError(
-                    "failed to allocate MaxCompute block_id from FE, expected 
length=1 but got "
-                    "{}, txn_id={}, write_session_id={}",
-                    result.length, txn_id, write_session_id));
-        }
-
-        LOG(INFO) << "Allocated MaxCompute block_id from FE@" << 
master_addr.hostname << ":"
-                  << master_addr.port << ", txn_id=" << txn_id
-                  << ", write_session_id=" << write_session_id << ", 
block_id=" << result.start;
-        return result.start;
-    }
-
-    return ResultError(Status::RpcError(
-            "failed to allocate MaxCompute block_id from FE, txn_id={}, 
write_session_id={}",
-            txn_id, write_session_id));
-}
-
-} // namespace
-
 jlong JavaNativeMethods::memoryMalloc(JNIEnv* env, jclass clazz, jlong bytes) {
     return reinterpret_cast<long>(malloc(bytes));
 }
@@ -209,28 +104,4 @@ void JavaNativeMethods::memoryFreeBatch(JNIEnv* env, 
jclass clazz, jlongArray ad
     env->ReleaseLongArrayElements(addrs, elems, JNI_ABORT);
 }
 
-jlong JavaNativeMethods::requestMaxComputeBlockId(JNIEnv* env, jclass clazz, 
jlong txn_id,
-                                                  jstring write_session_id) {
-    if (write_session_id == nullptr) {
-        throw_java_runtime_exception(
-                env, "MaxCompute write_session_id is null when requesting 
block_id");
-        return 0;
-    }
-
-    const char* write_session_id_chars = 
env->GetStringUTFChars(write_session_id, nullptr);
-    if (write_session_id_chars == nullptr) {
-        throw_java_runtime_exception(env, "Failed to read MaxCompute 
write_session_id from Java");
-        return 0;
-    }
-    std::string write_session_id_str(write_session_id_chars);
-    env->ReleaseStringUTFChars(write_session_id, write_session_id_chars);
-
-    auto block_id = request_maxcompute_block_id_from_fe(txn_id, 
write_session_id_str);
-    if (!block_id.has_value()) {
-        throw_java_runtime_exception(env, block_id.error().to_string());
-        return 0;
-    }
-    return static_cast<jlong>(block_id.value());
-}
-
 } // namespace doris
diff --git a/be/src/util/jni_native_method.h b/be/src/util/jni_native_method.h
index 23429c3500a..48c74d91d67 100644
--- a/be/src/util/jni_native_method.h
+++ b/be/src/util/jni_native_method.h
@@ -42,10 +42,6 @@ struct JavaNativeMethods {
 
     // Batch free multiple addresses; addrs is a long[]
     static void memoryFreeBatch(JNIEnv* env, jclass clazz, jlongArray addrs);
-
-    // Request a MaxCompute block id from FE via BE.
-    static jlong requestMaxComputeBlockId(JNIEnv* env, jclass clazz, jlong 
txn_id,
-                                          jstring write_session_id);
 };
 
 } // namespace doris
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java
index 1104a5fa934..d48fe8e9347 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/JNINativeMethod.java
@@ -43,8 +43,4 @@ public class JNINativeMethod {
      */
     public static native void memoryTrackerFreeBatch(long[] addrs);
 
-    /**
-     * Request a MaxCompute block id from BE, which will forward the request 
to FE.
-     */
-    public static native long requestMaxComputeBlockId(long txnId, String 
writeSessionId);
 }
diff --git a/fe/be-java-extensions/max-compute-connector/pom.xml 
b/fe/be-java-extensions/max-compute-connector/pom.xml
index 4662ab83a53..5501700800d 100644
--- a/fe/be-java-extensions/max-compute-connector/pom.xml
+++ b/fe/be-java-extensions/max-compute-connector/pom.xml
@@ -41,6 +41,12 @@ under the License.
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>fe-thrift</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>com.aliyun.odps</groupId>
             <artifactId>odps-sdk-core</artifactId>
diff --git 
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
 
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
new file mode 100644
index 00000000000..82b58f48493
--- /dev/null
+++ 
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeFeClient.java
@@ -0,0 +1,326 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TMaxComputeBlockIdRequest;
+import org.apache.doris.thrift.TMaxComputeBlockIdResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.layered.TFramedTransport;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * FE thrift client used by MaxCompute writer runtime code in BE's embedded 
JVM.
+ */
+class MaxComputeFeClient implements AutoCloseable {
+    static final String FE_HOST = "fe_host";
+    static final String FE_PORT = "fe_port";
+    static final String FE_RPC_TIMEOUT_MS = "fe_rpc_timeout_ms";
+    static final String FE_THRIFT_SERVER_TYPE = "fe_thrift_server_type";
+
+    private static final Logger LOG = 
Logger.getLogger(MaxComputeFeClient.class);
+    private static final int FETCH_BLOCK_ID_MAX_RETRY_TIMES = 3;
+    private static final long FETCH_BLOCK_ID_RETRY_SLEEP_MS = 10L;
+    private static final long FETCH_BLOCK_ID_LENGTH = 1L;
+    private static final int DEFAULT_FE_RPC_TIMEOUT_MS = 60000;
+    private static final String THREADED_SELECTOR = "THREADED_SELECTOR";
+    private static final String THREAD_POOL = "THREAD_POOL";
+
+    private final int rpcTimeoutMs;
+    private final String thriftServerType;
+    private final RpcExecutor rpcExecutor;
+    private final long retrySleepMs;
+    private TNetworkAddress masterAddress;
+
+    static MaxComputeFeClient create(Map<String, String> params) {
+        String host = requireParam(params, FE_HOST);
+        int port = Integer.parseInt(requireParam(params, FE_PORT));
+        int timeoutMs = Integer.parseInt(params.getOrDefault(FE_RPC_TIMEOUT_MS,
+                String.valueOf(DEFAULT_FE_RPC_TIMEOUT_MS)));
+        String serverType = params.getOrDefault(FE_THRIFT_SERVER_TYPE, 
THREAD_POOL);
+        return new MaxComputeFeClient(new TNetworkAddress(host, port), 
timeoutMs, serverType,
+                new ReusableRpcExecutor(),
+                FETCH_BLOCK_ID_RETRY_SLEEP_MS);
+    }
+
+    MaxComputeFeClient(TNetworkAddress masterAddress, int rpcTimeoutMs, String 
thriftServerType,
+            RpcExecutor rpcExecutor, long retrySleepMs) {
+        this.masterAddress = copyAddress(Objects.requireNonNull(masterAddress, 
"masterAddress"));
+        this.rpcTimeoutMs = rpcTimeoutMs;
+        this.thriftServerType = thriftServerType == null ? THREAD_POOL : 
thriftServerType;
+        this.rpcExecutor = Objects.requireNonNull(rpcExecutor, "rpcExecutor");
+        this.retrySleepMs = retrySleepMs;
+    }
+
+    long requestBlockId(long txnId, String writeSessionId) throws IOException {
+        if (txnId <= 0) {
+            throw new IOException("invalid MaxCompute txn_id for block_id 
allocation: " + txnId);
+        }
+        if (writeSessionId == null || writeSessionId.isEmpty()) {
+            throw new IOException("empty MaxCompute write_session_id for 
block_id allocation");
+        }
+
+        TMaxComputeBlockIdRequest request = buildBlockIdRequest(txnId, 
writeSessionId);
+        return callWithMasterRedirect(
+                "allocate MaxCompute block_id",
+                client -> client.getMaxComputeBlockIdRange(request),
+                (result, requestAddress, retryTimes) ->
+                        handleBlockIdResult(result, requestAddress, 
retryTimes, txnId, writeSessionId));
+    }
+
+    @Override
+    public synchronized void close() {
+        rpcExecutor.close();
+    }
+
+    private synchronized <T, R> R callWithMasterRedirect(String operation, 
FeCall<T> call,
+            ResponseHandler<T, R> handler)
+            throws IOException {
+        validateAddress(masterAddress);
+
+        Exception lastException = null;
+        for (int retryTimes = 0; retryTimes < FETCH_BLOCK_ID_MAX_RETRY_TIMES; 
retryTimes++) {
+            TNetworkAddress requestAddress = copyAddress(masterAddress);
+            T result;
+            try {
+                result = rpcExecutor.call(requestAddress, rpcTimeoutMs, 
useFramedTransport(), call);
+            } catch (Exception e) {
+                lastException = e;
+                rpcExecutor.close();
+                LOG.warn("Failed to " + operation + ", rpc failure, 
retry_time="
+                        + retryTimes + ", fe=" + 
formatAddress(requestAddress), e);
+                sleepBeforeRetry();
+                continue;
+            }
+
+            try {
+                return handler.handle(result, requestAddress, retryTimes);
+            } catch (NotMasterException e) {
+                masterAddress = copyAddress(e.masterAddress);
+                lastException = e;
+                rpcExecutor.close();
+                sleepBeforeRetry();
+            }
+        }
+
+        throw new IOException("failed to " + operation + " from FE", 
lastException);
+    }
+
+    private long handleBlockIdResult(TMaxComputeBlockIdResult result, 
TNetworkAddress requestAddress, int retryTimes,
+            long txnId, String writeSessionId) throws IOException, 
NotMasterException {
+        if (result == null || !result.isSetStatus()) {
+            throw new IOException("failed to allocate MaxCompute block_id from 
FE, missing status in response, "
+                    + "txn_id=" + txnId + ", write_session_id=" + 
writeSessionId);
+        }
+
+        TStatus status = result.getStatus();
+        TStatusCode code = status.getStatusCode();
+        if (code == null) {
+            throw new IOException("failed to allocate MaxCompute block_id from 
FE, missing status code, "
+                    + "txn_id=" + txnId + ", write_session_id=" + 
writeSessionId);
+        }
+        if (code == TStatusCode.NOT_MASTER) {
+            if (!result.isSetMasterAddress()) {
+                throw new IOException("failed to allocate MaxCompute block_id 
from FE, missing master address "
+                        + "in NOT_MASTER response, txn_id=" + txnId + ", 
write_session_id=" + writeSessionId);
+            }
+            LOG.warn("Failed to allocate MaxCompute block_id, requested 
non-master FE@"
+                    + formatAddress(requestAddress) + ", switch to FE@" + 
formatAddress(result.getMasterAddress())
+                    + ", retry_time=" + retryTimes + ", txn_id=" + txnId
+                    + ", write_session_id=" + writeSessionId);
+            throw new NotMasterException(result.getMasterAddress());
+        }
+
+        if (code != TStatusCode.OK) {
+            throw new IOException("failed to allocate MaxCompute block_id from 
FE, status="
+                    + statusErrorMessage(status) + ", txn_id=" + txnId
+                    + ", write_session_id=" + writeSessionId);
+        }
+
+        if (!result.isSetStart()) {
+            throw new IOException("failed to allocate MaxCompute block_id from 
FE, missing start in response, "
+                    + "txn_id=" + txnId + ", write_session_id=" + 
writeSessionId);
+        }
+        if (!result.isSetLength() || result.getLength() != 
FETCH_BLOCK_ID_LENGTH) {
+            throw new IOException("failed to allocate MaxCompute block_id from 
FE, expected length=1 but got "
+                    + result.getLength() + ", txn_id=" + txnId + ", 
write_session_id=" + writeSessionId);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Allocated MaxCompute block_id from FE@" + 
formatAddress(requestAddress)
+                    + ", txn_id=" + txnId + ", write_session_id=" + 
writeSessionId
+                    + ", block_id=" + result.getStart());
+        }
+        return result.getStart();
+    }
+
+    private static TMaxComputeBlockIdRequest buildBlockIdRequest(long txnId, 
String writeSessionId) {
+        TMaxComputeBlockIdRequest request = new TMaxComputeBlockIdRequest();
+        request.setTxnId(txnId);
+        request.setWriteSessionId(writeSessionId);
+        request.setLength(FETCH_BLOCK_ID_LENGTH);
+        return request;
+    }
+
+    private boolean useFramedTransport() {
+        return THREADED_SELECTOR.equalsIgnoreCase(thriftServerType);
+    }
+
+    private void sleepBeforeRetry() throws IOException {
+        try {
+            Thread.sleep(retrySleepMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("interrupted while retrying MaxCompute 
block_id allocation", e);
+        }
+    }
+
+    private static TTransport createTransport(TNetworkAddress address, int 
timeoutMs,
+            boolean useFramedTransport) throws TTransportException {
+        TSocket socket = new TSocket(address.getHostname(), address.getPort(), 
timeoutMs);
+        return useFramedTransport ? new TFramedTransport(socket) : socket;
+    }
+
+    private static void validateAddress(TNetworkAddress address) throws 
IOException {
+        if (address.getHostname() == null || address.getHostname().isEmpty() 
|| address.getPort() <= 0) {
+            throw new IOException("invalid FE address for MaxCompute block_id 
allocation: "
+                    + formatAddress(address));
+        }
+    }
+
+    private static String statusErrorMessage(TStatus status) {
+        List<String> errorMsgs = status.getErrorMsgs();
+        if (errorMsgs == null || errorMsgs.isEmpty()) {
+            return status.getStatusCode().name();
+        }
+        return status.getStatusCode().name() + ": " + String.join("; ", 
errorMsgs);
+    }
+
+    private static String requireParam(Map<String, String> params, String key) 
{
+        String value = params.get(key);
+        if (value == null || value.isEmpty()) {
+            throw new IllegalArgumentException("required property '" + key + 
"'.");
+        }
+        return value;
+    }
+
+    private static TNetworkAddress copyAddress(TNetworkAddress address) {
+        return new TNetworkAddress(address.getHostname(), address.getPort());
+    }
+
+    private static boolean sameAddress(TNetworkAddress left, TNetworkAddress 
right) {
+        return left != null && right != null
+                && Objects.equals(left.getHostname(), right.getHostname())
+                && left.getPort() == right.getPort();
+    }
+
+    private static String formatAddress(TNetworkAddress address) {
+        if (address == null) {
+            return "null";
+        }
+        return address.getHostname() + ":" + address.getPort();
+    }
+
+    interface RpcExecutor {
+        <T> T call(TNetworkAddress address, int timeoutMs, boolean 
useFramedTransport,
+                FeCall<T> call) throws Exception;
+
+        default void close() {
+        }
+    }
+
+    interface FeCall<T> {
+        T call(FrontendService.Client client) throws Exception;
+    }
+
+    private interface ResponseHandler<T, R> {
+        R handle(T result, TNetworkAddress requestAddress, int retryTimes) 
throws IOException, NotMasterException;
+    }
+
+    private static class ReusableRpcExecutor implements RpcExecutor {
+        private TNetworkAddress connectedAddress;
+        private boolean connectedFramedTransport;
+        private TTransport transport;
+        private FrontendService.Client client;
+
+        @Override
+        public synchronized <T> T call(TNetworkAddress address, int timeoutMs, 
boolean useFramedTransport,
+                FeCall<T> call) throws Exception {
+            ensureConnected(address, timeoutMs, useFramedTransport);
+            try {
+                return call.call(client);
+            } catch (Exception e) {
+                close();
+                throw e;
+            }
+        }
+
+        @Override
+        public synchronized void close() {
+            if (transport != null) {
+                transport.close();
+            }
+            transport = null;
+            client = null;
+            connectedAddress = null;
+        }
+
+        private void ensureConnected(TNetworkAddress address, int timeoutMs, 
boolean useFramedTransport)
+                throws Exception {
+            if (client != null && transport != null && transport.isOpen()
+                    && connectedFramedTransport == useFramedTransport
+                    && sameAddress(connectedAddress, address)) {
+                return;
+            }
+
+            close();
+            TTransport newTransport = createTransport(address, timeoutMs, 
useFramedTransport);
+            try {
+                newTransport.open();
+                transport = newTransport;
+                client = new FrontendService.Client(new 
TBinaryProtocol(transport));
+                connectedAddress = copyAddress(address);
+                connectedFramedTransport = useFramedTransport;
+            } catch (Exception e) {
+                newTransport.close();
+                throw e;
+            }
+        }
+    }
+
+    private static class NotMasterException extends Exception {
+        private final TNetworkAddress masterAddress;
+
+        NotMasterException(TNetworkAddress masterAddress) {
+            super("not master, master=" + formatAddress(masterAddress));
+            this.masterAddress = copyAddress(masterAddress);
+        }
+    }
+}
diff --git 
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
 
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
index 38bfc7f17a7..9788184057e 100644
--- 
a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
+++ 
b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java
@@ -18,7 +18,6 @@
 package org.apache.doris.maxcompute;
 
 import org.apache.doris.common.jni.JniWriter;
-import org.apache.doris.common.jni.utils.JNINativeMethod;
 import org.apache.doris.common.jni.vec.VectorColumn;
 import org.apache.doris.common.jni.vec.VectorTable;
 import org.apache.doris.common.maxcompute.MCProperties;
@@ -115,6 +114,7 @@ public class MaxComputeJniWriter extends JniWriter {
     private final int readTimeout;
     private final int retryCount;
     private final long maxBlockBytes;
+    private final MaxComputeFeClient feClient;
 
     // Storage API objects
     private TableBatchWriteSession writeSession;
@@ -155,6 +155,7 @@ public class MaxComputeJniWriter extends JniWriter {
         this.maxBlockBytes = Long.parseLong(
                 params.getOrDefault(MCProperties.WRITE_MAX_BLOCK_BYTES,
                         MCProperties.DEFAULT_WRITE_MAX_BLOCK_BYTES));
+        this.feClient = MaxComputeFeClient.create(params);
     }
 
     @Override
@@ -241,12 +242,12 @@ public class MaxComputeJniWriter extends JniWriter {
         }
     }
 
-    private long resolveInitialBlockId() {
+    private long resolveInitialBlockId() throws IOException {
         return preallocatedBlockId != null ? preallocatedBlockId : 
requestBlockId();
     }
 
-    private long requestBlockId() {
-        return JNINativeMethod.requestMaxComputeBlockId(txnId, writeSessionId);
+    private long requestBlockId() throws IOException {
+        return feClient.requestBlockId(txnId, writeSessionId);
     }
 
     private void openBatchWriter(long blockId) throws IOException {
@@ -918,6 +919,8 @@ public class MaxComputeJniWriter extends JniWriter {
             String errorMsg = "Failed to close MaxCompute arrow writer";
             LOG.error(errorMsg, e);
             throw new IOException(errorMsg, e);
+        } finally {
+            feClient.close();
         }
     }
 
diff --git 
a/fe/be-java-extensions/max-compute-connector/src/test/java/org/apache/doris/maxcompute/MaxComputeFeClientTest.java
 
b/fe/be-java-extensions/max-compute-connector/src/test/java/org/apache/doris/maxcompute/MaxComputeFeClientTest.java
new file mode 100644
index 00000000000..923919de408
--- /dev/null
+++ 
b/fe/be-java-extensions/max-compute-connector/src/test/java/org/apache/doris/maxcompute/MaxComputeFeClientTest.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import org.apache.doris.thrift.TMaxComputeBlockIdRequest;
+import org.apache.doris.thrift.TMaxComputeBlockIdResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+
+public class MaxComputeFeClientTest {
+    @Test
+    public void testRequestBlockIdSuccess() throws Exception {
+        FakeExecutor executor = new FakeExecutor(okResult(42L));
+        MaxComputeFeClient client = new MaxComputeFeClient(
+                new TNetworkAddress("fe1", 9010), 1234, "THREAD_POOL", executor, 0);
+
+        Assert.assertEquals(42L, client.requestBlockId(100L, "session-1"));
+        Assert.assertEquals(1, executor.addresses.size());
+        Assert.assertEquals("fe1", executor.addresses.get(0).getHostname());
+        Assert.assertEquals(9010, executor.addresses.get(0).getPort());
+        Assert.assertFalse(executor.framedTransports.get(0));
+        Assert.assertEquals(1234, (int) executor.timeouts.get(0));
+        Assert.assertEquals(100L, executor.requests.get(0).getTxnId());
+        Assert.assertEquals("session-1", executor.requests.get(0).getWriteSessionId());
+        Assert.assertEquals(1L, executor.requests.get(0).getLength());
+    }
+
+    @Test
+    public void testRequestBlockIdRedirectsToMaster() throws Exception {
+        FakeExecutor executor = new FakeExecutor(notMasterResult("master", 9020), okResult(7L));
+        MaxComputeFeClient client = new MaxComputeFeClient(
+                new TNetworkAddress("follower", 9010), 1234, "THREADED_SELECTOR", executor, 0);
+
+        Assert.assertEquals(7L, client.requestBlockId(101L, "session-2"));
+        Assert.assertEquals(2, executor.addresses.size());
+        Assert.assertEquals("follower", executor.addresses.get(0).getHostname());
+        Assert.assertEquals("master", executor.addresses.get(1).getHostname());
+        Assert.assertTrue(executor.framedTransports.get(0));
+        Assert.assertTrue(executor.framedTransports.get(1));
+    }
+
+    @Test
+    public void testFeErrorFailsWithoutRetry() {
+        FakeExecutor executor = new FakeExecutor(errorResult("allocation failed"));
+        MaxComputeFeClient client = new MaxComputeFeClient(
+                new TNetworkAddress("fe1", 9010), 1234, "THREAD_POOL", executor, 0);
+
+        expectIOExceptionContains(() -> client.requestBlockId(102L, "session-3"), "allocation failed");
+        Assert.assertEquals(1, executor.addresses.size());
+    }
+
+    @Test
+    public void testRpcFailureRetries() throws Exception {
+        FakeExecutor executor = new FakeExecutor(
+                new IOException("connect failed"),
+                new IOException("temporary failure"),
+                okResult(9L));
+        MaxComputeFeClient client = new MaxComputeFeClient(
+                new TNetworkAddress("fe1", 9010), 1234, "THREAD_POOL", executor, 0);
+
+        Assert.assertEquals(9L, client.requestBlockId(103L, "session-4"));
+        Assert.assertEquals(3, executor.addresses.size());
+    }
+
+    private static TMaxComputeBlockIdResult okResult(long start) {
+        TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult();
+        result.setStatus(new TStatus(TStatusCode.OK));
+        result.setStart(start);
+        result.setLength(1L);
+        return result;
+    }
+
+    private static TMaxComputeBlockIdResult notMasterResult(String host, int port) {
+        TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult();
+        result.setStatus(new TStatus(TStatusCode.NOT_MASTER));
+        result.setMasterAddress(new TNetworkAddress(host, port));
+        return result;
+    }
+
+    private static TMaxComputeBlockIdResult errorResult(String errorMsg) {
+        TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
+        status.addToErrorMsgs(errorMsg);
+        TMaxComputeBlockIdResult result = new TMaxComputeBlockIdResult();
+        result.setStatus(status);
+        return result;
+    }
+
+    private static void expectIOExceptionContains(IOAction action, String expectedMessage) {
+        try {
+            action.run();
+            Assert.fail("expected IOException");
+        } catch (IOException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains(expectedMessage));
+        }
+    }
+
+    private interface IOAction {
+        void run() throws IOException;
+    }
+
+    private static class FakeExecutor implements MaxComputeFeClient.RpcExecutor {
+        private final Queue<Object> responses;
+        private final List<TNetworkAddress> addresses = new ArrayList<>();
+        private final List<Integer> timeouts = new ArrayList<>();
+        private final List<Boolean> framedTransports = new ArrayList<>();
+        private final List<TMaxComputeBlockIdRequest> requests = new ArrayList<>();
+
+        FakeExecutor(Object... responses) {
+            this.responses = new ArrayDeque<>(Arrays.asList(responses));
+        }
+
+        @Override
+        public <T> T call(TNetworkAddress address, int timeoutMs, boolean useFramedTransport,
+                MaxComputeFeClient.FeCall<T> call) throws Exception {
+            addresses.add(new TNetworkAddress(address.getHostname(), address.getPort()));
+            timeouts.add(timeoutMs);
+            framedTransports.add(useFramedTransport);
+
+            FrontendServiceClient client = new FrontendServiceClient();
+            return call.call(client);
+        }
+
+        private class FrontendServiceClient extends org.apache.doris.thrift.FrontendService.Client {
+            FrontendServiceClient() {
+                super((TProtocol) null);
+            }
+
+            @Override
+            public TMaxComputeBlockIdResult getMaxComputeBlockIdRange(TMaxComputeBlockIdRequest request)
+                    throws org.apache.thrift.TException {
+                requests.add(request);
+
+                Object response = responses.remove();
+                if (response instanceof RuntimeException) {
+                    throw (RuntimeException) response;
+                }
+                if (response instanceof IOException) {
+                    throw new RuntimeException((IOException) response);
+                }
+                if (response instanceof org.apache.thrift.TException) {
+                    throw (org.apache.thrift.TException) response;
+                }
+                if (response instanceof Exception) {
+                    throw new RuntimeException((Exception) response);
+                }
+                return (TMaxComputeBlockIdResult) response;
+            }
+        }
+    }
+}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 74a3ffa1f70..394172f51fe 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2349,6 +2349,13 @@ public class Config extends ConfigBase {
             "Max cache file number of external table split file meta cache at query level."})
     public static long max_external_table_split_file_meta_cache_num = 100000;
 
+    /**
+     * Maximum number of MaxCompute Storage API write block IDs that can be allocated in one write session.
+     */
+    @ConfField(mutable = false, masterOnly = true, description = {
+            "Maximum number of MaxCompute Storage API write block IDs that can be allocated in one write session."})
+    public static long max_compute_write_max_block_count = 20000L;
+
     /**
      * Max cache loader thread-pool size.
      * Max thread pool size for loading external meta cache
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
index 76a3c84ebb7..9f1c61ddf24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.maxcompute;
 
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
@@ -49,7 +50,6 @@ import java.util.stream.Collectors;
 public class MCTransaction implements Transaction {
 
     private static final Logger LOG = LogManager.getLogger(MCTransaction.class);
-    private static final long MAX_BLOCK_COUNT = 20000L;
 
     private final MaxComputeExternalCatalog catalog;
     private MaxComputeExternalTable table;
@@ -147,9 +147,10 @@ public class MCTransaction implements Transaction {
         do {
             start = nextBlockId.get();
             endExclusive = start + length;
-            if (endExclusive > MAX_BLOCK_COUNT) {
+            if (endExclusive > Config.max_compute_write_max_block_count) {
                 throw new UserException("MaxCompute block_id exceeds limit, start="
-                        + start + ", length=" + length + ", maxBlockCount=" + MAX_BLOCK_COUNT);
+                        + start + ", length=" + length + ", maxBlockCount="
+                        + Config.max_compute_write_max_block_count);
             }
         } while (!nextBlockId.compareAndSet(start, endExclusive));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to