This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new eea082b4540 [refactor](iceberg) Refactor iceberg system tables to use native table execution path (#61646)
eea082b4540 is described below
commit eea082b4540ae8f31a5b9deee4f9fefdeefc050e
Author: Socrates <[email protected]>
AuthorDate: Tue Mar 31 23:32:46 2026 +0800
    [refactor](iceberg) Refactor iceberg system tables to use native table execution path (#61646)
### What problem does this PR solve?
Issue Number: N/A
Related PR: #60556
## Summary
- Refactor Iceberg system tables (`$snapshots`, `$history`,
`$manifests`, `$files`, `$entries`, `$metadata_log_entries`,
`$partitions`, `$refs`) to use the native sys-table path
(`NativeSysTable -> FileQueryScanNode -> IcebergScanNode`) instead of
the TVF path (`iceberg_meta()` -> `MetadataScanNode`)
- Introduce `IcebergSysExternalTable` as the native wrapper for Iceberg
metadata tables and route query / describe / auth / show-create flows
through the source Iceberg table
- Keep `position_deletes` explicitly unsupported with the existing
`SysTable position_deletes is not supported yet` behavior
- Add the minimum FE/BE protocol and scanner changes required to read
Iceberg system-table splits through `FORMAT_JNI`
- Remove the `iceberg_meta()` TVF exposure and migrate regression
coverage to direct `table$system_table` access
- Clean up the obsolete Iceberg metadata-scan path and tighten
Iceberg/Paimon system-table scanner behavior after the refactor
## Motivation
Previously, Iceberg system tables were queried through `iceberg_meta()`,
which:
- Forced system-table queries onto the `MetadataScanNode` path instead
of the regular file-query path
- Kept Iceberg system table planning and execution separate from normal
Iceberg scans
- Required dedicated TVF-only coverage and made auth / describe /
show-create handling more fragmented
- Left a second execution path to maintain even after native sys-table
support was added
This refactor moves Iceberg system tables onto the native path while
keeping the first implementation pragmatic: metadata rows are still
produced by the existing Java Iceberg scanner, but planning, privilege
checks, and scan-node routing now match the native execution model.
Follow-up cleanup in this PR also removes the now-dead TVF metadata path
and tightens scanner/schema handling to match the simplified execution
model.
## Architecture After Refactoring
### Native SysTable Path
```
User: SELECT * FROM table$snapshots
BindRelation / SysTableResolver
-> IcebergSysTable.createSysExternalTable(...)
-> IcebergSysExternalTable
-> LogicalFileScan
-> IcebergScanNode
-> FORMAT_JNI file ranges with serialized_split
-> IcebergSysTableJniReader / IcebergSysTableJniScanner
```
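The handoff at the end of this path is a per-split parameter map that the FE serializes into each `FORMAT_JNI` file range. As a hedged sketch (the key names mirror the ones used in this PR — `serialized_split`, `required_fields`, `hadoop.*` — but the class itself is illustrative, not the PR's actual code), the scanner-side unpacking might look like:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch: unpack the per-split parameter map that the FE
// serializes into each FORMAT_JNI file range. Key names follow this PR;
// the class and its shape are assumptions for illustration only.
public class SysTableSplitParams {
    private static final String HADOOP_OPTION_PREFIX = "hadoop.";

    public final String serializedSplit;
    public final List<String> requiredFields;
    public final Map<String, String> hadoopProps;

    public SysTableSplitParams(Map<String, String> params) {
        serializedSplit = params.get("serialized_split");
        if (serializedSplit == null || serializedSplit.isEmpty()) {
            // Mirrors the FE/BE protocol-mismatch guard added in this PR.
            throw new IllegalArgumentException("serialized_split should not be empty");
        }
        requiredFields = List.of(params.get("required_fields").split(","));
        // Collect the "hadoop."-prefixed entries, stripping the prefix so
        // they can be handed to a Hadoop Configuration later.
        hadoopProps = params.entrySet().stream()
                .filter(e -> e.getKey().startsWith(HADOOP_OPTION_PREFIX))
                .collect(Collectors.toMap(
                        e -> e.getKey().substring(HADOOP_OPTION_PREFIX.length()),
                        Map.Entry::getValue));
    }
}
```

The fail-fast check on `serialized_split` matches the PR's intent: a missing split indicates an FE/BE protocol mismatch and should be surfaced immediately rather than as a decode error later.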
### Key Changes
- `IcebergSysTable` now extends `NativeSysTable` instead of
`TvfSysTable`
- `IcebergSysExternalTable` wraps the source `IcebergExternalTable`,
derives schema from the Iceberg metadata table, reuses the source table
descriptor for thrift, and now caches sys-table schema instead of
rebuilding it repeatedly
- `IcebergApiSource` / `IcebergScanNode` now recognize both normal
Iceberg external tables and Iceberg system tables
- `TIcebergFileDesc` adds `serialized_split`; FE sends one serialized
split per system-table range
- BE `file_scanner.cpp` now routes `FORMAT_JNI +
table_format_type=iceberg` to `IcebergSysTableJniReader`
- `IcebergSysTableJniReader` and `IcebergSysTableJniScanner` now only
serve the native file-scanner path; the old `serialized_splits` /
metadata-scan compatibility has been removed
- `IcebergSysTableJniScanner` now reads projected columns by
source-schema index instead of projection order, fixing incorrect reads
when projected column order differs from the underlying schema order
- `PhysicalPlanTranslator`, `UserAuthentication`, and `SHOW CREATE
TABLE` now resolve Iceberg system tables through the source table
correctly
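The projected-column fix above can be sketched as follows: resolve each required field name to its index in the source schema once, then fetch row values by that source index rather than by projection position. This is a minimal stand-in (plain lists and arrays instead of Iceberg's `StructLike` and the PR's `SelectedField` helper):

```java
import java.util.List;

// Illustrative sketch of the source-index projection fix: when the
// projected column order differs from the underlying schema order,
// values must be read by the field's position in the source schema,
// not by its position in the projection list.
public class SourceIndexProjection {
    // Map each required field name to its index in the source schema.
    public static int[] resolveSourceIndexes(List<String> schemaFields, String[] requiredFields) {
        int[] indexes = new int[requiredFields.length];
        for (int i = 0; i < requiredFields.length; i++) {
            int sourceIndex = schemaFields.indexOf(requiredFields[i]);
            if (sourceIndex < 0) {
                throw new IllegalArgumentException(
                        "RequiredField " + requiredFields[i] + " not found in schema");
            }
            indexes[i] = sourceIndex;
        }
        return indexes;
    }

    // Read projected values from a row laid out in source-schema order.
    public static Object[] project(Object[] rowInSchemaOrder, int[] sourceIndexes) {
        Object[] out = new Object[sourceIndexes.length];
        for (int i = 0; i < sourceIndexes.length; i++) {
            out[i] = rowInSchemaOrder[sourceIndexes[i]];
        }
        return out;
    }
}
```

Reading by projection position (`row.get(i, ...)`) silently returns wrong columns whenever the projection reorders or skips schema fields; indexing by the resolved source position is order-independent.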
### Cleanup After Refactoring
- Remove `IcebergTableValuedFunction`
- Remove Nereids `IcebergMeta`
- Remove builtin `iceberg_meta()` registration
- Remove the obsolete Iceberg branch from `MetaScanner`; Iceberg
metadata queries no longer go through `MetadataScanNode`
- Simplify the Iceberg JNI metadata reader/scanner to single-split
semantics, matching `FileQueryScanNode -> file_scanner` ownership of
split fanout
- Align related Paimon system-table cleanup by caching
`PaimonSysExternalTable` schema and adding explicit projected-column
validation in `PaimonJniScanner`
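The schema-caching pattern referenced above (for both `IcebergSysExternalTable` and `PaimonSysExternalTable`) can be sketched with double-checked locking: derive the sys-table schema once and reuse it across concurrent readers. The `Supplier` below stands in for the (potentially expensive) metadata-table schema derivation; the class is a hedged illustration, not the PR's actual types:

```java
import java.util.List;
import java.util.function.Supplier;

// Hedged sketch of caching a lazily-derived sys-table schema with
// double-checked locking, so concurrent lookups do not rebuild it.
public class CachedSysTableSchema {
    private final Supplier<List<String>> schemaLoader;
    private volatile List<String> schema;

    public CachedSysTableSchema(Supplier<List<String>> schemaLoader) {
        this.schemaLoader = schemaLoader;
    }

    public List<String> get() {
        List<String> local = schema;
        if (local == null) {
            synchronized (this) {
                local = schema;
                if (local == null) {
                    // Derive once; later calls return the cached value.
                    local = schemaLoader.get();
                    schema = local;
                }
            }
        }
        return local;
    }
}
```

The `volatile` field plus the second null check inside the lock is what makes the single-derivation guarantee safe under the Java memory model.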
### Regression Coverage Updates
- Delete the obsolete `test_iceberg_meta` regression case
- Migrate Iceberg system-table regression checks to direct
`$system_table` access
- Update `test_iceberg_sys_table` to use runtime assertions instead of
unstable `.out` row matching, following the existing Paimon system-table
testing style
---
be/src/exec/scan/file_scanner.cpp | 6 +
be/src/exec/scan/meta_scanner.cpp | 11 +-
.../format/table/iceberg_sys_table_jni_reader.cpp | 36 +++-
be/src/format/table/iceberg_sys_table_jni_reader.h | 7 +-
docker/thirdparties/run-thirdparties-docker.sh | 5 +-
.../doris/iceberg/IcebergSysTableJniScanner.java | 60 +++---
.../org/apache/doris/paimon/PaimonJniScanner.java | 6 +-
.../doris/catalog/BuiltinTableValuedFunctions.java | 2 -
.../main/java/org/apache/doris/catalog/Env.java | 19 +-
.../datasource/iceberg/IcebergExternalTable.java | 13 ++
.../iceberg/IcebergSysExternalTable.java | 177 ++++++++++++++++++
.../iceberg/source/IcebergApiSource.java | 35 ++--
.../datasource/iceberg/source/IcebergScanNode.java | 54 +++++-
.../datasource/iceberg/source/IcebergSplit.java | 12 ++
.../datasource/paimon/PaimonSysExternalTable.java | 73 +++++---
.../doris/datasource/systable/IcebergSysTable.java | 53 +++---
.../apache/doris/datasource/systable/SysTable.java | 2 +-
.../doris/datasource/systable/TvfSysTable.java | 4 +-
.../glue/translator/PhysicalPlanTranslator.java | 3 +-
.../nereids/rules/analysis/UserAuthentication.java | 4 +
.../expressions/functions/table/IcebergMeta.java | 67 -------
.../visitor/TableValuedFunctionVisitor.java | 5 -
.../plans/commands/ShowCreateTableCommand.java | 45 ++++-
.../trees/plans/logical/LogicalFileScan.java | 3 +-
.../tablefunction/IcebergTableValuedFunction.java | 168 -----------------
.../doris/tablefunction/TableValuedFunctionIf.java | 2 -
.../systable/IcebergSysTableResolverTest.java | 99 ++++++++++
.../rules/analysis/UserAuthenticationTest.java | 61 +++++++
gensrc/thrift/PlanNodes.thrift | 1 +
.../external_table_p2/tvf/test_iceberg_meta.out | 22 ---
.../iceberg_branch_retention_and_snapshot.groovy | 18 +-
.../iceberg_tag_retention_and_consistency.groovy | 20 +-
.../iceberg/iceberg_branch_tag_operate.groovy | 2 +-
...st_iceberg_schema_change_with_timetravel.groovy | 2 +-
.../iceberg/test_iceberg_sys_table.groovy | 202 +++++++--------------
.../iceberg/test_iceberg_sys_table_auth.groovy | 64 -------
.../external_table_p0/polaris/test_polaris.groovy | 5 +-
.../iceberg_rest_on_hdfs.groovy | 5 +-
.../iceberg_on_hms_and_filesystem_and_dlf.groovy | 10 +-
.../iceberg_rest_s3_storage_vended_test.groovy | 5 +-
.../iceberg_rest_storage_test.groovy | 5 +-
.../external_table_p2/tvf/test_iceberg_meta.groovy | 90 ---------
42 files changed, 748 insertions(+), 735 deletions(-)
diff --git a/be/src/exec/scan/file_scanner.cpp b/be/src/exec/scan/file_scanner.cpp
index ed74cba7d7e..64a0616674f 100644
--- a/be/src/exec/scan/file_scanner.cpp
+++ b/be/src/exec/scan/file_scanner.cpp
@@ -67,6 +67,7 @@
#include "format/table/hudi_jni_reader.h"
#include "format/table/hudi_reader.h"
#include "format/table/iceberg_reader.h"
+#include "format/table/iceberg_sys_table_jni_reader.h"
#include "format/table/jdbc_jni_reader.h"
#include "format/table/max_compute_jni_reader.h"
#include "format/table/paimon_cpp_reader.h"
@@ -1049,6 +1050,11 @@ Status FileScanner::_get_next_reader() {
            _cur_reader = JdbcJniReader::create_unique(_file_slot_descs, _state, _profile, jdbc_params);
            init_status = ((JdbcJniReader*)(_cur_reader.get()))->init_reader();
+        } else if (range.__isset.table_format_params &&
+                   range.table_format_params.table_format_type == "iceberg") {
+            _cur_reader = IcebergSysTableJniReader::create_unique(_file_slot_descs, _state,
+                                                                  _profile, range, _params);
+            init_status = ((IcebergSysTableJniReader*)(_cur_reader.get()))->init_reader();
        }
        // Set col_name_to_block_idx for JNI readers to avoid repeated map creation
        if (_cur_reader) {
diff --git a/be/src/exec/scan/meta_scanner.cpp b/be/src/exec/scan/meta_scanner.cpp
index 7060cdac583..b51d7d3c08b 100644
--- a/be/src/exec/scan/meta_scanner.cpp
+++ b/be/src/exec/scan/meta_scanner.cpp
@@ -37,7 +37,6 @@
#include "core/column/column_vector.h"
#include "core/data_type/define_primitive_type.h"
#include "core/types.h"
-#include "format/table/iceberg_sys_table_jni_reader.h"
#include "format/table/parquet_metadata_reader.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
@@ -65,15 +64,7 @@ MetaScanner::MetaScanner(RuntimeState* state, ScanLocalStateBase* local_state, T
 Status MetaScanner::_open_impl(RuntimeState* state) {
     VLOG_CRITICAL << "MetaScanner::open";
     RETURN_IF_ERROR(Scanner::_open_impl(state));
-    if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) {
-        // TODO: refactor this code
-        auto reader = IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
-                                                              _scan_range.meta_scan_range);
-        RETURN_IF_ERROR(reader->init_reader());
-        static_cast<IcebergSysTableJniReader*>(reader.get())
-                ->set_col_name_to_block_idx(&_src_block_name_to_idx);
-        _reader = std::move(reader);
-    } else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PARQUET) {
+    if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PARQUET) {
         auto reader = ParquetMetadataReader::create_unique(_tuple_desc->slots(), state, _profile,
                                                            _scan_range.meta_scan_range);
         RETURN_IF_ERROR(reader->init_reader());
diff --git a/be/src/format/table/iceberg_sys_table_jni_reader.cpp b/be/src/format/table/iceberg_sys_table_jni_reader.cpp
index 35bf92db3c9..499069a0a93 100644
--- a/be/src/format/table/iceberg_sys_table_jni_reader.cpp
+++ b/be/src/format/table/iceberg_sys_table_jni_reader.cpp
@@ -17,6 +17,7 @@
#include "iceberg_sys_table_jni_reader.h"
+#include "common/logging.h"
#include "format/jni/jni_data_bridge.h"
#include "runtime/runtime_state.h"
#include "util/string_util.h"
@@ -26,9 +27,27 @@ namespace doris {
static const std::string HADOOP_OPTION_PREFIX = "hadoop.";
+Status IcebergSysTableJniReader::validate_scan_range(const TFileRangeDesc& range) {
+    if (!range.__isset.table_format_params) {
+        return Status::InternalError(
+                "missing table_format_params for iceberg sys table jni reader");
+    }
+    if (!range.table_format_params.__isset.iceberg_params) {
+        return Status::InternalError("missing iceberg_params for iceberg sys table jni reader");
+    }
+    if (!range.table_format_params.iceberg_params.__isset.serialized_split ||
+        range.table_format_params.iceberg_params.serialized_split.empty()) {
+        return Status::InternalError(
+                "missing serialized_split for iceberg sys table jni reader, "
+                "possibly caused by FE/BE protocol mismatch");
+    }
+    return Status::OK();
+}
+
 IcebergSysTableJniReader::IcebergSysTableJniReader(
         const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
-        RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
+        RuntimeProfile* profile, const TFileRangeDesc& range,
+        const TFileScanRangeParams* range_params)
         : JniReader(
                   file_slot_descs, state, profile,
                   "org/apache/doris/iceberg/IcebergSysTableJniScanner",
@@ -41,12 +60,16 @@ IcebergSysTableJniReader::IcebergSysTableJniReader(
                               JniDataBridge::get_jni_type_with_different_string(desc->type()));
                   }
                   std::map<std::string, std::string> params;
-                  params["serialized_splits"] = join(meta_scan_range.serialized_splits, ",");
+                  params["serialized_split"] =
+                          range.table_format_params.iceberg_params.serialized_split;
                   params["required_fields"] = join(required_fields, ",");
                   params["required_types"] = join(required_types, "#");
                   params["time_zone"] = state->timezone();
-                  for (const auto& kv : meta_scan_range.hadoop_props) {
-                      params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+                  if (range_params != nullptr && range_params->__isset.properties &&
+                      !range_params->properties.empty()) {
+                      for (const auto& kv : range_params->properties) {
+                          params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+                      }
                   }
                   return params;
               }(),
@@ -56,9 +79,12 @@ IcebergSysTableJniReader::IcebergSysTableJniReader(
                   names.emplace_back(desc->col_name());
               }
               return names;
-          }()) {}
+          }(),
+          range.__isset.self_split_weight ? range.self_split_weight : -1),
+          _init_status(validate_scan_range(range)) {}
 
 Status IcebergSysTableJniReader::init_reader() {
+    RETURN_IF_ERROR(_init_status);
     return open(_state, _profile);
 }
diff --git a/be/src/format/table/iceberg_sys_table_jni_reader.h b/be/src/format/table/iceberg_sys_table_jni_reader.h
index 1d52bdc1cf6..f5bc69f6776 100644
--- a/be/src/format/table/iceberg_sys_table_jni_reader.h
+++ b/be/src/format/table/iceberg_sys_table_jni_reader.h
@@ -45,11 +45,16 @@ class IcebergSysTableJniReader : public JniReader {
 public:
     IcebergSysTableJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                              RuntimeState* state, RuntimeProfile* profile,
-                             const TMetaScanRange& meta_scan_range);
+                             const TFileRangeDesc& range, const TFileScanRangeParams* range_params);
 
     ~IcebergSysTableJniReader() override = default;
 
+    static Status validate_scan_range(const TFileRangeDesc& range);
+
     Status init_reader();
+
+private:
+    Status _init_status;
 };
#include "common/compile_check_end.h"
diff --git a/docker/thirdparties/run-thirdparties-docker.sh b/docker/thirdparties/run-thirdparties-docker.sh
index 12e2e9b7ba4..15757f57fa4 100755
--- a/docker/thirdparties/run-thirdparties-docker.sh
+++ b/docker/thirdparties/run-thirdparties-docker.sh
@@ -240,7 +240,6 @@ JUICEFS_LOCAL_BIN="${JUICEFS_RUNTIME_ROOT}/bin/juicefs"
find_juicefs_hadoop_jar() {
local -a jar_globs=(
"${JUICEFS_RUNTIME_ROOT}/lib/juicefs-hadoop-[0-9]*.jar"
- "${ROOT}/docker-compose/hive/scripts/auxlib/juicefs-hadoop-[0-9]*.jar"
"${DORIS_ROOT}/thirdparty/installed/juicefs_libs/juicefs-hadoop-[0-9]*.jar"
"${DORIS_ROOT}/output/fe/lib/juicefs/juicefs-hadoop-[0-9]*.jar"
"${DORIS_ROOT}/output/be/lib/java_extensions/juicefs/juicefs-hadoop-[0-9]*.jar"
@@ -359,6 +358,10 @@ ensure_juicefs_hadoop_jar_for_hive() {
     fi
     mkdir -p "${auxlib_dir}"
+    if [[ "${source_jar}" == "${auxlib_dir}/$(basename "${source_jar}")" ]]; then
+        echo "JuiceFS Hadoop jar already exists in hive auxlib: $(basename "${source_jar}")"
+        return 0
+    fi
     cp -f "${source_jar}" "${auxlib_dir}/"
     echo "Synced JuiceFS Hadoop jar to hive auxlib: $(basename "${source_jar}")"
 }
diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
index d1b648a9801..b014a42706f 100644
--- a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
+++ b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
@@ -36,8 +36,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
@@ -51,21 +49,19 @@ public class IcebergSysTableJniScanner extends JniScanner {
private static final String HADOOP_OPTION_PREFIX = "hadoop.";
private final ClassLoader classLoader;
private final PreExecutionAuthenticator preExecutionAuthenticator;
- private final Iterator<FileScanTask> scanTasks;
- private final List<NestedField> fields;
+ private final FileScanTask scanTask;
+ private final List<SelectedField> fields;
private final String timezone;
private CloseableIterator<StructLike> reader;
     public IcebergSysTableJniScanner(int batchSize, Map<String, String> params) {
         this.classLoader = this.getClass().getClassLoader();
-        List<FileScanTask> scanTasks = Arrays.stream(params.get("serialized_splits").split(","))
-                .map(SerializationUtil::deserializeFromBase64)
-                .map(obj -> (FileScanTask) obj)
-                .collect(Collectors.toList());
-        Preconditions.checkState(!scanTasks.isEmpty(), "scanTasks shoudle not be empty");
-        this.scanTasks = scanTasks.iterator();
+        String serializedSplitParams = params.get("serialized_split");
+        Preconditions.checkArgument(serializedSplitParams != null && !serializedSplitParams.isEmpty(),
+                "serialized_split should not be empty");
+        this.scanTask = SerializationUtil.deserializeFromBase64(serializedSplitParams);
         String[] requiredFields = params.get("required_fields").split(",");
-        this.fields = selectSchema(scanTasks.get(0).schema().asStruct(), requiredFields);
+        this.fields = selectSchema(scanTask.schema().asStruct(), requiredFields);
         this.timezone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
         Map<String, String> hadoopOptionParams = params.entrySet().stream()
                 .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
@@ -79,13 +75,11 @@ public class IcebergSysTableJniScanner extends JniScanner {
@Override
public void open() throws IOException {
         try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
- nextScanTask();
+ openReader();
}
}
- private void nextScanTask() throws IOException {
- Preconditions.checkArgument(scanTasks.hasNext());
- FileScanTask scanTask = scanTasks.next();
+ private void openReader() throws IOException {
try {
             try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
preExecutionAuthenticator.execute(() -> {
@@ -96,7 +90,7 @@ public class IcebergSysTableJniScanner extends JniScanner {
}
} catch (Exception e) {
this.close();
-            String msg = String.format("Failed to open next scan task: %s", scanTask);
+            String msg = String.format("Failed to open scan task: %s", scanTask);
LOG.error(msg, e);
throw new IOException(msg, e);
}
@@ -107,26 +101,20 @@ public class IcebergSysTableJniScanner extends JniScanner {
         try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
             int rows = 0;
             long startAppendDataTime = System.nanoTime();
-            long scanTime = 0;
             while (rows < getBatchSize()) {
-                while (!reader.hasNext() && scanTasks.hasNext()) {
-                    long startScanTaskTime = System.nanoTime();
-                    nextScanTask();
-                    scanTime = System.nanoTime() - startScanTaskTime;
-                }
                 if (!reader.hasNext()) {
                     break;
                 }
                 StructLike row = reader.next();
                 for (int i = 0; i < fields.size(); i++) {
-                    NestedField field = fields.get(i);
-                    Object value = row.get(i, field.type().typeId().javaClass());
+                    SelectedField field = fields.get(i);
+                    Object value = row.get(field.sourceIndex, field.field.type().typeId().javaClass());
                     ColumnValue columnValue = new IcebergSysTableColumnValue(value, timezone);
                     appendData(i, columnValue);
                 }
                 rows++;
             }
-            appendDataTime += System.nanoTime() - startAppendDataTime - scanTime;
+            appendDataTime += System.nanoTime() - startAppendDataTime;
             return rows;
         }
     }
@@ -141,18 +129,34 @@ public class IcebergSysTableJniScanner extends JniScanner {
}
}
-    private static List<NestedField> selectSchema(StructType schema, String[] requiredFields) {
-        List<NestedField> selectedFields = new ArrayList<>();
+    private static List<SelectedField> selectSchema(StructType schema, String[] requiredFields) {
+        List<NestedField> schemaFields = schema.fields();
+        List<SelectedField> selectedFields = new ArrayList<>();
         for (String requiredField : requiredFields) {
             NestedField field = schema.field(requiredField);
             if (field == null) {
                 throw new IllegalArgumentException("RequiredField " + requiredField + " not found in schema");
             }
-            selectedFields.add(field);
+            int sourceIndex = schemaFields.indexOf(field);
+            if (sourceIndex < 0) {
+                throw new IllegalArgumentException(
+                        "RequiredField " + requiredField + " not found in source schema fields");
+            }
+            selectedFields.add(new SelectedField(sourceIndex, field));
         }
         return selectedFields;
     }
 
+    private static final class SelectedField {
+        private final int sourceIndex;
+        private final NestedField field;
+
+        private SelectedField(int sourceIndex, NestedField field) {
+            this.sourceIndex = sourceIndex;
+            this.field = field;
+        }
+    }
+
     private static ColumnType[] parseRequiredTypes(String[] typeStrings, String[] requiredFields) {
         ColumnType[] requiredTypes = new ColumnType[typeStrings.length];
         for (int i = 0; i < typeStrings.length; i++) {
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 8f64a51dc9b..081b5a554c6 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -123,7 +123,11 @@ public class PaimonJniScanner extends JniScanner {
}
private int[] getProjected() {
-        return Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray();
+        return Arrays.stream(fields).mapToInt(fieldName -> {
+            int index = paimonAllFieldNames.indexOf(fieldName);
+            Preconditions.checkArgument(index >= 0, "RequiredField %s not found in schema", fieldName);
+            return index;
+        }).toArray();
}
private List<Predicate> getPredicates() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index a55ebedce9f..ca3a5b688c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.Http;
import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
import org.apache.doris.nereids.trees.expressions.functions.table.HudiMeta;
-import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
@@ -60,7 +59,6 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
tableValued(GroupCommit.class, "group_commit"),
tableValued(Local.class, "local"),
tableValued(HudiMeta.class, "hudi_meta"),
- tableValued(IcebergMeta.class, "iceberg_meta"),
tableValued(Hdfs.class, "hdfs"),
tableValued(HttpStream.class, "http_stream"),
tableValued(Numbers.class, "numbers"),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index b62258262e3..cf5849e247a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -101,6 +101,7 @@ import org.apache.doris.datasource.es.EsExternalCatalog;
import org.apache.doris.datasource.hive.HiveTransactionMgr;
import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.paimon.PaimonSysExternalTable;
@@ -4344,7 +4345,14 @@ public class Env {
             sb.append("\n-- Internal JDBC tables are deprecated. Please use JDBC Catalog instead.");
         } else if (table.getType() == TableType.ICEBERG_EXTERNAL_TABLE) {
             addTableComment(table, sb);
-            IcebergExternalTable icebergExternalTable = (IcebergExternalTable) table;
+            IcebergExternalTable icebergExternalTable;
+            if (table instanceof IcebergExternalTable) {
+                icebergExternalTable = (IcebergExternalTable) table;
+            } else if (table instanceof IcebergSysExternalTable) {
+                icebergExternalTable = ((IcebergSysExternalTable) table).getSourceTable();
+            } else {
+                throw new RuntimeException("Unexpected Iceberg table type: " + table.getClass().getSimpleName());
+            }
             if (icebergExternalTable.hasSortOrder()) {
                 sb.append("\n").append(icebergExternalTable.getSortOrderSql());
             }
@@ -4731,7 +4739,14 @@ public class Env {
             sb.append("\n-- Internal JDBC tables are deprecated. Please use JDBC Catalog instead.");
         } else if (table.getType() == TableType.ICEBERG_EXTERNAL_TABLE) {
             addTableComment(table, sb);
-            IcebergExternalTable icebergExternalTable = (IcebergExternalTable) table;
+            IcebergExternalTable icebergExternalTable;
+            if (table instanceof IcebergExternalTable) {
+                icebergExternalTable = (IcebergExternalTable) table;
+            } else if (table instanceof IcebergSysExternalTable) {
+                icebergExternalTable = ((IcebergSysExternalTable) table).getSourceTable();
+            } else {
+                throw new RuntimeException("Unexpected Iceberg table type: " + table.getClass().getSimpleName());
+            }
             if (icebergExternalTable.hasSortOrder()) {
                 sb.append("\n").append(icebergExternalTable.getSortOrderSql());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index ffdec64d3d9..6a3faf49949 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -318,6 +318,19 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
         return IcebergSysTable.SUPPORTED_SYS_TABLES;
     }
 
+    @Override
+    public Optional<SysTable> findSysTable(String tableNameWithSysTableName) {
+        Optional<SysTable> sysTable = MTMVRelatedTableIf.super.findSysTable(tableNameWithSysTableName);
+        if (sysTable.isPresent()) {
+            return sysTable;
+        }
+        String sysTableName = SysTable.getTableNameWithSysTableName(tableNameWithSysTableName).second;
+        if (IcebergSysTable.POSITION_DELETES.equals(sysTableName)) {
+            return Optional.of(IcebergSysTable.UNSUPPORTED_POSITION_DELETES_TABLE);
+        }
+        return Optional.empty();
+    }
+
@Override
public boolean isView() {
makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSysExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSysExternalTable.java
new file mode 100644
index 00000000000..aeb539d1f3f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSysExternalTable.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.NameMapping;
+import org.apache.doris.datasource.SchemaCacheKey;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.systable.SysTable;
+import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.ExternalAnalysisTask;
+import org.apache.doris.thrift.THiveTable;
+import org.apache.doris.thrift.TIcebergTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class IcebergSysExternalTable extends ExternalTable {
+ private final IcebergExternalTable sourceTable;
+ private final String sysTableType;
+ private volatile Table sysIcebergTable;
+ private volatile List<Column> fullSchema;
+ private volatile SchemaCacheValue schemaCacheValue;
+
+ public IcebergSysExternalTable(IcebergExternalTable sourceTable, String
sysTableType) {
+ super(generateSysTableId(sourceTable.getId(), sysTableType),
+ sourceTable.getName() + "$" + sysTableType,
+ sourceTable.getRemoteName() + "$" + sysTableType,
+ (IcebergExternalCatalog) sourceTable.getCatalog(),
+ (IcebergExternalDatabase) sourceTable.getDatabase(),
+ TableIf.TableType.ICEBERG_EXTERNAL_TABLE);
+ this.sourceTable = sourceTable;
+ this.sysTableType = sysTableType;
+ }
+
+ @Override
+ public String getMetaCacheEngine() {
+ return sourceTable.getMetaCacheEngine();
+ }
+
+ @Override
+ protected synchronized void makeSureInitialized() {
+ super.makeSureInitialized();
+ if (!objectCreated) {
+ objectCreated = true;
+ }
+ }
+
+ public IcebergExternalTable getSourceTable() {
+ return sourceTable;
+ }
+
+ public String getSysTableType() {
+ return sysTableType;
+ }
+
+ public Table getSysIcebergTable() {
+ if (sysIcebergTable == null) {
+ synchronized (this) {
+ if (sysIcebergTable == null) {
+ Table baseTable = sourceTable.getIcebergTable();
+ MetadataTableType tableType =
MetadataTableType.from(sysTableType);
+ if (tableType == null) {
+ throw new IllegalArgumentException("Unknown iceberg
system table type: " + sysTableType);
+ }
+ sysIcebergTable =
MetadataTableUtils.createMetadataTableInstance(baseTable, tableType);
+ }
+ }
+ }
+ return sysIcebergTable;
+ }
+
+ @Override
+ public List<Column> getFullSchema() {
+ return getOrCreateSchemaCacheValue().getSchema();
+ }
+
+ @Override
+ public NameMapping getOrBuildNameMapping() {
+ return sourceTable.getOrBuildNameMapping();
+ }
+
+ @Override
+ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
+ makeSureInitialized();
+ return new ExternalAnalysisTask(info);
+ }
+
+ @Override
+ public TTableDescriptor toThrift() {
+ List<Column> schema = getFullSchema();
+ if (sourceTable.getIcebergCatalogType().equals("hms")) {
+ THiveTable tHiveTable = new THiveTable(getDbName(), getName(), new
HashMap<>());
+ TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.HIVE_TABLE, schema.size(), 0,
+ getName(), getDbName());
+ tTableDescriptor.setHiveTable(tHiveTable);
+ return tTableDescriptor;
+ } else {
+ TIcebergTable icebergTable = new TIcebergTable(getDbName(),
getName(), new HashMap<>());
+ TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.ICEBERG_TABLE,
+ schema.size(), 0, getName(), getDbName());
+ tTableDescriptor.setIcebergTable(icebergTable);
+ return tTableDescriptor;
+ }
+ }
+
+ @Override
+ public long fetchRowCount() {
+ return UNKNOWN_ROW_COUNT;
+ }
+
+ @Override
+ public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
+ return Optional.of(getOrCreateSchemaCacheValue());
+ }
+
+ @Override
+ public Optional<SchemaCacheValue> getSchemaCacheValue() {
+ return Optional.of(getOrCreateSchemaCacheValue());
+ }
+
+ @Override
+ public Map<String, SysTable> getSupportedSysTables() {
+ return sourceTable.getSupportedSysTables();
+ }
+
+ @Override
+ public String getComment() {
+ return "Iceberg system table: " + sysTableType + " for " + sourceTable.getName();
+ }
+
+ private static long generateSysTableId(long sourceTableId, String sysTableType) {
+ return sourceTableId ^ (sysTableType.hashCode() * 31L);
+ }
+
+ private SchemaCacheValue getOrCreateSchemaCacheValue() {
+ if (schemaCacheValue == null) {
+ synchronized (this) {
+ if (schemaCacheValue == null) {
+ if (fullSchema == null) {
+ fullSchema = IcebergUtils.parseSchema(getSysIcebergTable().schema(),
+ getCatalog().getEnableMappingVarbinary(),
+ getCatalog().getEnableMappingTimestampTz());
+ }
+ schemaCacheValue = new SchemaCacheValue(fullSchema);
+ }
+ }
+ }
+ return schemaCacheValue;
+ }
+}
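The `getOrCreateSchemaCacheValue()` method above uses the classic double-checked locking idiom to build the schema at most once. A minimal, self-contained sketch of the same pattern follows; `SchemaCacheValue` here is a hypothetical stand-in, not the Doris FE class:

```java
import java.util.List;

public class SchemaCacheSketch {
    // Hypothetical stand-in for Doris's SchemaCacheValue.
    static final class SchemaCacheValue {
        final List<String> schema;
        SchemaCacheValue(List<String> schema) {
            this.schema = schema;
        }
    }

    // volatile is essential: it guarantees other threads never observe a
    // partially constructed value through the unsynchronized first check.
    private volatile SchemaCacheValue schemaCacheValue;

    public SchemaCacheValue getOrCreate() {
        SchemaCacheValue local = schemaCacheValue;
        if (local == null) {                  // first check, no lock taken
            synchronized (this) {
                local = schemaCacheValue;
                if (local == null) {          // second check, under the lock
                    local = new SchemaCacheValue(List.of("snapshot_id", "committed_at"));
                    schemaCacheValue = local;
                }
            }
        }
        return local;
    }
}
```

Reading into a local variable first means the common already-initialized path performs a single volatile read and no locking.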
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index c233f732106..c1626e915c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -21,7 +21,9 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
@@ -34,26 +36,25 @@ import java.util.Map;
*/
public class IcebergApiSource implements IcebergSource {
- private final IcebergExternalTable icebergExtTable;
+ private final ExternalTable targetTable;
private final Table originTable;
-
private final TupleDescriptor desc;
- public IcebergApiSource(IcebergExternalTable table, TupleDescriptor desc,
+ public IcebergApiSource(ExternalTable table, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
- // Theoretically, the IcebergScanNode is responsible for scanning data from physical tables.
- // Views should not reach this point.
- // By adding this validation, we aim to ensure that if a view query does end up here, it indicates a bug.
- // This helps us identify issues promptly.
-
- // when use legacy planner, query an iceberg view will enter this
- // we should set enable_fallback_to_original_planner=false
- // so that it will throw exception by first planner
- if (table.isView()) {
- throw new UnsupportedOperationException("IcebergApiSource does not support view");
+ if (table instanceof IcebergExternalTable) {
+ IcebergExternalTable icebergExtTable = (IcebergExternalTable) table;
+ if (icebergExtTable.isView()) {
+ throw new UnsupportedOperationException("IcebergApiSource does not support view");
+ }
+ this.originTable = IcebergUtils.getIcebergTable(icebergExtTable);
+ } else if (table instanceof IcebergSysExternalTable) {
+ this.originTable = ((IcebergSysExternalTable) table).getSysIcebergTable();
+ } else {
+ throw new IllegalArgumentException(
+ "Expected Iceberg table but got " + table.getClass().getSimpleName());
}
- this.icebergExtTable = table;
- this.originTable = IcebergUtils.getIcebergTable(icebergExtTable);
+ this.targetTable = table;
this.desc = desc;
}
@@ -74,12 +75,12 @@ public class IcebergApiSource implements IcebergSource {
@Override
public TableIf getTargetTable() {
- return icebergExtTable;
+ return targetTable;
}
@Override
public ExternalCatalog getCatalog() {
- return icebergExtTable.getCatalog();
+ return targetTable.getCatalog();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 15883744cf3..121dc0ad152 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -41,6 +41,7 @@ import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalMetaCache;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader;
import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;
@@ -96,6 +97,7 @@ import org.apache.iceberg.mapping.MappedFields;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.util.ScanTaskUtil;
+import org.apache.iceberg.util.SerializationUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -158,6 +160,7 @@ public class IcebergScanNode extends FileQueryScanNode {
private String cachedFsIdentifier;
private Boolean isBatchMode = null;
+ private boolean isSystemTable = false;
// ReferencedDataFile path -> List<DeleteFile> / List<TIcebergDeleteFileDesc> (exclude equal delete)
public Map<String, List<DeleteFile>> deleteFilesByReferencedDataFile = new HashMap<>();
@@ -182,8 +185,13 @@ public class IcebergScanNode extends FileQueryScanNode {
ExternalTable table = (ExternalTable) desc.getTable();
if (table instanceof HMSExternalTable) {
source = new IcebergHMSSource((HMSExternalTable) table, desc);
- } else if (table instanceof IcebergExternalTable) {
- String catalogType = ((IcebergExternalTable) table).getIcebergCatalogType();
+ } else if (table instanceof IcebergExternalTable || table instanceof IcebergSysExternalTable) {
+ if (table instanceof IcebergSysExternalTable) {
+ isSystemTable = true;
+ }
+ String catalogType = table instanceof IcebergExternalTable
+ ? ((IcebergExternalTable) table).getIcebergCatalogType()
+ : ((IcebergSysExternalTable) table).getSourceTable().getIcebergCatalogType();
switch (catalogType) {
case IcebergExternalCatalog.ICEBERG_HMS:
case IcebergExternalCatalog.ICEBERG_REST:
@@ -192,7 +200,7 @@ public class IcebergScanNode extends FileQueryScanNode {
case IcebergExternalCatalog.ICEBERG_HADOOP:
case IcebergExternalCatalog.ICEBERG_JDBC:
case IcebergExternalCatalog.ICEBERG_S3_TABLES:
- source = new IcebergApiSource((IcebergExternalTable) table, desc, columnNameToRange);
+ source = new IcebergApiSource(table, desc, columnNameToRange);
break;
default:
Preconditions.checkState(false, "Unknown iceberg catalog type: " + catalogType);
@@ -269,13 +277,21 @@ public class IcebergScanNode extends FileQueryScanNode {
private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
+ TIcebergFileDesc fileDesc = new TIcebergFileDesc();
+ if (isSystemTable) {
+ rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
+ tableFormatFileDesc.setTableLevelRowCount(-1);
+ fileDesc.setSerializedSplit(icebergSplit.getSerializedSplit());
+ tableFormatFileDesc.setIcebergParams(fileDesc);
+ rangeDesc.setTableFormatParams(tableFormatFileDesc);
+ return;
+ }
if (tableLevelPushDownCount) {
tableFormatFileDesc.setTableLevelRowCount(icebergSplit.getTableLevelRowCount());
} else {
// MUST explicitly set to -1, to be distinct from valid row count >= 0
tableFormatFileDesc.setTableLevelRowCount(-1);
}
- TIcebergFileDesc fileDesc = new TIcebergFileDesc();
fileDesc.setFormatVersion(formatVersion);
fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
if (icebergSplit.getPartitionSpecId() != null) {
@@ -853,7 +869,18 @@ public class IcebergScanNode extends FileQueryScanNode {
return split;
}
+ private Split createIcebergSysSplit(FileScanTask fileScanTask) {
+ long rowCount = fileScanTask.file() == null ? 1 : fileScanTask.file().recordCount();
+ IcebergSplit split = IcebergSplit.newSysTableSplit(
+ SerializationUtil.serializeToBase64(fileScanTask), rowCount);
+ split.setTableFormatType(TableFormatType.ICEBERG);
+ return split;
+ }
+
private List<Split> doGetSplits(int numBackends) throws UserException {
+ if (isSystemTable) {
+ return doGetSystemTableSplits();
+ }
List<Split> splits = new ArrayList<>();
@@ -897,8 +924,24 @@ public class IcebergScanNode extends FileQueryScanNode {
return splits;
}
+ private List<Split> doGetSystemTableSplits() throws UserException {
+ List<Split> splits = new ArrayList<>();
+ TableScan scan = createTableScan();
+ try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
+ fileScanTasks.forEach(task -> splits.add(createIcebergSysSplit(task)));
+ } catch (IOException e) {
+ throw new UserException(e.getMessage(), e);
+ }
+ selectedPartitionNum = 0;
+ return splits;
+ }
+
@Override
public boolean isBatchMode() {
+ if (isSystemTable) {
+ isBatchMode = false;
+ return false;
+ }
Boolean cached = isBatchMode;
if (cached != null) {
return cached;
@@ -992,6 +1035,9 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
public TFileFormatType getFileFormatType() throws UserException {
+ if (isSystemTable) {
+ return TFileFormatType.FORMAT_JNI;
+ }
TFileFormatType type;
String icebergFormat = source.getFileFormat();
if (icebergFormat.equalsIgnoreCase("parquet")) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 2596df5ad37..3af484abd6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -25,11 +25,13 @@ import lombok.Data;
import org.apache.iceberg.DeleteFile;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@Data
public class IcebergSplit extends FileSplit {
+ private static final LocationPath DUMMY_PATH = LocationPath.of("/dummyPath");
// Doris will convert the schema in FileSystem to achieve the function of natively reading files.
// For example, s3a:// will be converted to s3://.
@@ -49,6 +51,7 @@ public class IcebergSplit extends FileSplit {
private String partitionDataJson = null;
private Long firstRowId = null;
private Long lastUpdatedSequenceNumber = null;
+ private String serializedSplit;
// File path will be changed if the file is modified, so there's no need to get modification time.
public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts,
@@ -66,4 +69,13 @@ public class IcebergSplit extends FileSplit {
this.deleteFileFilters = deleteFileFilters;
this.selfSplitWeight += deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum();
}
+
+ public static IcebergSplit newSysTableSplit(String serializedSplit, long rowCount) {
+ IcebergSplit split = new IcebergSplit(DUMMY_PATH, 0, 0, 0, null, null,
+ Collections.emptyMap(),
+ Collections.emptyList(), DUMMY_PATH.toStorageLocation().toString());
+ split.setSerializedSplit(serializedSplit);
+ split.setSelfSplitWeight(Math.max(rowCount, 1L));
+ return split;
+ }
}
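The `serializedSplit` carried by `newSysTableSplit` is an Iceberg `FileScanTask` encoded to base64 via `SerializationUtil.serializeToBase64`, so the FE can ship the task to the BE's JNI scanner as an opaque string. A self-contained sketch of that encode/decode round trip, using plain JDK serialization as a stand-in for Iceberg's helper (whose internals may differ), looks like:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

public final class Base64RoundTrip {
    // Encode any Serializable object as a base64 string (stand-in for
    // Iceberg's SerializationUtil.serializeToBase64).
    public static String serializeToBase64(Serializable obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            return Base64.getEncoder().encodeToString(bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The receiving side decodes the same string back into an object.
    public static Object deserializeFromBase64(String base64) {
        byte[] raw = Base64.getDecoder().decode(base64);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the split body is opaque, the `FileSplit` path fields are irrelevant here, which is why `newSysTableSplit` fills them with a dummy location.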
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
index db972c6b2b6..b6999b7c50c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSysExternalTable.java
@@ -70,6 +70,8 @@ public class PaimonSysExternalTable extends ExternalTable {
private final String sysTableType;
private volatile Boolean isDataTable;
private volatile Table paimonSysTable;
+ private volatile List<Column> fullSchema;
+ private volatile SchemaCacheValue schemaCacheValue;
/**
* Creates a new Paimon system external table.
@@ -93,6 +95,7 @@ public class PaimonSysExternalTable extends ExternalTable {
return PaimonExternalMetaCache.ENGINE;
}
+ @Override
protected synchronized void makeSureInitialized() {
super.makeSureInitialized();
if (!objectCreated) {
@@ -136,30 +139,7 @@ public class PaimonSysExternalTable extends ExternalTable {
*/
@Override
public List<Column> getFullSchema() {
- Table sysTable = getSysPaimonTable();
- List<DataField> fields = sysTable.rowType().getFields();
- List<Column> columns = Lists.newArrayListWithCapacity(fields.size());
-
- for (DataField field : fields) {
- Column column = new Column(
- field.name().toLowerCase(),
- PaimonUtil.paimonTypeToDorisType(
- field.type(),
- getCatalog().getEnableMappingVarbinary(),
- getCatalog().getEnableMappingTimestampTz()),
- true,
- null,
- true,
- field.description(),
- true,
- field.id());
- PaimonUtil.updatePaimonColumnUniqueId(column, field);
- if (field.type().getTypeRoot() == DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
- column.setWithTZExtraInfo();
- }
- columns.add(column);
- }
- return columns;
+ return getOrCreateSchemaCacheValue().getSchema();
}
public PaimonExternalTable getSourceTable() {
@@ -232,12 +212,12 @@ public class PaimonSysExternalTable extends ExternalTable {
@Override
public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
- return Optional.of(new SchemaCacheValue(getFullSchema()));
+ return Optional.of(getOrCreateSchemaCacheValue());
}
@Override
public Optional<SchemaCacheValue> getSchemaCacheValue() {
- return Optional.of(new SchemaCacheValue(getFullSchema()));
+ return Optional.of(getOrCreateSchemaCacheValue());
}
@Override
@@ -253,4 +233,45 @@ public class PaimonSysExternalTable extends ExternalTable {
public String getComment() {
return "Paimon system table: " + sysTableType + " for " + sourceTable.getName();
}
+
+ private SchemaCacheValue getOrCreateSchemaCacheValue() {
+ if (schemaCacheValue == null) {
+ synchronized (this) {
+ if (schemaCacheValue == null) {
+ if (fullSchema == null) {
+ fullSchema = buildFullSchema();
+ }
+ schemaCacheValue = new SchemaCacheValue(fullSchema);
+ }
+ }
+ }
+ return schemaCacheValue;
+ }
+
+ private List<Column> buildFullSchema() {
+ Table sysTable = getSysPaimonTable();
+ List<DataField> fields = sysTable.rowType().getFields();
+ List<Column> columns = Lists.newArrayListWithCapacity(fields.size());
+
+ for (DataField field : fields) {
+ Column column = new Column(
+ field.name().toLowerCase(),
+ PaimonUtil.paimonTypeToDorisType(
+ field.type(),
+ getCatalog().getEnableMappingVarbinary(),
+ getCatalog().getEnableMappingTimestampTz()),
+ true,
+ null,
+ true,
+ field.description(),
+ true,
+ field.id());
+ PaimonUtil.updatePaimonColumnUniqueId(column, field);
+ if (field.type().getTypeRoot() == DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+ column.setWithTZExtraInfo();
+ }
+ columns.add(column);
+ }
+ return columns;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
index ed803640cef..72a739e7b9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
@@ -17,17 +17,16 @@
package org.apache.doris.datasource.systable;
-import org.apache.doris.info.TableValuedFunctionRefInfo;
-import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
-import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
-import org.apache.doris.tablefunction.IcebergTableValuedFunction;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
+import org.apache.doris.nereids.exceptions.AnalysisException;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.iceberg.MetadataTableType;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -38,13 +37,10 @@ import java.util.stream.Collectors;
* <p>Iceberg system tables provide access to table metadata such as
* snapshots, history, manifests, files, partitions, etc.
*
- * <p>Iceberg system tables currently use the TVF path (MetadataScanNode).
- *
* @see org.apache.iceberg.MetadataTableType for all supported system table types
*/
-public class IcebergSysTable extends TvfSysTable {
-
- private static final String TVF_NAME = "iceberg_meta";
+public class IcebergSysTable extends NativeSysTable {
+ public static final String POSITION_DELETES = MetadataTableType.POSITION_DELETES.name().toLowerCase(Locale.ROOT);
/**
* All supported Iceberg system tables.
@@ -52,14 +48,19 @@ public class IcebergSysTable extends TvfSysTable {
*/
public static final Map<String, SysTable> SUPPORTED_SYS_TABLES = Collections.unmodifiableMap(
Arrays.stream(MetadataTableType.values())
- .map(type -> new IcebergSysTable(type.name().toLowerCase()))
+ .filter(type -> type != MetadataTableType.POSITION_DELETES)
+ .map(type -> new IcebergSysTable(type.name().toLowerCase(Locale.ROOT), true))
.collect(Collectors.toMap(SysTable::getSysTableName, Function.identity())));
+ public static final SysTable UNSUPPORTED_POSITION_DELETES_TABLE =
+ new IcebergSysTable(POSITION_DELETES, false);
private final String tableName;
+ private final boolean supported;
- private IcebergSysTable(String tableName) {
- super(tableName, TVF_NAME);
+ private IcebergSysTable(String tableName, boolean supported) {
+ super(tableName);
this.tableName = tableName;
+ this.supported = supported;
}
@Override
@@ -68,22 +69,14 @@ public class IcebergSysTable extends TvfSysTable {
}
@Override
- public TableValuedFunction createFunction(String ctlName, String dbName, String sourceNameWithMetaName) {
- return IcebergMeta.createIcebergMeta(
- Lists.newArrayList(ctlName, dbName, getSourceTableName(sourceNameWithMetaName)),
- getSysTableName());
- }
-
- @Override
- public TableValuedFunctionRefInfo createFunctionRef(String ctlName, String dbName, String sourceNameWithMetaName) {
- String tableName = String.format("%s.%s.%s", ctlName, dbName, getSourceTableName(sourceNameWithMetaName));
- try {
- java.util.Map<String, String> params = Maps.newHashMap();
- params.put(IcebergTableValuedFunction.TABLE, tableName);
- params.put(IcebergTableValuedFunction.QUERY_TYPE, getSysTableName());
- return new TableValuedFunctionRefInfo(tvfName, null, params);
- } catch (org.apache.doris.common.AnalysisException e) {
- throw new RuntimeException("Failed to create iceberg_meta tvf ref", e);
+ public ExternalTable createSysExternalTable(ExternalTable sourceTable) {
+ if (!supported) {
+ throw new AnalysisException("SysTable " + tableName + " is not supported yet");
+ }
+ if (!(sourceTable instanceof IcebergExternalTable)) {
+ throw new IllegalArgumentException(
+ "Expected IcebergExternalTable but got " + sourceTable.getClass().getSimpleName());
}
+ return new IcebergSysExternalTable((IcebergExternalTable) sourceTable, getSysTableName());
}
}
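The `SUPPORTED_SYS_TABLES` construction above (stream the enum constants, filter out the one unsupported type, collect into an unmodifiable name-keyed map) can be illustrated in isolation. The enum below is a hypothetical stand-in for `org.apache.iceberg.MetadataTableType`:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class EnumMapSketch {
    // Hypothetical subset of MetadataTableType's constants.
    enum TableType { SNAPSHOTS, HISTORY, POSITION_DELETES }

    // Lower-cased constant names become the lookup keys (e.g. "snapshots"),
    // and POSITION_DELETES is excluded, mirroring the pattern in the diff.
    static final Map<String, TableType> SUPPORTED = Collections.unmodifiableMap(
            Arrays.stream(TableType.values())
                    .filter(t -> t != TableType.POSITION_DELETES)
                    .collect(Collectors.toMap(
                            t -> t.name().toLowerCase(Locale.ROOT),
                            Function.identity())));
}
```

Using `Locale.ROOT` for the lower-casing avoids locale-dependent surprises (the Turkish dotless-i problem) in identifier keys.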
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SysTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SysTable.java
index 0dc6efb609c..2a70843546c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SysTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SysTable.java
@@ -35,7 +35,7 @@ import org.apache.doris.common.Pair;
* <p>Subclasses should extend one of the specialized abstract classes:
* <ul>
* <li>{@link NativeSysTable} - for tables using native execution path (FileQueryScanNode)</li>
- * <li>{@link TvfSysTable} - for tables using TVF execution path (MetadataScanNode)</li>
+ * <li>{@link TvfSysTable} - for tables still using TVF execution path (MetadataScanNode)</li>
* </ul>
*
* @see NativeSysTable
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/TvfSysTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/TvfSysTable.java
index d79bd6ac232..f33b2d54617 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/TvfSysTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/TvfSysTable.java
@@ -27,7 +27,6 @@ import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFun
* This is the legacy execution path, still used for:
* <ul>
* <li>Hive partition tables (partition_values TVF)</li>
- * <li>Iceberg metadata tables (iceberg_meta TVF)</li>
* </ul>
*
* <p>Subclasses must implement:
@@ -37,7 +36,6 @@ import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFun
* </ul>
*
* @see PartitionsSysTable
- * @see IcebergSysTable
*/
public abstract class TvfSysTable extends SysTable {
@@ -59,7 +57,7 @@ public abstract class TvfSysTable extends SysTable {
/**
* Get the TVF name used for this system table.
*
- * @return the TVF name (e.g., "partition_values", "iceberg_meta")
+ * @return the TVF name (e.g., "partition_values")
*/
public String getTvfName() {
return tvfName;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index cb2ecdba15b..b1d2b41d31c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -54,6 +54,7 @@ import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.source.HudiScanNode;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergMergeOperation;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.datasource.jdbc.sink.JdbcTableSink;
@@ -743,7 +744,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
default:
throw new RuntimeException("do not support DLA type " + ((HMSExternalTable) table).getDlaType());
}
- } else if (table instanceof IcebergExternalTable) {
+ } else if (table instanceof IcebergExternalTable || table instanceof IcebergSysExternalTable) {
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv,
context.getScanContext());
} else if (table.getType() == TableIf.TableType.PAIMON_EXTERNAL_TABLE) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/UserAuthentication.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/UserAuthentication.java
index ccba89fece9..1f67afe4fae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/UserAuthentication.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/UserAuthentication.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
import org.apache.doris.datasource.paimon.PaimonSysExternalTable;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -56,6 +57,9 @@ public class UserAuthentication {
if (table instanceof PaimonSysExternalTable) {
authTable = ((PaimonSysExternalTable) table).getSourceTable();
authColumns = Collections.emptySet();
+ } else if (table instanceof IcebergSysExternalTable) {
+ authTable = ((IcebergSysExternalTable) table).getSourceTable();
+ authColumns = Collections.emptySet();
}
String tableName = authTable.getName();
DatabaseIf db = authTable.getDatabase();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
deleted file mode 100644
index 8be995beaef..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.trees.expressions.functions.table;
-
-import org.apache.doris.catalog.FunctionSignature;
-import org.apache.doris.nereids.exceptions.AnalysisException;
-import org.apache.doris.nereids.trees.expressions.Properties;
-import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
-import org.apache.doris.nereids.types.coercion.AnyDataType;
-import org.apache.doris.tablefunction.IcebergTableValuedFunction;
-import org.apache.doris.tablefunction.TableValuedFunctionIf;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
-/** iceberg_meta */
-public class IcebergMeta extends TableValuedFunction {
- public IcebergMeta(Properties properties) {
- super("iceberg_meta", properties);
- }
-
- public static IcebergMeta createIcebergMeta(List<String> nameParts, String queryType) {
- Map<String, String> prop = Maps.newHashMap();
- prop.put(IcebergTableValuedFunction.TABLE, Joiner.on(".").join(nameParts));
- prop.put(IcebergTableValuedFunction.QUERY_TYPE, queryType);
- return new IcebergMeta(new Properties(prop));
- }
-
- @Override
- public FunctionSignature customSignature() {
- return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
- }
-
- @Override
- protected TableValuedFunctionIf toCatalogFunction() {
- try {
- Map<String, String> arguments = getTVFProperties().getMap();
- return IcebergTableValuedFunction.create(arguments);
- } catch (Throwable t) {
- throw new AnalysisException("Can not build IcebergTableValuedFunction by "
- + this + ": " + t.getMessage(), t);
- }
- }
-
- @Override
- public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
- return visitor.visitIcebergMeta(this, context);
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 31b3162e647..8ae391e1f0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.Http;
import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
import org.apache.doris.nereids.trees.expressions.functions.table.HudiMeta;
-import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
@@ -109,10 +108,6 @@ public interface TableValuedFunctionVisitor<R, C> {
return visitTableValuedFunction(hudiMeta, context);
}
- default R visitIcebergMeta(IcebergMeta icebergMeta, C context) {
- return visitTableValuedFunction(icebergMeta, context);
- }
-
default R visitLocal(Local local, C context) {
return visitTableValuedFunction(local, context);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowCreateTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowCreateTableCommand.java
index 86332f6a57f..c534d62d01d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowCreateTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowCreateTableCommand.java
@@ -29,11 +29,15 @@ import org.apache.doris.catalog.stream.BaseTableStream;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.systable.SysTable;
+import org.apache.doris.datasource.systable.SysTableResolver;
import org.apache.doris.info.TableNameInfo;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
@@ -43,10 +47,12 @@ import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
/**
* Represents the command for SHOW CREATE TABLE.
@@ -90,9 +96,10 @@ public class ShowCreateTableCommand extends ShowCommand {
private void validate(ConnectContext ctx) throws AnalysisException {
tblNameInfo.analyze(ctx);
- TableIf tableIf = Env.getCurrentEnv().getCatalogMgr()
+ DatabaseIf db = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrAnalysisException(tblNameInfo.getCtl())
- .getDbOrAnalysisException(tblNameInfo.getDb()).getTableOrAnalysisException(tblNameInfo.getTbl());
+ .getDbOrAnalysisException(tblNameInfo.getDb());
+ TableIf tableIf = resolveShowCreateTarget(db);
if (tableIf instanceof MTMV) {
ErrorReport.reportAnalysisException("not support async materialized view, "
@@ -106,12 +113,15 @@ public class ShowCreateTableCommand extends ShowCommand {
wanted = PrivPredicate.SHOW;
}
+ String authTableName = tableIf instanceof IcebergSysExternalTable
+ ? ((IcebergSysExternalTable) tableIf).getSourceTable().getName()
+ : tableIf.getName();
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
- tblNameInfo.getCtl(), tblNameInfo.getDb(), tblNameInfo.getTbl(), wanted)) {
+ tblNameInfo.getCtl(), tblNameInfo.getDb(), authTableName, wanted)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SHOW CREATE TABLE",
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
- tblNameInfo.getDb() + ": " + tblNameInfo.getTbl());
+ tblNameInfo.getDb() + ": " + authTableName);
}
}
@@ -132,7 +142,10 @@ public class ShowCreateTableCommand extends ShowCommand {
// Fetch the catalog, database, and table metadata
DatabaseIf db = ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblNameInfo.getCtl())
.getDbOrMetaException(tblNameInfo.getDb());
- TableIf table = db.getTableOrMetaException(tblNameInfo.getTbl());
+ TableIf table = resolveShowCreateTarget(db);
+ if (table instanceof IcebergSysExternalTable) {
+ table = ((IcebergSysExternalTable) table).getSourceTable();
+ }
List<List<String>> rows = Lists.newArrayList();
@@ -173,4 +186,26 @@ public class ShowCreateTableCommand extends ShowCommand {
}
}
+ private TableIf resolveShowCreateTarget(DatabaseIf db) throws AnalysisException {
+ TableIf table = db.getTableNullable(tblNameInfo.getTbl());
+ if (table != null) {
+ return table;
+ }
+ Pair<String, String> tableNameWithSysTableName = SysTable.getTableNameWithSysTableName(tblNameInfo.getTbl());
+ if (Strings.isNullOrEmpty(tableNameWithSysTableName.second)) {
+ return db.getTableOrAnalysisException(tblNameInfo.getTbl());
+ }
+ TableIf sourceTable = db.getTableOrAnalysisException(tableNameWithSysTableName.first);
+ Optional<SysTableResolver.SysTableDescribe> sysTableDescribeOpt = SysTableResolver.resolveForDescribe(
+ sourceTable, tblNameInfo.getCtl(), tblNameInfo.getDb(), tblNameInfo.getTbl());
+ if (!sysTableDescribeOpt.isPresent()) {
+ throw new AnalysisException("sys table not found: " + tableNameWithSysTableName.second);
+ }
+ SysTableResolver.SysTableDescribe sysTableDescribe = sysTableDescribeOpt.get();
+ if (sysTableDescribe.isNative()) {
+ return sysTableDescribe.getSysExternalTable();
+ }
+ return sourceTable;
+ }
+
}
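The `resolveShowCreateTarget` addition above hinges on splitting a `tbl$snapshots`-style identifier into a source table name and a sys-table suffix via `SysTable.getTableNameWithSysTableName`. A minimal standalone sketch of that splitting, using a hypothetical helper rather than the actual Doris API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical stand-in for SysTable.getTableNameWithSysTableName: splits
// "tbl$snapshots" into ("tbl", "snapshots"); a name without '$' yields an
// empty sys-table part, which callers treat as a plain table lookup.
public class SysTableName {
    public static Map.Entry<String, String> split(String tblName) {
        int idx = tblName.indexOf('$');
        if (idx <= 0) { // no '$' at all, or a name starting with '$': no split
            return new SimpleEntry<>(tblName, "");
        }
        return new SimpleEntry<>(tblName.substring(0, idx), tblName.substring(idx + 1));
    }

    public static void main(String[] args) {
        System.out.println(split("tbl$snapshots")); // tbl=snapshots
        System.out.println(split("plain_table"));   // plain_table=
    }
}
```

When the suffix is empty, the command falls back to the ordinary `getTableOrAnalysisException` lookup, which preserves the pre-patch error message for unknown tables.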
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
index aa9c6af0b5d..30072b97e70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.IdGenerator;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
@@ -228,7 +229,7 @@ public class LogicalFileScan extends LogicalCatalogRelation implements SupportPr
@Override
public boolean supportPruneNestedColumn() {
ExternalTable table = getTable();
- if (table instanceof IcebergExternalTable) {
+ if (table instanceof IcebergExternalTable || table instanceof IcebergSysExternalTable) {
return true;
} else if (table instanceof HMSExternalTable) {
HMSExternalTable hmsTable = (HMSExternalTable) table;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
deleted file mode 100644
index 5585805536f..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
+++ /dev/null
@@ -1,168 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.tablefunction;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
-import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.iceberg.IcebergUtils;
-import org.apache.doris.info.TableNameInfo;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.TMetaScanRange;
-import org.apache.doris.thrift.TMetadataType;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.util.SerializationUtil;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/**
- * The class of table valued function for iceberg metadata.
- * iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots").
- */
-public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
-
- public static final String NAME = "iceberg_meta";
- public static final String TABLE = "table";
- public static final String QUERY_TYPE = "query_type";
-
- private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE);
-
- private final String queryType;
- private final Table sysTable;
- private final List<Column> schema;
- private final Map<String, String> hadoopProps;
- private final ExecutionAuthenticator preExecutionAuthenticator;
-
- public static IcebergTableValuedFunction create(Map<String, String> params)
- throws AnalysisException {
- Map<String, String> validParams = Maps.newHashMap();
- for (String key : params.keySet()) {
- if (!PROPERTIES_SET.contains(key.toLowerCase())) {
- throw new AnalysisException("'" + key + "' is invalid property");
- }
- // check ctl, db, tbl
- validParams.put(key.toLowerCase(), params.get(key));
- }
- String tableName = validParams.get(TABLE);
- String queryType = validParams.get(QUERY_TYPE);
- if (tableName == null || queryType == null) {
- throw new AnalysisException("Invalid iceberg metadata query");
- }
- // TODO: support position_deletes in future;
- if (queryType.equalsIgnoreCase("position_deletes")) {
- throw new AnalysisException("SysTable " + queryType + " is not supported yet");
- }
- String[] names = tableName.split("\\.");
- if (names.length != 3) {
- throw new AnalysisException("The iceberg table name contains the catalogName, databaseName, and tableName");
- }
- TableNameInfo icebergTableName = new TableNameInfo(names[0], names[1], names[2]);
- // check auth
- if (!Env.getCurrentEnv().getAccessManager()
- .checkTblPriv(ConnectContext.get(), icebergTableName, PrivPredicate.SELECT)) {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT",
- ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
- icebergTableName.getDb() + ": " + icebergTableName.getTbl());
- }
- return new IcebergTableValuedFunction(icebergTableName, queryType);
- }
-
- public IcebergTableValuedFunction(TableNameInfo icebergTableName, String queryType)
- throws AnalysisException {
- this.queryType = queryType;
- CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(icebergTableName.getCtl());
- if (!(catalog instanceof ExternalCatalog)) {
- throw new AnalysisException("Catalog " + icebergTableName.getCtl() + " is not an external catalog");
- }
- ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
- hadoopProps = externalCatalog.getCatalogProperty().getBackendStorageProperties();
- preExecutionAuthenticator = externalCatalog.getExecutionAuthenticator();
-
- TableIf dorisTable = externalCatalog.getDbOrAnalysisException(icebergTableName.getDb())
- .getTableOrAnalysisException(icebergTableName.getTbl());
- if (!(dorisTable instanceof ExternalTable)) {
- throw new AnalysisException("Table " + icebergTableName + " is not an iceberg table");
- }
- Table icebergTable = IcebergUtils.getIcebergTable((ExternalTable) dorisTable);
- if (icebergTable == null) {
- throw new AnalysisException("Iceberg table " + icebergTableName + " does not exist");
- }
- MetadataTableType tableType = MetadataTableType.from(queryType);
- if (tableType == null) {
- throw new AnalysisException("Unrecognized queryType for iceberg metadata: " + queryType);
- }
- this.sysTable = MetadataTableUtils.createMetadataTableInstance(icebergTable, tableType);
- this.schema = IcebergUtils.parseSchema(sysTable.schema(), externalCatalog.getEnableMappingVarbinary(), externalCatalog.getEnableMappingTimestampTz());
- }
-
- @Override
- public TMetadataType getMetadataType() {
- return TMetadataType.ICEBERG;
- }
-
- @Override
- public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
- CloseableIterable<FileScanTask> tasks;
- try {
- tasks = preExecutionAuthenticator.execute(() -> {
- return sysTable.newScan().select(requiredFileds).planFiles();
- });
- } catch (Exception e) {
- throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
- }
-
- TMetaScanRange tMetaScanRange = new TMetaScanRange();
- tMetaScanRange.setMetadataType(TMetadataType.ICEBERG);
- tMetaScanRange.setHadoopProps(hadoopProps);
- tMetaScanRange.setSerializedTable(SerializationUtil.serializeToBase64(sysTable));
- tMetaScanRange.setSerializedSplits(StreamSupport.stream(tasks.spliterator(), false)
- .map(SerializationUtil::serializeToBase64)
- .collect(Collectors.toList()));
- return tMetaScanRange;
- }
-
- @Override
- public String getTableName() {
- return "IcebergTableValuedFunction<" + queryType + ">";
- }
-
- @Override
- public List<Column> getTableColumns() {
- return schema;
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index f6655192fbd..6e489d08c93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -57,8 +57,6 @@ public abstract class TableValuedFunctionIf {
return new HttpStreamTableValuedFunction(params);
case LocalTableValuedFunction.NAME:
return new LocalTableValuedFunction(params);
- case IcebergTableValuedFunction.NAME:
- return IcebergTableValuedFunction.create(params);
case HudiTableValuedFunction.NAME:
return new HudiTableValuedFunction(params);
case BackendsTableValuedFunction.NAME:
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/systable/IcebergSysTableResolverTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/systable/IcebergSysTableResolverTest.java
new file mode 100644
index 00000000000..e6c5b0be328
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/systable/IcebergSysTableResolverTest.java
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.systable;
+
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+public class IcebergSysTableResolverTest {
+ @Mocked
+ private IcebergExternalCatalog catalog;
+ @Mocked
+ private IcebergExternalDatabase db;
+
+ @Test
+ public void testSupportedSysTablesExcludePositionDeletes() {
+ Assertions.assertFalse(IcebergSysTable.SUPPORTED_SYS_TABLES.containsKey(IcebergSysTable.POSITION_DELETES));
+ }
+
+ @Test
+ public void testResolveForPlanAndDescribeUseNativePath() throws Exception {
+ IcebergExternalTable sourceTable = newIcebergTable();
+
+ Optional<SysTableResolver.SysTablePlan> plan = SysTableResolver.resolveForPlan(
+ sourceTable, "test_ctl", "test_db", "tbl$snapshots");
+ Assertions.assertTrue(plan.isPresent());
+ Assertions.assertTrue(plan.get().isNative());
+ Assertions.assertTrue(plan.get().getSysExternalTable() instanceof IcebergSysExternalTable);
+ Assertions.assertEquals("tbl$snapshots", plan.get().getSysExternalTable().getName());
+
+ Optional<SysTableResolver.SysTableDescribe> describe = SysTableResolver.resolveForDescribe(
+ sourceTable, "test_ctl", "test_db", "tbl$snapshots");
+ Assertions.assertTrue(describe.isPresent());
+ Assertions.assertTrue(describe.get().isNative());
+ Assertions.assertTrue(describe.get().getSysExternalTable() instanceof IcebergSysExternalTable);
+ Assertions.assertEquals("tbl$snapshots", describe.get().getSysExternalTable().getName());
+ }
+
+ @Test
+ public void testPositionDeletesKeepsUnsupportedError() throws Exception {
+ IcebergExternalTable sourceTable = newIcebergTable();
+ Assertions.assertThrows(AnalysisException.class, () ->
+ SysTableResolver.resolveForPlan(sourceTable, "test_ctl", "test_db", "tbl$position_deletes"));
+ }
+
+ private IcebergExternalTable newIcebergTable() throws Exception {
+ new Expectations() {
+ {
+ catalog.getId();
+ minTimes = 0;
+ result = 1L;
+
+ db.getFullName();
+ minTimes = 0;
+ result = "test_db";
+
+ db.getRemoteName();
+ minTimes = 0;
+ result = "test_db";
+
+ catalog.getDbOrAnalysisException("test_db");
+ minTimes = 0;
+ result = db;
+
+ db.getId();
+ minTimes = 0;
+ result = 2L;
+
+ db.makeSureInitialized();
+ minTimes = 0;
+ }
+ };
+ return new IcebergExternalTable(3L, "tbl", "tbl", catalog, db);
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/UserAuthenticationTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/UserAuthenticationTest.java
index d440d32dd18..547e2b0d2c6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/UserAuthenticationTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/UserAuthenticationTest.java
@@ -25,6 +25,8 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergSysExternalTable;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -55,6 +57,10 @@ public class UserAuthenticationTest {
private DatabaseIf db;
@Mocked
private CatalogIf catalog;
+ @Mocked
+ private IcebergSysExternalTable icebergSysTable;
+ @Mocked
+ private IcebergExternalTable icebergSourceTable;
private String originalMinPrivilege;
@@ -345,4 +351,59 @@ public class UserAuthenticationTest {
Assertions.assertDoesNotThrow(() ->
UserAuthentication.checkPermission(table, connectContext, null));
}
+
+ @Test
+ public void testIcebergSysTableUsesSourceTablePrivilege() throws Exception {
+ new Expectations() {
+ {
+ icebergSysTable.getSourceTable();
+ minTimes = 0;
+ result = icebergSourceTable;
+
+ connectContext.getSessionVariable();
+ minTimes = 0;
+ result = sessionVariable;
+
+ sessionVariable.isPlayNereidsDump();
+ minTimes = 0;
+ result = false;
+
+ icebergSourceTable.getName();
+ minTimes = 0;
+ result = "source_tbl";
+
+ icebergSourceTable.getDatabase();
+ minTimes = 0;
+ result = db;
+
+ db.getFullName();
+ minTimes = 0;
+ result = "test_db";
+
+ db.getCatalog();
+ minTimes = 0;
+ result = catalog;
+
+ catalog.getName();
+ minTimes = 0;
+ result = "test_ctl";
+
+ connectContext.getEnv();
+ minTimes = 0;
+ result = env;
+
+ env.getAccessManager();
+ minTimes = 0;
+ result = accessControllerManager;
+
+ accessControllerManager.checkTblPriv(connectContext, "test_ctl", "test_db",
+ "source_tbl", PrivPredicate.SELECT);
+ minTimes = 1;
+ result = true;
+ }
+ };
+
+ Assertions.assertDoesNotThrow(() ->
+ UserAuthentication.checkPermission(icebergSysTable, connectContext, null));
+ }
}
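The test above encodes the delegation rule introduced by this patch: privilege checks on an Iceberg sys-table are answered with the source table's name. Stripped of the Doris types, the pattern is a thin wrapper that forwards its identity to the wrapped table (a sketch with invented interface and class names, not the real `TableIf` hierarchy):

```java
// Sketch of the auth-delegation pattern: the sys-table wrapper exposes its
// source table's name, so a SELECT grant on "source_tbl" also authorizes
// reads of "source_tbl$snapshots". All names here are hypothetical.
interface NamedTable {
    String authName(); // name used for privilege checks
}

class SourceTable implements NamedTable {
    public String authName() { return "source_tbl"; }
}

class SysTableWrapper implements NamedTable {
    private final NamedTable source;
    SysTableWrapper(NamedTable source) { this.source = source; }
    // Delegate: privilege checks never see the "$snapshots" suffix.
    public String authName() { return source.authName(); }
}

public class AuthDelegationDemo {
    public static void main(String[] args) {
        NamedTable sys = new SysTableWrapper(new SourceTable());
        System.out.println(sys.authName()); // prints source_tbl
    }
}
```

This keeps grant management unchanged for users: access to a table's metadata views follows access to the table itself.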
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1f4ae4df04a..60c5e01ecf8 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -319,6 +319,7 @@ struct TIcebergFileDesc {
10: optional i64 first_row_id;
// Only for format_version >= 3, the sequence number which last updated this file.
11: optional i64 last_updated_sequence_number;
+ 12: optional string serialized_split;
}
struct TPaimonDeletionFileDesc {
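The new `serialized_split` field carries an Iceberg scan task to the backend as an opaque string, in the same base64-of-Java-serialization form that the removed TVF path produced with Iceberg's `SerializationUtil.serializeToBase64`. A stdlib-only round-trip sketch of that encoding (not the Iceberg class itself; the split string is made up):

```java
import java.io.*;
import java.util.Base64;

// Stdlib sketch of base64 Java serialization, the transport format implied
// by the serialized_split field added to TIcebergFileDesc above.
public class SplitCodec {
    public static String serializeToBase64(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj); // Java serialization of the task object
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    public static Object deserializeFromBase64(String encoded) throws IOException, ClassNotFoundException {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical split descriptor; any Serializable object works.
        String split = "s3://bucket/path/data-0001.parquet:0:1048576";
        String encoded = serializeToBase64(split);
        System.out.println(deserializeFromBase64(encoded));
    }
}
```

Keeping the payload opaque to Thrift means the FE and the JNI scanner can evolve the split contents without another protocol change.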
diff --git a/regression-test/data/external_table_p2/tvf/test_iceberg_meta.out b/regression-test/data/external_table_p2/tvf/test_iceberg_meta.out
deleted file mode 100644
index b62e2d7510a..00000000000
--- a/regression-test/data/external_table_p2/tvf/test_iceberg_meta.out
+++ /dev/null
@@ -1,22 +0,0 @@
--- This file is automatically generated. You should know what you did if you want to edit this
--- !q01 --
-2879562
-
--- !q02 --
-1
-11
-3
-5
-6
-7
-8
-
--- !tvf_1 --
-2023-10-16T21:01:06 4012471924714711043 5784892960796156942 append
-2023-10-16T21:01:06 5784892960796156942 -1 append
-2023-10-16T21:01:06 7235593032487457798 4012471924714711043 append
-2023-10-16T21:01:07 1953697979105284524 7235593032487457798 append
-
--- !tvf_2 --
-2023-10-16T21:01:06 7235593032487457798 4012471924714711043 append
-
diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
index fe0cd29c5e9..cd3f77fe96d 100644
--- a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
@@ -55,7 +55,7 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external") {
sql """ insert into ${table_name_expire} values (4, 'd') """
sql """ insert into ${table_name_expire} values (5, 'e') """
- List<List<Object>> snapshots_expire = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_expire}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_expire = sql """ select snapshot_id from ${catalog_name}.test_db_retention.${table_name_expire}\$snapshots order by committed_at; """
String s_expire_0 = snapshots_expire.get(0)[0]
String s_expire_1 = snapshots_expire.get(1)[0]
String s_expire_2 = snapshots_expire.get(2)[0]
@@ -73,7 +73,7 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external") {
sql """ insert into ${table_name_expire}@branch(b_expire_test) values (10, 'j') """
// Get the current snapshot count before expire
- def snapshot_count_before_expire = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_expire}", "query_type" = "snapshots") """
+ def snapshot_count_before_expire = sql """ select count(*) from ${catalog_name}.test_db_retention.${table_name_expire}\$snapshots """
logger.info("Snapshot count before expire: ${snapshot_count_before_expire[0][0]}")
// Get branch ref snapshot
@@ -107,7 +107,7 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external") {
sql """ create table ${table_name_retain_count} (id int, name string) """
sql """ insert into ${table_name_retain_count} values (1, 'a') """
- List<List<Object>> snapshots_retain = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_retain_count}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_retain = sql """ select snapshot_id from ${catalog_name}.test_db_retention.${table_name_retain_count}\$snapshots order by committed_at; """
def s_retain_0 = snapshots_retain[0][0].toString()
// Create branch with snapshot retention count of 3
@@ -124,7 +124,7 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external") {
def branch_snapshot_id = sql """ select snapshot_id from ${table_name_retain_count}\$refs where name = 'b_retain_count' """
// Count snapshots before expire
- def snapshot_count_retain_before = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_retain_count}", "query_type" = "snapshots") """
+ def snapshot_count_retain_before = sql """ select count(*) from ${catalog_name}.test_db_retention.${table_name_retain_count}\$snapshots """
// Call expire_snapshots - older snapshots beyond retention count may be expired, but branch snapshot should be protected
sql """
@@ -150,11 +150,11 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external") {
sql """ insert into ${table_name_unref} values (2, 'old2') """
sql """ insert into ${table_name_unref} values (3, 'new') """
- List<List<Object>> snapshots_unref = sql """ select snapshot_id, committed_at from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_unref = sql """ select snapshot_id, committed_at from ${catalog_name}.test_db_retention.${table_name_unref}\$snapshots order by committed_at; """
def old_snapshot_id = snapshots_unref[0][0]
// Create a tag pointing to the newest snapshot (not the old ones)
- List<List<Object>> latest_snapshot = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") order by committed_at desc limit 1; """
+ List<List<Object>> latest_snapshot = sql """ select snapshot_id from ${catalog_name}.test_db_retention.${table_name_unref}\$snapshots order by committed_at desc limit 1; """
def latest_snap_id = latest_snapshot[0][0]
sql """ alter table ${table_name_unref} create tag t_latest AS OF VERSION ${latest_snap_id} """
@@ -162,7 +162,7 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external") {
qt_unref_tag_before_expire """ select * from ${table_name_unref}@tag(t_latest) order by id """ // Should have 1, old2, 3 rows
// Count snapshots before expire
- def snapshot_count_unref_before = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") """
+ def snapshot_count_unref_before = sql """ select count(*) from ${catalog_name}.test_db_retention.${table_name_unref}\$snapshots """
logger.info("Snapshot count before expire: ${snapshot_count_unref_before[0][0]}")
// Call expire_snapshots - old unreferenced snapshots should be expired
@@ -172,7 +172,7 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external") {
"""
// Count snapshots after expire
- def snapshot_count_unref_after = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") """
+ def snapshot_count_unref_after = sql """ select count(*) from ${catalog_name}.test_db_retention.${table_name_unref}\$snapshots """
// Refresh catalog to ensure we see the latest state after expire_snapshots
sql """refresh catalog ${catalog_name}"""
@@ -181,7 +181,7 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external") {
qt_unref_tag_accessible """ select * from ${table_name_unref}@tag(t_latest) order by id """ // Should have data
// Verify old snapshot is no longer accessible if it was expired
- def old_snapshot_exists = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") where snapshot_id = '${old_snapshot_id}' """
+ def old_snapshot_exists = sql """ select count(*) from ${catalog_name}.test_db_retention.${table_name_unref}\$snapshots where snapshot_id = '${old_snapshot_id}' """
logger.info("Old snapshot exists after expire: ${old_snapshot_exists[0][0]}")
// The tag-protected snapshot should still be in refs
diff --git a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy
index 3c37ff7ba04..242a4f7fcf7 100644
--- a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_tag_retention_and_consistency.groovy
@@ -55,7 +55,7 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") {
sql """ insert into ${table_name_expire} values (4, 'd', 'snapshot4') """
sql """ insert into ${table_name_expire} values (5, 'e', 'snapshot5') """
- List<List<Object>> snapshots_expire = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_expire}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_expire = sql """ select snapshot_id from ${catalog_name}.test_db_tag_retention.${table_name_expire}\$snapshots order by committed_at; """
String s_expire_0 = snapshots_expire.get(0)[0]
String s_expire_1 = snapshots_expire.get(1)[0]
String s_expire_2 = snapshots_expire.get(2)[0]
@@ -69,7 +69,7 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") {
logger.info("Created tags t_early (snapshot ${s_expire_1}) and t_middle (snapshot ${s_expire_3})")
// Get snapshot count before expire
- def snapshot_count_before_expire = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_expire}", "query_type" = "snapshots") """
+ def snapshot_count_before_expire = sql """ select count(*) from ${catalog_name}.test_db_tag_retention.${table_name_expire}\$snapshots """
logger.info("Snapshot count before expire: ${snapshot_count_before_expire[0][0]}")
// Call expire_snapshots via Spark - should not delete snapshots referenced by tags
@@ -95,7 +95,7 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") {
sql """ insert into ${table_name_multi_tag} values (2, 'second') """
sql """ insert into ${table_name_multi_tag} values (3, 'third') """
- List<List<Object>> snapshots_multi = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_multi_tag}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_multi = sql """ select snapshot_id from ${catalog_name}.test_db_tag_retention.${table_name_multi_tag}\$snapshots order by committed_at; """
String s_multi_1 = snapshots_multi.get(1)[0]
// Create multiple tags pointing to the same snapshot
@@ -126,7 +126,7 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") {
sql """ insert into ${table_name_immutable} values (1, 'version1', 1000) """
sql """ insert into ${table_name_immutable} values (2, 'version2', 2000) """
- List<List<Object>> snapshots_imm = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_immutable}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_imm = sql """ select snapshot_id from ${catalog_name}.test_db_tag_retention.${table_name_immutable}\$snapshots order by committed_at; """
String s_imm_v1 = snapshots_imm.get(1)[0]
// Create a tag at this point
@@ -157,11 +157,11 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") {
sql """ create table ${table_name_replace} (id int, status string) """
sql """ insert into ${table_name_replace} values (1, 'pending') """
- List<List<Object>> snapshots_replace = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_replace}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_replace = sql """ select snapshot_id from ${catalog_name}.test_db_tag_retention.${table_name_replace}\$snapshots order by committed_at; """
String s_rep_v1 = snapshots_replace.get(0)[0]
sql """ insert into ${table_name_replace} values (2, 'approved') """
- List<List<Object>> snapshots_replace_2 = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_replace}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_replace_2 = sql """ select snapshot_id from ${catalog_name}.test_db_tag_retention.${table_name_replace}\$snapshots order by committed_at; """
String s_rep_v2 = snapshots_replace_2.get(1)[0]
// Create initial tag
@@ -185,7 +185,7 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") {
sql """ insert into ${table_name_time_travel} values (2, 'v1.0', '2024-01-01') """
sql """ insert into ${table_name_time_travel} values (3, 'v1.0', '2024-01-01') """
- List<List<Object>> snapshots_tt = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_time_travel}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_tt = sql """ select snapshot_id from ${catalog_name}.test_db_tag_retention.${table_name_time_travel}\$snapshots order by committed_at; """
String s_tt_1 = snapshots_tt.get(0)[0]
String s_tt_2 = snapshots_tt.get(1)[0]
String s_tt_3 = snapshots_tt.get(2)[0]
@@ -219,7 +219,7 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") {
sql """ insert into ${table_name_agg} values (3, 'A', 150) """
sql """ insert into ${table_name_agg} values (4, 'C', 300) """
- List<List<Object>> snapshots_agg = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_agg}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_agg = sql """ select snapshot_id from ${catalog_name}.test_db_tag_retention.${table_name_agg}\$snapshots order by committed_at; """
String s_agg_all = snapshots_agg.get(3)[0]
// Create tag for aggregate queries
@@ -245,7 +245,7 @@ suite("iceberg_tag_retention_and_consistency", "p0,external") {
sql """ insert into ${table_name_interaction} values (1, 'item1', 'main') """
sql """ insert into ${table_name_interaction} values (2, 'item2', 'main') """
- List<List<Object>> snapshots_inter = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_interaction}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_inter = sql """ select snapshot_id from ${catalog_name}.test_db_tag_retention.${table_name_interaction}\$snapshots order by committed_at; """
String s_inter_baseline = snapshots_inter.get(1)[0]
// Create a tag for baseline
@@ -276,7 +276,7 @@ suite("iceberg_tag_retention_and_consistency",
"p0,external") {
sql """ create table ${table_name_write_fail} (id int, value string) """
sql """ insert into ${table_name_write_fail} values (1, 'data1') """
- List<List<Object>> snapshots_fail = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db_tag_retention.${table_name_write_fail}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots_fail = sql """ select snapshot_id from ${catalog_name}.test_db_tag_retention.${table_name_write_fail}\$snapshots order by committed_at; """
String s_fail = snapshots_fail.get(0)[0]
// Create a tag
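Every hunk in these suites applies the same mechanical rewrite: an `iceberg_meta("table" = ..., "query_type" = ...)` TVF call becomes a direct `catalog.db.table$type` reference. Purely as an editorial illustration (the helper name and regex below are assumptions, not code from the Doris repo), the textual transformation can be sketched as:

```python
# Illustrative sketch only: the query rewrite these test hunks apply by hand.
# Assumption: iceberg_meta() calls always use the "table"/"query_type" keys
# in that order, as they do in every hunk of this patch.
import re

def to_sys_table_ref(sql: str) -> str:
    """Rewrite iceberg_meta("table" = "c.d.t", "query_type" = "q") to c.d.t$q."""
    pattern = re.compile(
        r'iceberg_meta\(\s*"table"\s*=\s*"([^"]+)"\s*,\s*"query_type"\s*=\s*"([^"]+)"\s*\)')
    return pattern.sub(lambda m: f"{m.group(1)}${m.group(2)}", sql)

old = 'select snapshot_id from iceberg_meta("table" = "cat.db.tbl", "query_type" = "snapshots") order by committed_at'
print(to_sys_table_ref(old))
# select snapshot_id from cat.db.tbl$snapshots order by committed_at
```

Queries without an `iceberg_meta()` call pass through unchanged, which matches how the patch leaves unrelated SQL in these suites untouched.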
diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
index 53b4581f604..4bf1d347950 100644
--- a/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/iceberg_branch_tag_operate.groovy
@@ -88,7 +88,7 @@ suite("iceberg_branch_tag_operate", "p0,external") {
assertTrue(update_time2 > update_time1);
sleep(1000)
- List<List<Object>> snapshots = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db.${table_name}", "query_type" = "snapshots") order by committed_at; """
+ List<List<Object>> snapshots = sql """ select snapshot_id from ${catalog_name}.test_db.${table_name}\$snapshots order by committed_at; """
String s0 = snapshots.get(0)[0]
String s1 = snapshots.get(1)[0]
String s2 = snapshots.get(2)[0]
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
index f014ed6ecb7..4ecf4aef86e 100644
--- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
@@ -48,7 +48,7 @@ suite("iceberg_schema_change_with_timetravel", "p0,external") {
sql """ use test_db;"""
def executeTimeTravelingQueries = { String tableName ->
- def snapshots = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db.${tableName}", "query_type" = "snapshots") order by committed_at; """
+ def snapshots = sql """ select snapshot_id from ${catalog_name}.test_db.${tableName}\$snapshots order by committed_at; """
def snapshotIds = [
s0: snapshots.get(0)[0],
s1: snapshots.get(1)[0],
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
index ba3a4621224..de4d582a83a 100644
--- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
@@ -43,48 +43,13 @@ suite("test_iceberg_sys_table", "p0,external") {
sql """switch ${catalog_name}"""
sql """use ${db_name}"""
- def test_iceberg_systable = { tblName, systableType ->
- def systableName = "${tblName}\$${systableType}"
-
- order_qt_desc_systable1 """desc ${systableName}"""
- order_qt_desc_systable2 """desc ${db_name}.${systableName}"""
- order_qt_desc_systable3 """desc ${catalog_name}.${db_name}.${systableName}"""
-
- List<List<Object>> schema = sql """desc ${systableName}"""
- String key = String.valueOf(schema[1][0])
-
- order_qt_tbl1_systable """select * from ${systableName}"""
- order_qt_tbl1_systable_select """select ${key} from ${systableName}"""
- order_qt_tbl1_systable_count """select count(*) from ${systableName}"""
- order_qt_tbl1_systable_count_select """select count(${key}) from ${systableName}"""
-
- List<List<Object>> res1 = sql """select ${key} from ${systableName} order by ${key}"""
- List<List<Object>> res2 = sql """select ${key} from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.${tblName}",
- "query_type" = "${systableType}") order by ${key};
- """
- assertEquals(res1.size(), res2.size());
- for (int i = 0; i < res1.size(); i++) {
- for (int j = 0; j < res1[i].size(); j++) {
- assertEquals(res1[i][j], res2[i][j]);
- }
- }
-
- if (res1.isEmpty()) {
- return
- }
-
- String value = String.valueOf(res1[0][0])
- order_qt_tbl1_systable_where """
- select * from ${systableName} where ${key}="${value}";
- """
- order_qt_tbl1_systable_where_count """
- select count(*) from ${systableName} where ${key}="${value}";
- """
- order_qt_systable_where2 """select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.${tblName}",
- "query_type" = "${systableType}") where ${key}="${value}";
- """
+ def assertQueryRowsMatchCount = { String countSql, String querySql, String label ->
+ List<List<Object>> countResult = sql countSql
+ assertEquals(1, countResult.size())
+ long expectedRows = ((Number) countResult[0][0]).longValue()
+ List<List<Object>> queryResult = sql querySql
+ assertNotNull(queryResult, "${label} result should not be null")
+ assertEquals(expectedRows, (long) queryResult.size(), "${label} row count should match count query")
}
def test_systable_entries = { table, systableType ->
@@ -103,10 +68,16 @@ suite("test_iceberg_sys_table", "p0,external") {
}
}
- order_qt_select_entries """select status, sequence_number, file_sequence_number from ${systableName}"""
order_qt_select_entries_count """select count(*) from ${systableName}"""
- order_qt_select_entries_where """select status, sequence_number, file_sequence_number from ${systableName} where status="0";"""
order_qt_select_entries_where_count """select count(status) from ${systableName} where status="0";"""
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select status, sequence_number, file_sequence_number from ${systableName}""",
+ systableName)
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName} where status="0";""",
+ """select status, sequence_number, file_sequence_number from ${systableName} where status="0";""",
+ "${systableName} filtered by status")
}
def test_systable_files = { table, systableType ->
@@ -125,10 +96,16 @@ suite("test_iceberg_sys_table", "p0,external") {
}
}
- order_qt_select_files """select content, file_format, record_count, lower_bounds, upper_bounds from ${systableName}"""
order_qt_select_files_count """select count(*) from ${systableName}"""
- order_qt_select_files_where """select content, file_format, record_count, lower_bounds, upper_bounds from ${systableName} where content="0";"""
order_qt_select_files_where_count """select count(content) from ${systableName} where content="0";"""
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select content, file_format, record_count, lower_bounds, upper_bounds from ${systableName}""",
+ systableName)
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName} where content="0";""",
+ """select content, file_format, record_count, lower_bounds, upper_bounds from ${systableName} where content="0";""",
+ "${systableName} filtered by content")
}
def test_systable_history = { table ->
@@ -148,17 +125,10 @@ suite("test_iceberg_sys_table", "p0,external") {
}
order_qt_select_history_count """select count(*) from ${systableName}"""
-
- List<List<Object>> res1 = sql """select * from ${systableName} order by snapshot_id"""
- List<List<Object>> res2 = sql """select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.${table}",
- "query_type" = "history") order by snapshot_id"""
- assertEquals(res1.size(), res2.size());
- for (int i = 0; i < res1.size(); i++) {
- for (int j = 0; j < res1[i].size(); j++) {
- assertEquals(res1[i][j], res2[i][j]);
- }
- }
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select * from ${systableName} order by snapshot_id""",
+ systableName)
}
def test_systable_metadata_log_entries = { table ->
@@ -178,17 +148,10 @@ suite("test_iceberg_sys_table", "p0,external") {
}
order_qt_select_metadata_log_entries_count """select count(*) from ${systableName}"""
-
- List<List<Object>> res1 = sql """select * from ${systableName} order by timestamp"""
- List<List<Object>> res2 = sql """select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.${table}",
- "query_type" = "metadata_log_entries") order by timestamp"""
- assertEquals(res1.size(), res2.size());
- for (int i = 0; i < res1.size(); i++) {
- for (int j = 0; j < res1[i].size(); j++) {
- assertEquals(res1[i][j], res2[i][j]);
- }
- }
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select * from ${systableName} order by timestamp""",
+ systableName)
}
def test_systable_snapshots = { table ->
@@ -207,19 +170,15 @@ suite("test_iceberg_sys_table", "p0,external") {
}
}
- order_qt_select_snapshots """select operation from ${systableName}"""
order_qt_select_snapshots_count """select count(*) from ${systableName}"""
-
- List<List<Object>> res1 = sql """select * from ${systableName} order by committed_at"""
- List<List<Object>> res2 = sql """select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.${table}",
- "query_type" = "snapshots") order by committed_at"""
- assertEquals(res1.size(), res2.size());
- for (int i = 0; i < res1.size(); i++) {
- for (int j = 0; j < res1[i].size(); j++) {
- assertEquals(res1[i][j], res2[i][j]);
- }
- }
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select operation from ${systableName}""",
+ "${systableName} projected query")
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select * from ${systableName} order by committed_at""",
+ systableName)
}
def test_systable_refs = { table ->
@@ -238,19 +197,15 @@ suite("test_iceberg_sys_table", "p0,external") {
}
}
- order_qt_select_refs """select name, type from ${systableName}"""
order_qt_select_refs_count """select count(*) from ${systableName}"""
-
- List<List<Object>> res1 = sql """select * from ${systableName} order by snapshot_id"""
- List<List<Object>> res2 = sql """select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.${table}",
- "query_type" = "refs") order by snapshot_id"""
- assertEquals(res1.size(), res2.size());
- for (int i = 0; i < res1.size(); i++) {
- for (int j = 0; j < res1[i].size(); j++) {
- assertEquals(res1[i][j], res2[i][j]);
- }
- }
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select name, type from ${systableName}""",
+ "${systableName} projected query")
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select * from ${systableName} order by snapshot_id""",
+ systableName)
}
def test_systable_manifests = { table, systableType ->
@@ -272,27 +227,15 @@ suite("test_iceberg_sys_table", "p0,external") {
order_qt_select_manifests_count """select count(*) from ${systableName}"""
if (systableType.equals("manifests")) {
- List<List<Object>> res1 = sql """select * from ${systableName} order by path"""
- List<List<Object>> res2 = sql """select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.${table}",
- "query_type" = "${systableType}") order by path"""
- assertEquals(res1.size(), res2.size());
- for (int i = 0; i < res1.size(); i++) {
- for (int j = 0; j < res1[i].size(); j++) {
- assertEquals(res1[i][j], res2[i][j]);
- }
- }
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select * from ${systableName} order by path""",
+ systableName)
} else {
- List<List<Object>> res1 = sql """select * from ${systableName} order by path, reference_snapshot_id"""
- List<List<Object>> res2 = sql """select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.${table}",
- "query_type" = "${systableType}") order by path, reference_snapshot_id"""
- assertEquals(res1.size(), res2.size());
- for (int i = 0; i < res1.size(); i++) {
- for (int j = 0; j < res1[i].size(); j++) {
- assertEquals(res1[i][j], res2[i][j]);
- }
- }
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select * from ${systableName} order by path, reference_snapshot_id""",
+ systableName)
}
}
@@ -313,14 +256,10 @@ suite("test_iceberg_sys_table", "p0,external") {
}
order_qt_select_partitions_count """select count(*) from ${systableName}"""
-
-
- List<List<Object>> res1 = sql """select * from ${systableName};"""
- List<List<Object>> res2 = sql """select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.${table}",
- "query_type" = "partitions");"""
- assertEquals(res1.size(), res2.size());
- // just test can be selected successully
+ assertQueryRowsMatchCount(
+ """select count(*) from ${systableName}""",
+ """select * from ${systableName};""",
+ systableName)
}
def test_table_systables = { table ->
@@ -374,14 +313,6 @@ suite("test_iceberg_sys_table", "p0,external") {
sql """create database if not exists internal.regression_test"""
sql """grant select_priv on internal.regression_test.* to ${user}"""
connect(user, "${pwd}", context.config.jdbcUrl) {
- test {
- sql """
- select committed_at, snapshot_id, parent_id, operation from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_tbl1",
- "query_type" = "snapshots");
- """
- exception "denied"
- }
test {
sql """
select committed_at, snapshot_id, parent_id, operation from ${catalog_name}.${db_name}.test_iceberg_systable_tbl1\$snapshots
@@ -391,11 +322,6 @@ suite("test_iceberg_sys_table", "p0,external") {
}
sql """grant select_priv on ${catalog_name}.${db_name}.test_iceberg_systable_tbl1 to ${user}"""
connect(user, "${pwd}", context.config.jdbcUrl) {
- sql """
- select committed_at, snapshot_id, parent_id, operation from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_tbl1",
- "query_type" = "snapshots");
- """
sql """select committed_at, snapshot_id, parent_id, operation from ${catalog_name}.${db_name}.test_iceberg_systable_tbl1\$snapshots"""
}
try_sql("DROP USER ${user}")
@@ -417,5 +343,13 @@ suite("test_iceberg_sys_table", "p0,external") {
sql """use ${db_name}"""
order_qt_varbinary_sys_table_desc """desc test_iceberg_systable_unpartitioned\$files"""
- order_qt_varbinary_sys_table_select """select content, file_format, record_count, lower_bounds, upper_bounds from test_iceberg_systable_unpartitioned\$files;"""
+ List<List<Object>> varbinaryRows = sql """
+ select content, file_format, record_count, lower_bounds, upper_bounds
+ from test_iceberg_systable_unpartitioned\$files;
+ """
+ assertTrue(varbinaryRows.size() > 0, "Varbinary system table query should return data")
+ assertTrue(String.valueOf(varbinaryRows[0][3]).contains("0x"), "Expected lower_bounds to use varbinary hex output")
+ assertTrue(String.valueOf(varbinaryRows[0][4]).contains("0x"), "Expected upper_bounds to use varbinary hex output")
}
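The invariant enforced by the suite's new `assertQueryRowsMatchCount` closure is small enough to restate outside Groovy. A minimal Python sketch of the same check, assuming (as in the Groovy harness) that each SQL result set is a list of rows and each row a list of column values:

```python
# Sketch of the invariant behind assertQueryRowsMatchCount: the count query
# must return exactly one row, and its value must equal the number of rows
# the projected query returns. Plain lists stand in for SQL result sets.
def assert_query_rows_match_count(count_result, query_result, label):
    assert len(count_result) == 1, f"{label}: count query must return exactly one row"
    expected_rows = int(count_result[0][0])
    assert query_result is not None, f"{label} result should not be null"
    assert len(query_result) == expected_rows, f"{label} row count should match count query"

# e.g. a snapshots sys-table with three rows:
assert_query_rows_match_count(
    [[3]],
    [[1, "append"], [2, "append"], [3, "overwrite"]],
    "tbl$snapshots")
```

Comparing row counts rather than full row contents is what lets the refactored suite drop the old row-by-row diff against `iceberg_meta()` output, which no longer exists as a second source of truth.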
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table_auth.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table_auth.groovy
index e2b059b78cb..99f035f7674 100644
--- a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table_auth.groovy
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table_auth.groovy
@@ -70,15 +70,6 @@ suite("test_iceberg_sys_table_auth", "p0,external") {
// Test that user without table permission cannot query system tables
connect(user, "${pwd}", context.config.jdbcUrl) {
- // Test snapshots system table via iceberg_meta function
- test {
- sql """
- select committed_at, snapshot_id, parent_id, operation from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
- "query_type" = "snapshots");
- """
- exception "denied"
- }
// Test snapshots system table via direct access
test {
sql """
@@ -86,15 +77,6 @@ suite("test_iceberg_sys_table_auth", "p0,external") {
"""
exception "denied"
}
- // Test files system table via iceberg_meta function
- test {
- sql """
- select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
- "query_type" = "files");
- """
- exception "denied"
- }
// Test files system table via direct access
test {
sql """
@@ -102,15 +84,6 @@ suite("test_iceberg_sys_table_auth", "p0,external") {
"""
exception "denied"
}
- // Test entries system table via iceberg_meta function
- test {
- sql """
- select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
- "query_type" = "entries");
- """
- exception "denied"
- }
// Test entries system table via direct access
test {
sql """
@@ -118,15 +91,6 @@ suite("test_iceberg_sys_table_auth", "p0,external") {
"""
exception "denied"
}
- // Test history system table via iceberg_meta function
- test {
- sql """
- select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
- "query_type" = "history");
- """
- exception "denied"
- }
// Test history system table via direct access
test {
sql """
@@ -139,38 +103,10 @@ suite("test_iceberg_sys_table_auth", "p0,external") {
// Grant permission and verify user can query system tables
sql """GRANT SELECT_PRIV ON ${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1 TO '${user}'@'%'"""
connect(user, "${pwd}", context.config.jdbcUrl) {
- // Test snapshots system table with permission
- sql """
- select committed_at, snapshot_id, parent_id, operation from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
- "query_type" = "snapshots");
- """
sql """select committed_at, snapshot_id, parent_id, operation from ${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1\$snapshots"""
-
- // Test files system table with permission
- sql """
- select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
- "query_type" = "files");
- """
sql """select * from ${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1\$files"""
-
- // Test entries system table with permission
- sql """
- select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
- "query_type" = "entries");
- """
sql """select * from ${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1\$entries"""
-
- // Test history system table with permission
- sql """
- select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1",
- "query_type" = "history");
- """
sql """select * from ${catalog_name}.${db_name}.test_iceberg_systable_auth_tbl1\$history"""
}
try_sql("DROP USER '${user}'@'%'")
}
-
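The auth hunks above pin down one behavior of the refactor: a `table$systable` reference is now authorized against its source table, so a single SELECT_PRIV grant on the base table covers every `$`-suffixed system table. A schematic sketch of that check (the function names and the set-of-grants model are illustrative assumptions, not Doris FE code):

```python
# Illustrative sketch: sys-table auth resolves to the source table, so
# "db.tbl$snapshots" is allowed iff SELECT_PRIV was granted on "db.tbl".
def source_table(name: str) -> str:
    """Strip the $systable suffix, if any, to get the table auth is checked on."""
    return name.split("$", 1)[0]

def can_select(granted_tables: set, name: str) -> bool:
    return source_table(name) in granted_tables

grants = {"iceberg.db.tbl1"}
assert can_select(grants, "iceberg.db.tbl1$snapshots")      # covered by the grant
assert not can_select(grants, "iceberg.db.tbl2$files")      # different source table
```

This matches the test flow: before the grant every `$snapshots`/`$files`/`$entries`/`$history` query fails with "denied", and one grant on the base table makes all of them succeed.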
diff --git a/regression-test/suites/external_table_p0/polaris/test_polaris.groovy b/regression-test/suites/external_table_p0/polaris/test_polaris.groovy
index b1bb71a31c4..e8fe34af37e 100644
--- a/regression-test/suites/external_table_p0/polaris/test_polaris.groovy
+++ b/regression-test/suites/external_table_p0/polaris/test_polaris.groovy
@@ -158,10 +158,7 @@ suite("test_polaris", "p0,external") {
// ======= TIME TRAVEL TEST =======
def iceberg_meta_result = sql """
- SELECT snapshot_id FROM iceberg_meta(
- 'table' = '${catalog_name}.${db_name}.${table_name}',
- 'query_type' = 'snapshots'
- ) order by committed_at desc;
+ SELECT snapshot_id FROM ${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
"""
def first_snapshot_id = iceberg_meta_result.get(0).get(0);
def time_travel = sql """
diff --git a/regression-test/suites/external_table_p0/refactor_storage_param/iceberg_rest_on_hdfs.groovy b/regression-test/suites/external_table_p0/refactor_storage_param/iceberg_rest_on_hdfs.groovy
index d2b6cee8965..0bfba69e9a5 100644
--- a/regression-test/suites/external_table_p0/refactor_storage_param/iceberg_rest_on_hdfs.groovy
+++ b/regression-test/suites/external_table_p0/refactor_storage_param/iceberg_rest_on_hdfs.groovy
@@ -158,10 +158,7 @@ suite("iceberg_rest_on_hdfs", "p0,external") {
// ======= TIME TRAVEL TEST =======
def iceberg_meta_result = sql """
- SELECT snapshot_id FROM iceberg_meta(
- 'table' = '${catalog_name}.${db_name}.${table_name}',
- 'query_type' = 'snapshots'
- ) order by committed_at desc;
+ SELECT snapshot_id FROM ${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
"""
def first_snapshot_id = iceberg_meta_result.get(0).get(0);
def time_travel = sql """
diff --git a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy
index 8032a42b2a3..45215c76d12 100644
--- a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy
+++ b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_on_hms_and_filesystem_and_dlf.groovy
@@ -128,10 +128,7 @@ suite("iceberg_on_hms_and_filesystem_and_dlf", "p2,external") {
println "iceberg_meta_result SUCCESS" + catalog_name
def iceberg_meta_result = sql """
- SELECT snapshot_id FROM iceberg_meta(
- 'table' = '${catalog_name}.${db_name}.${table_name}',
- 'query_type' = 'snapshots'
- ) order by committed_at desc;
+ SELECT snapshot_id FROM ${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
"""
def first_snapshot_id = iceberg_meta_result.get(0).get(0);
@@ -291,10 +288,7 @@ suite("iceberg_on_hms_and_filesystem_and_dlf", "p2,external") {
println "iceberg_meta_result SUCCESS" + catalog_name
def iceberg_meta_result = sql """
- SELECT snapshot_id FROM iceberg_meta(
- 'table' = '${catalog_name}.${db_name}.${table_name}',
- 'query_type' = 'snapshots'
- ) order by committed_at desc;
+ SELECT snapshot_id FROM ${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
"""
def first_snapshot_id = iceberg_meta_result.get(0).get(0);
diff --git a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_s3_storage_vended_test.groovy b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_s3_storage_vended_test.groovy
index 7584e388ff2..3954bfdadb4 100644
--- a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_s3_storage_vended_test.groovy
+++ b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_s3_storage_vended_test.groovy
@@ -160,10 +160,7 @@ suite("iceberg_rest_s3_storage_vended_test", "p2,external") {
// ======= TIME TRAVEL TEST =======
def iceberg_meta_result = sql """
- SELECT snapshot_id FROM iceberg_meta(
- 'table' = '${catalog_name}.${db_name}.${table_name}',
- 'query_type' = 'snapshots'
- ) order by committed_at desc;
+ SELECT snapshot_id FROM ${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
"""
def first_snapshot_id = iceberg_meta_result.get(0).get(0);
def time_travel = sql """
diff --git a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_storage_test.groovy b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_storage_test.groovy
index d4fc6b1be44..955637db101 100644
--- a/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_storage_test.groovy
+++ b/regression-test/suites/external_table_p2/refactor_catalog_param/iceberg_rest_storage_test.groovy
@@ -158,10 +158,7 @@ suite("iceberg_rest_storage_test", "p2,external") {
// ======= TIME TRAVEL TEST =======
def iceberg_meta_result = sql """
- SELECT snapshot_id FROM iceberg_meta(
- 'table' = '${catalog_name}.${db_name}.${table_name}',
- 'query_type' = 'snapshots'
- ) order by committed_at desc;
+ SELECT snapshot_id FROM ${catalog_name}.${db_name}.${table_name}\$snapshots order by committed_at desc;
"""
def first_snapshot_id = iceberg_meta_result.get(0).get(0);
def time_travel = sql """
diff --git a/regression-test/suites/external_table_p2/tvf/test_iceberg_meta.groovy b/regression-test/suites/external_table_p2/tvf/test_iceberg_meta.groovy
deleted file mode 100644
index c6f16fe2395..00000000000
--- a/regression-test/suites/external_table_p2/tvf/test_iceberg_meta.groovy
+++ /dev/null
@@ -1,90 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-suite("test_iceberg_meta", "p2,external") {
- String suiteName = "test_iceberg_meta"
- Boolean ignoreP2 = true;
- if (ignoreP2) {
- logger.info("disable p2 test");
- return;
- }
-
- String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
- if (enabled != null && enabled.equalsIgnoreCase("true")) {
- String iceberg_catalog_name = "test_iceberg_meta_tvf"
- String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
- String extHdfsPort = context.config.otherConfigs.get("extHdfsPort")
- String db = "multi_catalog"
- sql """drop catalog if exists ${iceberg_catalog_name};"""
- sql """
- create catalog if not exists ${iceberg_catalog_name} properties (
- 'type'='iceberg',
- 'iceberg.catalog.type'='hadoop',
- 'warehouse' = 'hdfs://${extHiveHmsHost}:${extHdfsPort}/usr/hive/warehouse/hadoop_catalog'
- );
- """
-
- sql """switch ${iceberg_catalog_name};"""
- sql """ use `${db}`; """
-
- order_qt_q01 """ select count(*) from iceberg_hadoop_catalog """
- order_qt_q02 """ select c_custkey from iceberg_hadoop_catalog group by c_custkey order by c_custkey limit 7 """
-
- order_qt_tvf_1 """ select committed_at, snapshot_id, parent_id, operation from iceberg_meta(
- "table" = "${iceberg_catalog_name}.${db}.multi_partition",
- "query_type" = "snapshots");
- """
-
- order_qt_tvf_2 """ select committed_at, snapshot_id, parent_id, operation from iceberg_meta(
- "table" = "${iceberg_catalog_name}.${db}.multi_partition",
- "query_type" = "snapshots")
- where snapshot_id = 7235593032487457798;
- """
- String user = "${suiteName}_user"
- String pwd = 'C123_567p'
- try_sql("DROP USER ${user}")
- sql """CREATE USER '${user}' IDENTIFIED BY '${pwd}'"""
- //cloud-mode
- if (isCloudMode()) {
- def clusters = sql " SHOW CLUSTERS; "
- assertTrue(!clusters.isEmpty())
- def validCluster = clusters[0][0]
- sql """GRANT USAGE_PRIV ON CLUSTER `${validCluster}` TO ${user}""";
- }
-
- sql """grant select_priv on regression_test to ${user}"""
- connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
- test {
- sql """
- select committed_at, snapshot_id, parent_id, operation from iceberg_meta(
- "table" = "${iceberg_catalog_name}.${db}.multi_partition",
- "query_type" = "snapshots");
- """
- exception "denied"
- }
- }
- sql """grant select_priv on ${iceberg_catalog_name}.${db}.multi_partition to ${user}"""
- connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
- sql """
- select committed_at, snapshot_id, parent_id, operation from iceberg_meta(
- "table" = "${iceberg_catalog_name}.${db}.multi_partition",
- "query_type" = "snapshots");
- """
- }
- try_sql("DROP USER ${user}")
- }
-}