This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 56bef2da7 [iceberg] minor improvements (#2015)
56bef2da7 is described below
commit 56bef2da78181a16f73f4f72ba0a19650533f020
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Nov 25 03:07:38 2025 +0100
[iceberg] minor improvements (#2015)
---
.../lake/iceberg/tiering/IcebergLakeCommitter.java | 4 ++--
.../bucketing/IcebergBucketingFunctionTest.java | 17 ++++++++---------
.../lake/iceberg/source/IcebergRecordReaderTest.java | 20 ++++++++++----------
.../testutils/FlinkIcebergTieringTestBase.java | 4 ++--
4 files changed, 22 insertions(+), 23 deletions(-)
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index 6f054adbc..ec36cb4b9 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -212,13 +212,13 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
public void abort(IcebergCommittable committable) {
List<String> dataFilesToDelete =
committable.getDataFiles().stream()
- .map(file -> file.path().toString())
+ .map(ContentFile::location)
.collect(Collectors.toList());
CatalogUtil.deleteFiles(icebergTable.io(), dataFilesToDelete, "data
file", true);
List<String> deleteFilesToDelete =
committable.getDeleteFiles().stream()
- .map(file -> file.path().toString())
+ .map(ContentFile::location)
.collect(Collectors.toList());
CatalogUtil.deleteFiles(icebergTable.io(), deleteFilesToDelete, "delete file", true);
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/bucketing/IcebergBucketingFunctionTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/bucketing/IcebergBucketingFunctionTest.java
index 77f6b1b75..6200effe7 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/bucketing/IcebergBucketingFunctionTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/bucketing/IcebergBucketingFunctionTest.java
@@ -31,7 +31,6 @@ import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -44,7 +43,7 @@ import static org.assertj.core.api.Assertions.assertThat;
class IcebergBucketingFunctionTest {
@Test
- void testIntegerHash() throws IOException {
+ void testIntegerHash() {
int testValue = 42;
int bucketNum = 10;
@@ -69,7 +68,7 @@ class IcebergBucketingFunctionTest {
}
@Test
- void testLongHash() throws IOException {
+ void testLongHash() {
long testValue = 1234567890123456789L;
int bucketNum = 10;
@@ -94,7 +93,7 @@ class IcebergBucketingFunctionTest {
}
@Test
- void testStringHash() throws IOException {
+ void testStringHash() {
String testValue = "Hello Iceberg, Fluss this side!";
int bucketNum = 10;
@@ -120,7 +119,7 @@ class IcebergBucketingFunctionTest {
}
@Test
- void testDecimalHash() throws IOException {
+ void testDecimalHash() {
BigDecimal testValue = new BigDecimal("123.45");
Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2);
int bucketNum = 10;
@@ -148,7 +147,7 @@ class IcebergBucketingFunctionTest {
}
@Test
- void testTimestampEncodingHash() throws IOException {
+ void testTimestampEncodingHash() {
// Iceberg expects microseconds for TIMESTAMP type
long millis = 1698235273182L;
int nanos = 123456;
@@ -179,7 +178,7 @@ class IcebergBucketingFunctionTest {
}
@Test
- void testDateHash() throws IOException {
+ void testDateHash() {
int dateValue = 19655;
int bucketNum = 10;
@@ -204,7 +203,7 @@ class IcebergBucketingFunctionTest {
}
@Test
- void testTimeHashing() throws IOException {
+ void testTimeHashing() {
// Fluss stores time as int (milliseconds since midnight)
int timeMillis = 34200000;
long timeMicros = timeMillis * 1000L; // Convert to microseconds for Iceberg
@@ -231,7 +230,7 @@ class IcebergBucketingFunctionTest {
}
@Test
- void testBinaryEncoding() throws IOException {
+ void testBinaryEncoding() {
byte[] testValue = "Hello i only understand binary data".getBytes();
int bucketNum = 10;
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
index 0ae2c049c..cc86e2c8d 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
@@ -153,16 +153,16 @@ class IcebergRecordReaderTest extends IcebergSourceTestBase {
List<Row> projectExpect = new ArrayList<>();
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
for (FileScanTask task : fileScanTasks) {
- org.apache.iceberg.io.CloseableIterator<Record> iterator =
- reader.open(task).iterator();
- IcebergRecordAsFlussRow recordAsFlussRow = new IcebergRecordAsFlussRow();
- projectExpect.addAll(
- convertToFlinkRow(
- projectFieldGetters,
- TransformingCloseableIterator.transform(
- CloseableIterator.wrap(iterator),
- recordAsFlussRow::replaceIcebergRecord)));
- iterator.close();
+ try (org.apache.iceberg.io.CloseableIterator<Record> iterator =
+ reader.open(task).iterator()) {
+ IcebergRecordAsFlussRow recordAsFlussRow = new IcebergRecordAsFlussRow();
+ projectExpect.addAll(
+ convertToFlinkRow(
+ projectFieldGetters,
+ TransformingCloseableIterator.transform(
+ CloseableIterator.wrap(iterator),
+ recordAsFlussRow::replaceIcebergRecord)));
+ }
}
}
assertThat(projectActual).containsExactlyInAnyOrderElementsOf(projectExpect);
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index e9bd27bd5..a2997f98c 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -437,7 +437,7 @@ public class FlinkIcebergTieringTestBase {
for (DataFile file : files) {
Iterable<Record> iterable =
- Parquet.read(table.io().newInputFile(file.path().toString()))
+ Parquet.read(table.io().newInputFile(file.location()))
.project(table.schema())
.createReaderFunc(
fileSchema ->
@@ -471,7 +471,7 @@ public class FlinkIcebergTieringTestBase {
}
protected void checkSnapshotPropertyInIceberg(
- TablePath tablePath, Map<String, String> expectedProperties) throws Exception {
+ TablePath tablePath, Map<String, String> expectedProperties) {
org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
Snapshot snapshot = table.currentSnapshot();
assertThat(snapshot.summary()).containsAllEntriesOf(expectedProperties);