This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new a092ebb17c1 branch-3.0: [Fix](Iceberg-hadoop-catalog)Fix Kerberos-authenticated HadoopCatalog insert failures due to missing kerberos credentials #51245 (#51336) a092ebb17c1 is described below commit a092ebb17c1081750ffdde357893fe9a7d15f45e Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed Jun 11 10:58:08 2025 +0800 branch-3.0: [Fix](Iceberg-hadoop-catalog)Fix Kerberos-authenticated HadoopCatalog insert failures due to missing kerberos credentials #51245 (#51336) Cherry-picked from #51245 --------- Co-authored-by: Calvin Kirs <guoqi...@selectdb.com> --- .../authentication/PreExecutionAuthenticator.java | 48 ++++------- .../datasource/iceberg/IcebergTransaction.java | 16 +++- .../test_iceberg_hadoop_catalog_kerberos.groovy | 98 ++++++++++++++++++++++ 3 files changed, 124 insertions(+), 38 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java index 0d7cf60c6f7..93967247330 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticator.java @@ -19,7 +19,6 @@ package org.apache.doris.common.security.authentication; import org.apache.hadoop.conf.Configuration; -import java.security.PrivilegedExceptionAction; import java.util.concurrent.Callable; /** @@ -69,14 +68,26 @@ public class PreExecutionAuthenticator { public <T> T execute(Callable<T> task) throws Exception { if (hadoopAuthenticator != null) { // Adapts Callable to PrivilegedExceptionAction for use with Hadoop authentication - PrivilegedExceptionAction<T> action = new CallableToPrivilegedExceptionActionAdapter<>(task); - return 
hadoopAuthenticator.doAs(action); + return hadoopAuthenticator.doAs(task::call); } else { // Executes the task directly if no authentication is needed return task.call(); } } + public void execute(Runnable task) throws Exception { + if (hadoopAuthenticator != null) { + // Adapts Runnable to PrivilegedExceptionAction for use with Hadoop authentication + hadoopAuthenticator.doAs(() -> { + task.run(); + return null; + }); + } else { + // Executes the task directly if no authentication is needed + task.run(); + } + } + /** * Retrieves the current HadoopAuthenticator. * <p>This allows checking if a HadoopAuthenticator is configured or @@ -97,35 +108,4 @@ public class PreExecutionAuthenticator { public void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) { this.hadoopAuthenticator = hadoopAuthenticator; } - - /** - * Adapter class to convert a Callable into a PrivilegedExceptionAction. - * <p>This is necessary to run the task within a privileged context, - * particularly for Hadoop operations with Kerberos. - * - * @param <T> The type of result returned by the action - */ - public class CallableToPrivilegedExceptionActionAdapter<T> implements PrivilegedExceptionAction<T> { - private final Callable<T> callable; - - /** - * Constructs an adapter that wraps a Callable into a PrivilegedExceptionAction. - * - * @param callable The Callable to be adapted - */ - public CallableToPrivilegedExceptionActionAdapter(Callable<T> callable) { - this.callable = callable; - } - - /** - * Executes the wrapped Callable as a PrivilegedExceptionAction. 
- * - * @return The result of the callable's call method - * @throws Exception If an exception occurs during callable execution - */ - @Override - public T run() throws Exception { - return callable.call(); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index d0cca11b0af..e36db86022e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -73,10 +73,18 @@ public class IcebergTransaction implements Transaction { } } - public void beginInsert(SimpleTableInfo tableInfo) { - this.tableInfo = tableInfo; - this.table = getNativeTable(tableInfo); - this.transaction = table.newTransaction(); + public void beginInsert(SimpleTableInfo tableInfo) throws UserException { + try { + ops.getPreExecutionAuthenticator().execute(() -> { + // create and start the iceberg transaction + this.tableInfo = tableInfo; + this.table = getNativeTable(tableInfo); + this.transaction = table.newTransaction(); + }); + } catch (Exception e) { + throw new UserException("Failed to begin insert for iceberg table " + tableInfo, e); + } + } public void finishInsert(SimpleTableInfo tableInfo, Optional<InsertCommandContext> insertCtx) { diff --git a/regression-test/suites/external_table_p0/kerberos/test_iceberg_hadoop_catalog_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_iceberg_hadoop_catalog_kerberos.groovy new file mode 100644 index 00000000000..61b567e1d83 --- /dev/null +++ b/regression-test/suites/external_table_p0/kerberos/test_iceberg_hadoop_catalog_kerberos.groovy @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iceberg_hadoop_catalog_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") { + String enabled = context.config.otherConfigs.get("enableKerberosTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + return + } + def String catalog_name = "iceberg_hadoop_catalog_kerberos_test" + def database_name = "test_iceberg_hadoop_db" + def String test_tbl_name="iceberg_test_table" + def keytab_root_dir = "/keytabs" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + sql """ + drop catalog if exists ${catalog_name} + """ + sql """ + CREATE CATALOG IF NOT EXISTS ${catalog_name} + PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type' = 'hadoop', + 'warehouse' = 'hdfs://${externalEnvIp}:8520/tmp/iceberg/catalog', + "hadoop.security.authentication" = "kerberos", + "hadoop.security.auth_to_local" = "RULE:[2:\$1@\$0](.*@LABS.TERADATA.COM)s/@.*// + RULE:[2:\$1@\$0](.*@OTHERLABS.TERADATA.COM)s/@.*// + RULE:[2:\$1@\$0](.*@OTHERREALM.COM)s/@.*// + DEFAULT", + "hadoop.kerberos.principal" = "hive/presto-master.docker.cluster@LABS.TERADATA.COM", + "hadoop.kerberos.min.seconds.before.relogin" = "5", + "hadoop.kerberos.keytab.login.autorenewal.enabled" = "false", + "hadoop.kerberos.keytab" = 
"${keytab_root_dir}/hive-presto-master.keytab", + "fs.defaultFS" = "hdfs://${externalEnvIp}:8520" + ); + """ + + sql """ switch ${catalog_name} """ + sql """ drop database if exists ${database_name} """ + sql """ create database if not exists ${database_name} """ + def database = sql """ show databases like '%${database_name}' """ + assert database.size() == 1 + sql """ use ${database_name} """ + sql """ drop table if exists ${test_tbl_name}""" + sql """ + CREATE TABLE ${test_tbl_name} ( + `ts` DATETIME COMMENT 'ts', + `col1` BOOLEAN COMMENT 'col1', + `col2` INT COMMENT 'col2', + `col3` BIGINT COMMENT 'col3', + `col4` FLOAT COMMENT 'col4', + `col5` DOUBLE COMMENT 'col5', + `col6` DECIMAL(9,4) COMMENT 'col6', + `col7` STRING COMMENT 'col7', + `col8` DATE COMMENT 'col8', + `col9` DATETIME COMMENT 'col9', + `pt1` STRING COMMENT 'pt1', + `pt2` STRING COMMENT 'pt2' + ) ENGINE=iceberg + PARTITION BY LIST (DAY(ts), pt1, pt2) () + PROPERTIES ( + 'write-format'='orc', + 'compression-codec'='zlib' + ); + """ + def table = sql """ show tables like '%${test_tbl_name}' """ + assert table.size() == 1 + sql """ + insert into ${test_tbl_name} values ( + '2024-05-26 12:34:56', + true, + 123, + 1234567890123, + 12.34, + 56.789, + 12345.6789, + 'example text', + '2024-05-26', + '2024-05-26 14:00:00', + 'partition_val1', + 'partition_val2' + ); + """ + def dataResult = sql """select count(1) from ${test_tbl_name} """ + assert dataResult.get(0).get(0) == 1 +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org