This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 10b052f6fcf [fix](paimon) adapt FE for paimon 1.3.1 and iceberg 1.10.1
(#60876)
10b052f6fcf is described below
commit 10b052f6fcfae5a8c2af348ca5f666ec5a00952d
Author: Chenjunwei <[email protected]>
AuthorDate: Sun Mar 8 05:19:25 2026 +0800
[fix](paimon) adapt FE for paimon 1.3.1 and iceberg 1.10.1 (#60876)
## Summary
- bump `doris.hive.catalog.shade.version` to `3.1.1`
- bump `iceberg.version` to `1.10.1`
- bump `paimon.version` to `1.3.1`
- adapt paimon metrics API changes in FE:
- `MetricRegistry` is now an interface (use `implements`)
- `withMetricsRegistry` renamed to `withMetricRegistry`
## Verification
- `cd fe &&
/mnt/disk2/yunyou/chenjunwei/doris_tools/apache-maven-3.9.9/bin/mvn
-DskipTests -pl fe-core -am compile`
- build result: SUCCESS
---------
Co-authored-by: Calvin Kirs <[email protected]>
---
.../paimon/profile/PaimonMetricRegistry.java | 4 +-
.../datasource/paimon/source/PaimonScanNode.java | 2 +-
.../doris/datasource/iceberg/IcebergUtilsTest.java | 44 +++++++++++-----------
.../paimon/source/PaimonScanNodeTest.java | 23 +++++++----
fe/pom.xml | 6 +--
5 files changed, 45 insertions(+), 34 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonMetricRegistry.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonMetricRegistry.java
index ba229fa61dd..4904c71faab 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonMetricRegistry.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/profile/PaimonMetricRegistry.java
@@ -27,13 +27,13 @@ import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class PaimonMetricRegistry extends MetricRegistry {
+public class PaimonMetricRegistry implements MetricRegistry {
private static final Logger LOG =
LoggerFactory.getLogger(PaimonMetricRegistry.class);
private static final String TABLE_TAG_KEY = "table";
private final ConcurrentHashMap<String, MetricGroup> groups = new
ConcurrentHashMap<>();
@Override
- protected MetricGroup createMetricGroup(String name, Map<String, String>
tags) {
+ public MetricGroup createMetricGroup(String name, Map<String, String>
tags) {
MetricGroup group = new MetricGroupImpl(name, tags);
String table = tags == null ? "" : tags.getOrDefault(TABLE_TAG_KEY,
"");
groups.put(buildKey(name, table), group);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 6323972dd16..6e274dd98a2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -536,7 +536,7 @@ public class PaimonScanNode extends FileQueryScanNode {
.newScan();
PaimonMetricRegistry registry = new PaimonMetricRegistry();
if (scan instanceof InnerTableScan) {
- scan = ((InnerTableScan) scan).withMetricsRegistry(registry);
+ scan = ((InnerTableScan) scan).withMetricRegistry(registry);
}
List<org.apache.paimon.table.source.Split> splits =
scan.plan().splits();
PaimonScanMetricsReporter.report(source.getTargetTable(),
paimonTable.name(), registry);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
index 25fdc37fca4..60b39384f8e 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
@@ -23,7 +23,6 @@ import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo;
import com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.GenericManifestFile;
import org.apache.iceberg.GenericPartitionFieldSummary;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.ManifestContent;
@@ -107,19 +106,19 @@ public class IcebergUtilsTest {
public void testGetMatchingManifest() {
// partition : 100 - 200
- GenericManifestFile f1 =
getGenericManifestFileForDataTypeWithPartitionSummary(
+ ManifestFile f1 = getManifestFileForDataTypeWithPartitionSummary(
"manifest_f1.avro",
Collections.singletonList(new GenericPartitionFieldSummary(
false, false, getByteBufferForLong(100),
getByteBufferForLong(200))));
// partition : 300 - 400
- GenericManifestFile f2 =
getGenericManifestFileForDataTypeWithPartitionSummary(
+ ManifestFile f2 = getManifestFileForDataTypeWithPartitionSummary(
"manifest_f2.avro",
Collections.singletonList(new GenericPartitionFieldSummary(
false, false, getByteBufferForLong(300),
getByteBufferForLong(400))));
// partition : 500 - 600
- GenericManifestFile f3 =
getGenericManifestFileForDataTypeWithPartitionSummary(
+ ManifestFile f3 = getManifestFileForDataTypeWithPartitionSummary(
"manifest_f3.avro",
Collections.singletonList(new GenericPartitionFieldSummary(
false, false, getByteBufferForLong(500),
getByteBufferForLong(600))));
@@ -204,25 +203,28 @@ public class IcebergUtilsTest {
return Conversions.toByteBuffer(Types.LongType.get(), num);
}
- private GenericManifestFile
getGenericManifestFileForDataTypeWithPartitionSummary(
+ private ManifestFile getManifestFileForDataTypeWithPartitionSummary(
String path,
List<PartitionFieldSummary> partitionFieldSummaries) {
- return new GenericManifestFile(
- path,
- 1024L,
- 0,
- ManifestContent.DATA,
- 1,
- 1,
- 123456789L,
- 2,
- 100,
- 0,
- 0,
- 0,
- 0,
- partitionFieldSummaries,
- null);
+ ManifestFile file = Mockito.mock(ManifestFile.class);
+ Mockito.when(file.path()).thenReturn(path);
+ Mockito.when(file.length()).thenReturn(1024L);
+ Mockito.when(file.partitionSpecId()).thenReturn(0);
+ Mockito.when(file.content()).thenReturn(ManifestContent.DATA);
+ Mockito.when(file.sequenceNumber()).thenReturn(1L);
+ Mockito.when(file.minSequenceNumber()).thenReturn(1L);
+ Mockito.when(file.snapshotId()).thenReturn(123456789L);
+ Mockito.when(file.partitions()).thenReturn(partitionFieldSummaries);
+ Mockito.when(file.addedFilesCount()).thenReturn(1);
+ Mockito.when(file.addedRowsCount()).thenReturn(100L);
+ Mockito.when(file.existingFilesCount()).thenReturn(0);
+ Mockito.when(file.existingRowsCount()).thenReturn(0L);
+ Mockito.when(file.deletedFilesCount()).thenReturn(0);
+ Mockito.when(file.deletedRowsCount()).thenReturn(0L);
+ Mockito.when(file.hasAddedFiles()).thenReturn(true);
+ Mockito.when(file.hasExistingFiles()).thenReturn(false);
+ Mockito.when(file.copy()).thenReturn(file);
+ return file;
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 77b2c474742..07dc64a8ec9 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -30,6 +30,7 @@ import org.apache.doris.qe.SessionVariable;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.RawFile;
@@ -65,8 +66,9 @@ public class PaimonScanNodeTest {
paimonScanNode.setSource(new PaimonSource());
- DataFileMeta dfm1 = DataFileMeta.forAppend("f1.parquet", 64 * 1024 *
1024, 1, SimpleStats.EMPTY_STATS, 1, 1, 1,
- Collections.emptyList(), null, null, null, null);
+ DataFileMeta dfm1 = DataFileMeta.forAppend("f1.parquet", 64L * 1024 *
1024, 1L, SimpleStats.EMPTY_STATS,
+ 1L, 1L, 1L, Collections.<String>emptyList(), null,
FileSource.APPEND,
+ Collections.<String>emptyList(), null, null,
Collections.<String>emptyList());
BinaryRow binaryRow1 = BinaryRow.singleColumn(1);
DataSplit ds1 = DataSplit.builder()
.rawConvertible(true)
@@ -76,8 +78,9 @@ public class PaimonScanNodeTest {
.withDataFiles(Collections.singletonList(dfm1))
.build();
- DataFileMeta dfm2 = DataFileMeta.forAppend("f2.parquet", 32 * 1024 *
1024, 2, SimpleStats.EMPTY_STATS, 1, 1, 1,
- Collections.emptyList(), null, null, null, null);
+ DataFileMeta dfm2 = DataFileMeta.forAppend("f2.parquet", 32L * 1024 *
1024, 2L, SimpleStats.EMPTY_STATS,
+ 1L, 1L, 1L, Collections.<String>emptyList(), null,
FileSource.APPEND,
+ Collections.<String>emptyList(), null, null,
Collections.<String>emptyList());
BinaryRow binaryRow2 = BinaryRow.singleColumn(1);
DataSplit ds2 = DataSplit.builder()
.rawConvertible(true)
@@ -106,8 +109,13 @@ public class PaimonScanNodeTest {
java.lang.reflect.Field field =
FileQueryScanNode.class.getDeclaredField("fileSplitter");
field.setAccessible(true);
field.set(spyPaimonScanNode, fileSplitter);
+
+ java.lang.reflect.Field storagePropertiesField =
+
PaimonScanNode.class.getDeclaredField("storagePropertiesMap");
+ storagePropertiesField.setAccessible(true);
+ storagePropertiesField.set(spyPaimonScanNode,
Collections.emptyMap());
} catch (NoSuchFieldException | IllegalAccessException e) {
- throw new RuntimeException("Failed to inject FileSplitter into
PaimonScanNode test", e);
+ throw new RuntimeException("Failed to inject test fields into
PaimonScanNode", e);
}
// Note: The original PaimonSource is sufficient for this test
@@ -390,8 +398,9 @@ public class PaimonScanNodeTest {
PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1),
desc, false, sv);
PaimonScanNode spyPaimonScanNode = Mockito.spy(paimonScanNode);
- DataFileMeta dfm = DataFileMeta.forAppend("f1.parquet", 64 * 1024 *
1024, 1, SimpleStats.EMPTY_STATS, 1, 1, 1,
- Collections.emptyList(), null, null, null, null);
+ DataFileMeta dfm = DataFileMeta.forAppend("f1.parquet", 64L * 1024 *
1024, 1L, SimpleStats.EMPTY_STATS,
+ 1L, 1L, 1L, Collections.<String>emptyList(), null,
FileSource.APPEND,
+ Collections.<String>emptyList(), null, null,
Collections.<String>emptyList());
BinaryRow binaryRow = BinaryRow.singleColumn(1);
DataSplit dataSplit = DataSplit.builder()
.rawConvertible(true)
diff --git a/fe/pom.xml b/fe/pom.xml
index 2621d70f3e7..61a81e3c7e3 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -252,7 +252,7 @@ under the License.
<module>fe-type</module>
</modules>
<properties>
-
<doris.hive.catalog.shade.version>3.1.0</doris.hive.catalog.shade.version>
+
<doris.hive.catalog.shade.version>3.1.1</doris.hive.catalog.shade.version>
<!-- iceberg 1.9.1 depends avro on 1.12 -->
<avro.version>1.12.1</avro.version>
<parquet.version>1.16.0</parquet.version>
@@ -358,7 +358,7 @@ under the License.
<!-- ATTN: avro version must be consistent with Iceberg version -->
<!-- Please modify iceberg.version and avro.version together,
you can find avro version info in iceberg mvn repository -->
- <iceberg.version>1.9.1</iceberg.version>
+ <iceberg.version>1.10.1</iceberg.version>
<!-- 0.56.1 has bug that "SplitMode" in query response may not be
set-->
<maxcompute.version>0.53.2-public</maxcompute.version>
<!-- Arrow 19.0.1 will MacOS compile error and decimal type error when
convert to Parquet
@@ -411,7 +411,7 @@ under the License.
<quartz.version>2.3.2</quartz.version>
<aircompressor.version>0.27</aircompressor.version>
<!-- paimon -->
- <paimon.version>1.1.1</paimon.version>
+ <paimon.version>1.3.1</paimon.version>
<disruptor.version>3.4.4</disruptor.version>
<!-- arrow flight sql -->
<arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]