morrySnow commented on code in PR #63068:
URL: https://github.com/apache/doris/pull/63068#discussion_r3354546688
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java:
##########
@@ -133,6 +141,90 @@ protected void initLocalObjectsImpl() {
metadataOps = ops;
}
+ @Override
+ protected List<String> listDatabaseNames() {
Review Comment:
**nit:** This override of `listDatabaseNames()` is a no-op — it just calls
`super.listDatabaseNames()`. Unless this is needed to widen visibility from
package-private to protected, it should be removed to avoid confusion with the
overloaded `listDatabaseNames(SessionContext ctx)` below.
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java:
##########
@@ -158,19 +250,45 @@ public String getIcebergCatalogType() {
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();
- return metadataOps.tableExist(dbName, tblName);
+ return ((IcebergMetadataOps) metadataOps).tableExist(ctx, dbName, tblName);
}
@Override
protected List<String> listTableNamesFromRemote(SessionContext ctx, String dbName) {
// On the Doris side, the result of SHOW TABLES for Iceberg external tables includes both tables and views,
// so the combined set of tables and views is used here.
- List<String> tableNames = metadataOps.listTableNames(dbName);
- List<String> viewNames = metadataOps.listViewNames(dbName);
+ IcebergMetadataOps ops = (IcebergMetadataOps) metadataOps;
+ List<String> tableNames = ops.listTableNames(ctx, dbName);
+ List<String> viewNames = ops.listViewNames(ctx, dbName);
tableNames.addAll(viewNames);
return tableNames;
}
+ @Override
+ protected boolean shouldBypassTableNameCache(SessionContext ctx) {
+ return ctx != null && ctx.hasDelegatedCredential() && isIcebergRestUserSessionEnabled();
+ }
+
+ public boolean isIcebergRestUserSessionEnabled() {
+ makeSureInitialized();
+ return msProperties instanceof IcebergRestProperties
+ && ((IcebergRestProperties) msProperties).isIcebergRestUserSessionEnabled();
+ }
+
+ private boolean isIcebergRestUserSessionPropertyEnabled() {
Review Comment:
**Initialization inconsistency:** `isIcebergRestUserSessionEnabled()` (line
272) calls `makeSureInitialized()` before accessing `msProperties`, but
`isIcebergRestUserSessionPropertyEnabled()` (here, line 278) does NOT call
`makeSureInitialized()` before accessing
`catalogProperty.getMetastoreProperties()`.
If the catalog hasn't been initialized,
`catalogProperty.getMetastoreProperties()` could return null or an unexpected
state. This is currently called from `listDatabaseNames(SessionContext ctx)` →
`getDbNames()`, and `getDbNames()` does call `makeSureInitialized()`. But the
inconsistency is fragile — a future refactor could introduce a call path that
hits this without prior initialization.
Consider adding `makeSureInitialized()` here, or renaming the two methods to
make the initialization contract explicit.
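One way to make the contract explicit is to route every property read through
a single accessor that initializes first. The sketch below is only an
illustration under assumptions: `CatalogInitSketch` and `RestPropsSketch` are
hypothetical stand-ins, not Doris classes, and `makeSureInitialized()` is
assumed to be idempotent.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for IcebergRestProperties; not the Doris class.
final class RestPropsSketch {
    private final boolean userSessionEnabled;

    RestPropsSketch(boolean userSessionEnabled) {
        this.userSessionEnabled = userSessionEnabled;
    }

    boolean isUserSessionEnabled() {
        return userSessionEnabled;
    }
}

// Hypothetical stand-in for the catalog; only the initialization pattern matters.
class CatalogInitSketch {
    // Counts real initializations so the idempotency can be observed.
    final AtomicInteger initCalls = new AtomicInteger();
    private Object metastoreProperties; // null until initialized
    private boolean initialized;

    // Assumed to mirror makeSureInitialized(): safe to call repeatedly.
    synchronized void makeSureInitialized() {
        if (!initialized) {
            metastoreProperties = new RestPropsSketch(true);
            initialized = true;
            initCalls.incrementAndGet();
        }
    }

    // Single accessor: no caller can read the properties before initialization.
    private Object initializedProperties() {
        makeSureInitialized();
        return metastoreProperties;
    }

    public boolean isUserSessionEnabled() {
        Object props = initializedProperties();
        return props instanceof RestPropsSketch
                && ((RestPropsSketch) props).isUserSessionEnabled();
    }
}
```

With this shape, both the public and the private check could share
`initializedProperties()`, so neither depends on its caller having initialized
the catalog.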
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java:
##########
@@ -158,19 +250,45 @@ public String getIcebergCatalogType() {
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();
- return metadataOps.tableExist(dbName, tblName);
+ return ((IcebergMetadataOps) metadataOps).tableExist(ctx, dbName, tblName);
}
@Override
protected List<String> listTableNamesFromRemote(SessionContext ctx, String dbName) {
// On the Doris side, the result of SHOW TABLES for Iceberg external tables includes both tables and views,
// so the combined set of tables and views is used here.
- List<String> tableNames = metadataOps.listTableNames(dbName);
- List<String> viewNames = metadataOps.listViewNames(dbName);
+ IcebergMetadataOps ops = (IcebergMetadataOps) metadataOps;
+ List<String> tableNames = ops.listTableNames(ctx, dbName);
+ List<String> viewNames = ops.listViewNames(ctx, dbName);
tableNames.addAll(viewNames);
return tableNames;
}
+ @Override
+ protected boolean shouldBypassTableNameCache(SessionContext ctx) {
+ return ctx != null && ctx.hasDelegatedCredential() && isIcebergRestUserSessionEnabled();
+ }
+
+ public boolean isIcebergRestUserSessionEnabled() {
+ makeSureInitialized();
+ return msProperties instanceof IcebergRestProperties
+ && ((IcebergRestProperties) msProperties).isIcebergRestUserSessionEnabled();
+ }
+
+ private boolean isIcebergRestUserSessionPropertyEnabled() {
+ return catalogProperty.getMetastoreProperties() instanceof IcebergRestProperties
+ && ((IcebergRestProperties) catalogProperty.getMetastoreProperties()).isIcebergRestUserSessionEnabled();
+ }
+
+ private static SessionContext currentSessionContext() {
Review Comment:
**DRY / maintainability:** The `currentSessionContext()` static helper is
duplicated identically in at least 4 places:
- `IcebergExternalCatalog` (here, lines 283–290)
- `IcebergExternalTable` (line 342)
- `IcebergMetadataOps` (line 1255)
- `IcebergUtils` (line 1712)
Consider extracting this to a shared location, e.g. a static method on
`SessionContext` or a `SessionContextUtil` helper class. This would reduce
duplication and ensure consistent null-safety behavior across all call sites.
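A shared helper could look roughly like the sketch below. The null handling
follows the `currentSessionContext()` body quoted from `IcebergMetadataOps` in
this PR; everything else is an assumption: `SessionContextUtilSketch`,
`SessionContextSketch`, and `ConnectContextSketch` are hypothetical stand-ins
so the example compiles on its own.

```java
// Hypothetical stand-in for Doris' SessionContext.
final class SessionContextSketch {
    private static final SessionContextSketch EMPTY = new SessionContextSketch(false);
    private final boolean delegated;

    SessionContextSketch(boolean delegated) {
        this.delegated = delegated;
    }

    static SessionContextSketch empty() {
        return EMPTY;
    }

    boolean hasDelegatedCredential() {
        return delegated;
    }
}

// Hypothetical stand-in for Doris' thread-bound ConnectContext.
final class ConnectContextSketch {
    private static final ThreadLocal<ConnectContextSketch> CURRENT = new ThreadLocal<>();
    private final SessionContextSketch sessionContext;

    ConnectContextSketch(SessionContextSketch sessionContext) {
        this.sessionContext = sessionContext;
    }

    static ConnectContextSketch get() {
        return CURRENT.get();
    }

    static void set(ConnectContextSketch context) {
        CURRENT.set(context);
    }

    static void remove() {
        CURRENT.remove();
    }

    SessionContextSketch getSessionContext() {
        return sessionContext;
    }
}

// The single shared helper that the four call sites would delegate to.
final class SessionContextUtilSketch {
    private SessionContextUtilSketch() {
    }

    // Never returns null: a missing connection or session maps to the empty context.
    static SessionContextSketch current() {
        ConnectContextSketch context = ConnectContextSketch.get();
        if (context == null) {
            return SessionContextSketch.empty();
        }
        SessionContextSketch sessionContext = context.getSessionContext();
        return sessionContext == null ? SessionContextSketch.empty() : sessionContext;
    }
}
```

Keeping the two null checks in one place means a later change to the fallback
behavior only has to be made once.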
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/DelegatedCredential.java:
##########
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+public class DelegatedCredential {
+ private final Type type;
+ private final String token;
+ private final Long expiresAtMillis;
+
+ public DelegatedCredential(Type type, String token) {
+ this(type, token, OptionalLong.empty());
+ }
+
+ public DelegatedCredential(Type type, String token, OptionalLong expiresAtMillis) {
+ this.type = Objects.requireNonNull(type, "type is required");
+ this.token = Objects.requireNonNull(token, "token is required");
+ Objects.requireNonNull(expiresAtMillis, "expiresAtMillis is required");
+ this.expiresAtMillis = expiresAtMillis.isPresent() ? expiresAtMillis.getAsLong() : null;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ public String getToken() {
+ return token;
+ }
+
+ public OptionalLong getExpiresAtMillis() {
+ return expiresAtMillis == null ? OptionalLong.empty() : OptionalLong.of(expiresAtMillis);
+ }
+
+ public boolean isExpired(long currentTimeMillis) {
Review Comment:
**nit:** `isExpired()` uses the inclusive comparison `currentTimeMillis >=
expiresAtMillis`, so the credential is already treated as expired at the exact
expiration instant. The PR description correctly notes that "The Iceberg REST
server remains the authority for whether the delegated credential is valid",
which makes this an advisory client-side check. Consider adding a brief comment
documenting that this is a conservative guard and that the server makes the
final decision.
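The requested comment might read something like the sketch below.
`ExpirySketch` is a hypothetical stand-in for `DelegatedCredential`; the
inclusive comparison comes from this review, while treating a missing expiry as
"never expired" is an assumption, since the method body is not shown in the
quoted hunk.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for DelegatedCredential, reduced to the expiry logic.
final class ExpirySketch {
    private final Long expiresAtMillis; // null means no known expiry

    ExpirySketch(OptionalLong expiresAtMillis) {
        this.expiresAtMillis = expiresAtMillis.isPresent() ? expiresAtMillis.getAsLong() : null;
    }

    /**
     * Advisory client-side guard only. The comparison is inclusive, so the
     * credential counts as expired at the exact expiration instant. The
     * Iceberg REST server remains the authority on whether the credential
     * is still valid.
     */
    boolean isExpired(long currentTimeMillis) {
        // Assumption: a credential without an expiry is never reported as expired here.
        return expiresAtMillis != null && currentTimeMillis >= expiresAtMillis;
    }
}
```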
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java:
##########
@@ -1641,7 +1664,58 @@ private static Optional<SchemaCacheValue> loadTableSchemaCacheValue(ExternalTabl
}
}
}
- return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
+ return new IcebergSchemaCacheValue(schema, tmpColumns);
+ }
+
+ private static IcebergSnapshotCacheValue loadSnapshotCacheValue(ExternalTable dorisTable, Table icebergTable) {
Review Comment:
**Potential runtime failure:** `loadSnapshotCacheValue()` requires
`dorisTable` to implement `MTMVRelatedTableIf` and throws a `RuntimeException`
when it does not:
```java
if (!(dorisTable instanceof MTMVRelatedTableIf)) {
throw new RuntimeException("Table X is not a valid MTMV related table.");
}
MTMVRelatedTableIf table = (MTMVRelatedTableIf) dorisTable;
```
This method is called from `getLatestSnapshotCacheValue()` for ALL
session-catalog tables (line 1576), not just MTMV-related ones. A regular
Iceberg external table that doesn't implement `MTMVRelatedTableIf` will fail
here with that `RuntimeException`.
The non-session code path
(`icebergExternalMetaCache(dorisTable).getSnapshotCache(dorisTable)`) handles
non-MTMV tables gracefully via the cache. The session path should do the same —
either fall back to a simpler snapshot loading approach for non-MTMV tables, or
throw a `DdlException`/`UserException` with a clearer message explaining
session-catalog limitations.
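The fallback option could be shaped roughly as follows. This is a sketch under
heavy assumptions: `TableSketch`, `MtmvRelatedSketch`, `SnapshotValueSketch`,
and `SnapshotLoaderSketch` are hypothetical stand-ins, and what the real MTMV
branch computes is not shown in the quoted hunk, so it is reduced to a flag.

```java
// Hypothetical stand-in for ExternalTable.
interface TableSketch {
    String getName();
}

// Hypothetical stand-in for MTMVRelatedTableIf (marker only in this sketch).
interface MtmvRelatedSketch extends TableSketch {
}

// Hypothetical stand-in for IcebergSnapshotCacheValue.
final class SnapshotValueSketch {
    final long snapshotId;
    final boolean hasPartitionInfo;

    SnapshotValueSketch(long snapshotId, boolean hasPartitionInfo) {
        this.snapshotId = snapshotId;
        this.hasPartitionInfo = hasPartitionInfo;
    }
}

final class SnapshotLoaderSketch {
    private SnapshotLoaderSketch() {
    }

    static SnapshotValueSketch load(TableSketch table, long currentSnapshotId) {
        if (table instanceof MtmvRelatedSketch) {
            // MTMV path: the real code is assumed to load partition info here.
            return new SnapshotValueSketch(currentSnapshotId, true);
        }
        // Fallback: a plain table still gets a snapshot value instead of an exception.
        return new SnapshotValueSketch(currentSnapshotId, false);
    }
}
```

If a fallback is not feasible, the alternative named above still applies:
replace the bare `RuntimeException` with a checked, user-facing exception whose
message explains the session-catalog limitation.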
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSessionCatalogAdapter.java:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.DelegatedCredential;
+import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.datasource.property.metastore.IcebergRestProperties.DelegatedTokenMode;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.catalog.BaseSessionCatalog;
+import org.apache.iceberg.catalog.BaseViewSessionCatalog;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.ViewCatalog;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Adapts Doris session-scoped delegated credentials to Iceberg REST {@link BaseSessionCatalog} calls.
+ *
+ * <p>When Doris has a delegated credential in {@link SessionContext}, Iceberg REST user-session mode requires the
+ * request to use a session-bound {@link Catalog} or {@link ViewCatalog}. This adapter keeps the plain catalog path for
+ * requests without delegated credentials and switches to Iceberg's session catalog only for user-session requests.
+ */
+class IcebergSessionCatalogAdapter {
+ private static final String SESSION_CATALOG_FIELD = "sessionCatalog";
+
+ private final Catalog catalog;
+ private final Optional<BaseSessionCatalog> sessionCatalog;
+ private final DelegatedTokenMode delegatedTokenMode;
+
+ IcebergSessionCatalogAdapter(Catalog catalog) {
+ this(catalog, DelegatedTokenMode.ACCESS_TOKEN);
+ }
+
+ IcebergSessionCatalogAdapter(Catalog catalog, DelegatedTokenMode delegatedTokenMode) {
+ this.catalog = catalog;
+ this.sessionCatalog = extractSessionCatalog(catalog);
+ this.delegatedTokenMode = delegatedTokenMode;
+ }
+
+ Catalog catalog(SessionContext context) {
+ if (!hasDelegatedCredential(context)) {
+ return catalog;
+ }
+ BaseSessionCatalog activeSessionCatalog = requireSessionCatalog();
+ return activeSessionCatalog.asCatalog(toIcebergSessionContext(context, delegatedTokenMode));
+ }
+
+ SupportsNamespaces namespaces(SessionContext context) {
+ return (SupportsNamespaces) catalog(context);
+ }
+
+ Catalog delegatedCatalog(SessionContext context) {
+ return requireSessionCatalog().asCatalog(toIcebergSessionContext(
+ requireDelegatedCredential(context), delegatedTokenMode));
+ }
+
+ SupportsNamespaces delegatedNamespaces(SessionContext context) {
+ return (SupportsNamespaces) delegatedCatalog(context);
+ }
+
+ Optional<ViewCatalog> delegatedViewCatalog(SessionContext context) {
+ BaseSessionCatalog activeSessionCatalog = requireSessionCatalog();
+ if (activeSessionCatalog instanceof BaseViewSessionCatalog) {
+ return Optional.of(((BaseViewSessionCatalog) activeSessionCatalog)
+ .asViewCatalog(toIcebergSessionContext(requireDelegatedCredential(context), delegatedTokenMode)));
+ }
+ requireDelegatedCredential(context);
+ return Optional.empty();
+ }
+
+ Optional<ViewCatalog> viewCatalog(SessionContext context) {
+ if (!hasDelegatedCredential(context)) {
+ return catalog instanceof ViewCatalog ? Optional.of((ViewCatalog) catalog) : Optional.empty();
+ }
+ BaseSessionCatalog sessionCatalog = requireSessionCatalog();
+ if (sessionCatalog instanceof BaseViewSessionCatalog) {
+ return Optional.of(((BaseViewSessionCatalog) sessionCatalog)
+ .asViewCatalog(toIcebergSessionContext(context, delegatedTokenMode)));
+ }
+ return Optional.empty();
+ }
+
+ private Optional<BaseSessionCatalog> extractSessionCatalog(Catalog catalog) {
Review Comment:
**Fragile / maintainability:** `extractSessionCatalog()` uses Java
reflection to walk the class hierarchy and access the private `sessionCatalog`
field. If Iceberg renames, removes, or changes the type of this field in a
future release, the code will silently return `Optional.empty()` and later fail
at runtime with `"Iceberg REST user session requires a session-aware Iceberg
catalog"`.
Suggestions:
1. **At minimum:** log a WARN when the field is not found so operators can
diagnose the issue.
2. Consider contributing a public accessor upstream to Apache Iceberg (for
example, a getter on the catalog class that holds the `sessionCatalog` field)
so this reflection hack can be removed.
3. If Iceberg already provides a stable way to construct or access a session
catalog (e.g. via `CatalogUtil`), use that instead.
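For the first suggestion, a reflective lookup that warns on a miss could be
sketched as below. This is not the PR's `extractSessionCatalog()`:
`ReflectiveFieldLookupSketch` and the two holder classes are hypothetical, and
`java.util.logging` is used only to keep the example self-contained, where the
real code would use the project's own logger.

```java
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical classes standing in for a catalog that hides a private field.
class BaseHolderSketch {
    private final String sessionCatalog = "session";
}

class DerivedHolderSketch extends BaseHolderSketch {
}

final class ReflectiveFieldLookupSketch {
    private static final Logger LOG =
            Logger.getLogger(ReflectiveFieldLookupSketch.class.getName());

    private ReflectiveFieldLookupSketch() {
    }

    // Walks the class hierarchy looking for fieldName; warns instead of failing silently.
    static Optional<Object> readField(Object target, String fieldName) {
        for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Field field = c.getDeclaredField(fieldName);
                field.setAccessible(true);
                return Optional.ofNullable(field.get(target));
            } catch (NoSuchFieldException e) {
                // Not declared on this class; continue with the superclass.
            } catch (IllegalAccessException | RuntimeException e) {
                LOG.warning("Cannot read field '" + fieldName + "' on "
                        + c.getName() + ": " + e);
                return Optional.empty();
            }
        }
        LOG.warning("Field '" + fieldName + "' not found on "
                + target.getClass().getName()
                + "; the library layout may have changed");
        return Optional.empty();
    }
}
```

The WARN at the point of the miss gives operators a direct pointer to the
cause, rather than only the later "requires a session-aware Iceberg catalog"
failure.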
##########
fe/fe-core/src/main/java/org/apache/doris/mysql/authenticate/AuthenticatorManager.java:
##########
@@ -240,7 +244,38 @@ private Optional<AuthenticateRequest> resolveAuthenticateRequest(Authenticator a
private AuthenticateResponse authenticateWith(Authenticator authenticator,
AuthenticateRequest request) throws IOException {
- return authenticator.authenticate(request);
+ AuthenticateResponse response = authenticator.authenticate(request);
+ attachDelegatedCredential(response, request);
+ return response;
+ }
+
+ private void attachDelegatedCredential(AuthenticateResponse response, AuthenticateRequest request) {
+ if (!response.isSuccess() || request.getCredential() == null || response.getDelegatedCredential() != null) {
+ return;
+ }
+ DelegatedCredential.Type type = delegatedCredentialType(request.getCredentialType());
+ if (type == null) {
+ return;
+ }
+ OptionalLong expiresAtMillis = response.getCredentialExpiresAtMillis();
+ response.setDelegatedCredential(new DelegatedCredential(type,
+ new String(request.getCredential(), StandardCharsets.UTF_8), expiresAtMillis));
Review Comment:
**Security consideration:** `new String(request.getCredential(),
StandardCharsets.UTF_8)` converts the sensitive credential token into a Java
`String`, which lives on the heap and cannot be reliably zeroed out (Strings
are immutable, and a copying garbage collector may leave additional copies in
memory).
Since the token flows into `DelegatedCredential` which also stores it as a
`String`, this is consistent with the PR's approach. For defense-in-depth,
consider whether `DelegatedCredential` could hold the token as a `byte[]` or
`char[]` that can be cleared after use. This is a broader design consideration,
but worth noting since this is where the credential first enters FE memory.
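A clearable holder along these lines might look like the sketch below.
`ClearableTokenSketch` is hypothetical and not part of the PR. It narrows the
exposure window rather than closing it, because any caller that eventually
needs the token as a `String` (for example, to build a header) still creates
one.

```java
import java.util.Arrays;

// Hypothetical holder that keeps the token as bytes so it can be zeroed after use.
final class ClearableTokenSketch implements AutoCloseable {
    private final byte[] token;
    private boolean cleared;

    ClearableTokenSketch(byte[] credential) {
        // Defensive copy: the holder owns its bytes and can wipe them independently.
        this.token = Arrays.copyOf(credential, credential.length);
    }

    // Returns a copy; the caller is responsible for wiping its own copy.
    synchronized byte[] copyOfToken() {
        if (cleared) {
            throw new IllegalStateException("token already cleared");
        }
        return Arrays.copyOf(token, token.length);
    }

    synchronized boolean isCleared() {
        return cleared;
    }

    // Overwrites the stored bytes; safe to call more than once.
    @Override
    public synchronized void close() {
        Arrays.fill(token, (byte) 0);
        cleared = true;
    }
}
```

Because the class implements `AutoCloseable`, it can be scoped with
try-with-resources so the bytes are wiped when the request finishes.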
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java:
##########
@@ -1169,6 +1219,70 @@ public List<String> listViewNames(String db) {
}
}
+ private Catalog catalog(SessionContext ctx) {
+ if (useSessionCatalog(ctx)) {
+ return sessionCatalogAdapter.delegatedCatalog(ctx);
+ }
+ return catalog;
+ }
+
+ private SupportsNamespaces namespaces(SessionContext ctx) {
+ if (useSessionCatalog(ctx)) {
+ return sessionCatalogAdapter.delegatedNamespaces(ctx);
+ }
+ return nsCatalog;
+ }
+
+ private Optional<ViewCatalog> viewCatalog(SessionContext ctx) {
+ if (!isViewCatalogEnabled()) {
+ return Optional.empty();
+ }
+ if (useSessionCatalog(ctx)) {
+ return sessionCatalogAdapter.delegatedViewCatalog(ctx);
+ }
+ return Optional.of((ViewCatalog) catalog);
+ }
+
+ /**
+ * Iceberg REST user-session auth needs both Doris user credential material and an Iceberg SessionCatalog.
+ * The delegated credential provides the user token; the SessionCatalog is the Iceberg API that attaches it
+ * to metadata requests.
+ */
+ private boolean useSessionCatalog(SessionContext ctx) {
+ return ctx != null && ctx.hasDelegatedCredential() && isIcebergRestUserSessionEnabled();
+ }
+
+ private static SessionContext currentSessionContext() {
+ ConnectContext context = ConnectContext.get();
+ if (context == null) {
+ return SessionContext.empty();
+ }
+ SessionContext sessionContext = context.getSessionContext();
+ return sessionContext == null ? SessionContext.empty() : sessionContext;
+ }
+
+ private boolean isIcebergRestUserSessionEnabled() {
+ if (!(dorisCatalog instanceof IcebergRestExternalCatalog)) {
+ return false;
+ }
+ MetastoreProperties metaProps =
+ ((IcebergRestExternalCatalog) dorisCatalog).getCatalogProperty().getMetastoreProperties();
+ return metaProps instanceof IcebergRestProperties
+ && ((IcebergRestProperties) metaProps).isIcebergRestUserSessionEnabled();
+ }
+
+ private static IcebergRestProperties.DelegatedTokenMode delegatedTokenMode(ExternalCatalog dorisCatalog) {
Review Comment:
**Silent fallback:** `delegatedTokenMode()` defaults to `ACCESS_TOKEN` when
the catalog is not an `IcebergRestExternalCatalog` or its metastore properties
are not `IcebergRestProperties`. If someone configures
`iceberg.rest.oauth2.delegated-token-mode = token_exchange` on a non-REST
Iceberg catalog, the config is silently ignored. Consider emitting a WARN log
in this path, or adding validation in `initNormalizeAndCheckProps()` to
reject `token_exchange` for non-REST catalog types.
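The validation option could be sketched as below. The property key and the
`token_exchange` value are taken from this comment; the class name, the
`validate` signature, and the use of `"rest"` as the catalog type string are
assumptions for illustration, not the actual `initNormalizeAndCheckProps()`
logic.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical validator; not the actual Doris property-checking code.
final class DelegatedTokenModeValidationSketch {
    // Property key as quoted in the review comment.
    static final String MODE_KEY = "iceberg.rest.oauth2.delegated-token-mode";

    private DelegatedTokenModeValidationSketch() {
    }

    // Rejects token_exchange unless the catalog type is REST (assumed type string "rest").
    static void validate(String catalogType, Map<String, String> props) {
        String mode = props.get(MODE_KEY);
        if (mode == null) {
            return; // Nothing configured, so the default applies.
        }
        boolean tokenExchange =
                "token_exchange".equals(mode.trim().toLowerCase(Locale.ROOT));
        if (tokenExchange && !"rest".equalsIgnoreCase(catalogType)) {
            throw new IllegalArgumentException(MODE_KEY + " = token_exchange"
                    + " is only supported for REST catalogs, but catalog type is '"
                    + catalogType + "'");
        }
    }
}
```

Failing at catalog creation surfaces the misconfiguration immediately, whereas
a WARN log in `delegatedTokenMode()` would only appear once a request reaches
that path.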
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]