This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 56bef2da7 [iceberg] minor improvements (#2015)
56bef2da7 is described below

commit 56bef2da78181a16f73f4f72ba0a19650533f020
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Nov 25 03:07:38 2025 +0100

    [iceberg] minor improvements (#2015)
---
 .../lake/iceberg/tiering/IcebergLakeCommitter.java   |  4 ++--
 .../bucketing/IcebergBucketingFunctionTest.java      | 17 ++++++++---------
 .../lake/iceberg/source/IcebergRecordReaderTest.java | 20 ++++++++++----------
 .../testutils/FlinkIcebergTieringTestBase.java       |  4 ++--
 4 files changed, 22 insertions(+), 23 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index 6f054adbc..ec36cb4b9 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -212,13 +212,13 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
     public void abort(IcebergCommittable committable) {
         List<String> dataFilesToDelete =
                 committable.getDataFiles().stream()
-                        .map(file -> file.path().toString())
+                        .map(ContentFile::location)
                         .collect(Collectors.toList());
         CatalogUtil.deleteFiles(icebergTable.io(), dataFilesToDelete, "data 
file", true);
 
         List<String> deleteFilesToDelete =
                 committable.getDeleteFiles().stream()
-                        .map(file -> file.path().toString())
+                        .map(ContentFile::location)
                         .collect(Collectors.toList());
         CatalogUtil.deleteFiles(icebergTable.io(), deleteFilesToDelete, 
"delete file", true);
     }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/bucketing/IcebergBucketingFunctionTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/bucketing/IcebergBucketingFunctionTest.java
index 77f6b1b75..6200effe7 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/bucketing/IcebergBucketingFunctionTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/bucketing/IcebergBucketingFunctionTest.java
@@ -31,7 +31,6 @@ import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -44,7 +43,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 class IcebergBucketingFunctionTest {
 
     @Test
-    void testIntegerHash() throws IOException {
+    void testIntegerHash() {
         int testValue = 42;
         int bucketNum = 10;
 
@@ -69,7 +68,7 @@ class IcebergBucketingFunctionTest {
     }
 
     @Test
-    void testLongHash() throws IOException {
+    void testLongHash() {
         long testValue = 1234567890123456789L;
         int bucketNum = 10;
 
@@ -94,7 +93,7 @@ class IcebergBucketingFunctionTest {
     }
 
     @Test
-    void testStringHash() throws IOException {
+    void testStringHash() {
         String testValue = "Hello Iceberg, Fluss this side!";
         int bucketNum = 10;
 
@@ -120,7 +119,7 @@ class IcebergBucketingFunctionTest {
     }
 
     @Test
-    void testDecimalHash() throws IOException {
+    void testDecimalHash() {
         BigDecimal testValue = new BigDecimal("123.45");
         Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2);
         int bucketNum = 10;
@@ -148,7 +147,7 @@ class IcebergBucketingFunctionTest {
     }
 
     @Test
-    void testTimestampEncodingHash() throws IOException {
+    void testTimestampEncodingHash() {
         // Iceberg expects microseconds for TIMESTAMP type
         long millis = 1698235273182L;
         int nanos = 123456;
@@ -179,7 +178,7 @@ class IcebergBucketingFunctionTest {
     }
 
     @Test
-    void testDateHash() throws IOException {
+    void testDateHash() {
         int dateValue = 19655;
         int bucketNum = 10;
 
@@ -204,7 +203,7 @@ class IcebergBucketingFunctionTest {
     }
 
     @Test
-    void testTimeHashing() throws IOException {
+    void testTimeHashing() {
         // Fluss stores time as int (milliseconds since midnight)
         int timeMillis = 34200000;
         long timeMicros = timeMillis * 1000L; // Convert to microseconds for 
Iceberg
@@ -231,7 +230,7 @@ class IcebergBucketingFunctionTest {
     }
 
     @Test
-    void testBinaryEncoding() throws IOException {
+    void testBinaryEncoding() {
         byte[] testValue = "Hello i only understand binary data".getBytes();
         int bucketNum = 10;
 
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
index 0ae2c049c..cc86e2c8d 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
@@ -153,16 +153,16 @@ class IcebergRecordReaderTest extends 
IcebergSourceTestBase {
         List<Row> projectExpect = new ArrayList<>();
         try (CloseableIterable<FileScanTask> fileScanTasks = 
tableScan.planFiles()) {
             for (FileScanTask task : fileScanTasks) {
-                org.apache.iceberg.io.CloseableIterator<Record> iterator =
-                        reader.open(task).iterator();
-                IcebergRecordAsFlussRow recordAsFlussRow = new 
IcebergRecordAsFlussRow();
-                projectExpect.addAll(
-                        convertToFlinkRow(
-                                projectFieldGetters,
-                                TransformingCloseableIterator.transform(
-                                        CloseableIterator.wrap(iterator),
-                                        
recordAsFlussRow::replaceIcebergRecord)));
-                iterator.close();
+                try (org.apache.iceberg.io.CloseableIterator<Record> iterator =
+                        reader.open(task).iterator()) {
+                    IcebergRecordAsFlussRow recordAsFlussRow = new 
IcebergRecordAsFlussRow();
+                    projectExpect.addAll(
+                            convertToFlinkRow(
+                                    projectFieldGetters,
+                                    TransformingCloseableIterator.transform(
+                                            CloseableIterator.wrap(iterator),
+                                            
recordAsFlussRow::replaceIcebergRecord)));
+                }
             }
         }
         
assertThat(projectActual).containsExactlyInAnyOrderElementsOf(projectExpect);
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index e9bd27bd5..a2997f98c 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -437,7 +437,7 @@ public class FlinkIcebergTieringTestBase {
 
             for (DataFile file : files) {
                 Iterable<Record> iterable =
-                        
Parquet.read(table.io().newInputFile(file.path().toString()))
+                        Parquet.read(table.io().newInputFile(file.location()))
                                 .project(table.schema())
                                 .createReaderFunc(
                                         fileSchema ->
@@ -471,7 +471,7 @@ public class FlinkIcebergTieringTestBase {
     }
 
     protected void checkSnapshotPropertyInIceberg(
-            TablePath tablePath, Map<String, String> expectedProperties) 
throws Exception {
+            TablePath tablePath, Map<String, String> expectedProperties) {
         org.apache.iceberg.Table table = 
icebergCatalog.loadTable(toIceberg(tablePath));
         Snapshot snapshot = table.currentSnapshot();
         
assertThat(snapshot.summary()).containsAllEntriesOf(expectedProperties);

Reply via email to