This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new c06affb44 [lake/iceberg] Support partitioned table (#1565)
c06affb44 is described below
commit c06affb447d2f078ac107e7ed93f4364d0ba140a
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 19 21:29:42 2025 +0800
[lake/iceberg] Support partitioned table (#1565)
---
.../lake/iceberg/tiering/IcebergLakeWriter.java | 37 +----
.../fluss/lake/iceberg/tiering/RecordWriter.java | 10 +-
...ndOnlyWriter.java => AppendOnlyTaskWriter.java} | 37 ++---
.../iceberg/tiering/writer/DeltaTaskWriter.java | 32 ++--
.../writer/GenericRecordAppendOnlyWriter.java | 66 ++++++++
...taWriter.java => GenericRecordDeltaWriter.java} | 27 +++-
.../lake/iceberg/utils/IcebergConversions.java | 24 +++
.../fluss/lake/iceberg/IcebergTieringTest.java | 168 ++++++++++++++-------
.../lake/iceberg/utils/IcebergConversionsTest.java | 107 +++++++++++++
.../lake/paimon/tiering/PaimonTieringTest.java | 84 ++---------
10 files changed, 379 insertions(+), 213 deletions(-)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
index dd8e55e37..8c3feb64e 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -17,7 +17,7 @@
package com.alibaba.fluss.lake.iceberg.tiering;
-import com.alibaba.fluss.lake.iceberg.tiering.writer.AppendOnlyWriter;
+import com.alibaba.fluss.lake.iceberg.tiering.writer.AppendOnlyTaskWriter;
import com.alibaba.fluss.lake.iceberg.tiering.writer.DeltaTaskWriter;
import com.alibaba.fluss.lake.writer.LakeWriter;
import com.alibaba.fluss.lake.writer.WriterInitContext;
@@ -25,7 +25,6 @@ import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.record.LogRecord;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -37,7 +36,6 @@ import org.apache.iceberg.util.PropertyUtil;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import static
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
@@ -65,7 +63,6 @@ public class IcebergLakeWriter implements
LakeWriter<IcebergWriteResult> {
private RecordWriter createRecordWriter(WriterInitContext
writerInitContext) {
Schema schema = icebergTable.schema();
List<Integer> equalityFieldIds = new
ArrayList<>(schema.identifierFieldIds());
- PartitionSpec spec = icebergTable.spec();
// Get target file size from table properties
long targetFileSize = targetFileSize(icebergTable);
@@ -79,34 +76,12 @@ public class IcebergLakeWriter implements
LakeWriter<IcebergWriteResult> {
.format(format)
.build();
- if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
- if (spec.isUnpartitioned()) {
- return new AppendOnlyWriter(
- icebergTable,
- writerInitContext.schema().getRowType(),
- writerInitContext.tableBucket(),
- null, // No partition for non-partitioned table
- Collections.emptyList(), // No partition keys
- format,
- outputFileFactory,
- targetFileSize);
- } else {
- return null;
- }
+ if (equalityFieldIds.isEmpty()) {
+ return new AppendOnlyTaskWriter(
+ icebergTable, writerInitContext, format,
outputFileFactory, targetFileSize);
} else {
- if (spec.isUnpartitioned()) {
- return new DeltaTaskWriter(
- icebergTable,
- writerInitContext.schema().getRowType(),
- writerInitContext.tableBucket(),
- null, // No partition for non-partitioned table
- Collections.emptyList(), // No partition keys);
- format,
- outputFileFactory,
- targetFileSize);
- } else {
- return null;
- }
+ return new DeltaTaskWriter(
+ icebergTable, writerInitContext, format,
outputFileFactory, targetFileSize);
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
index d6cde3394..444338e3e 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
@@ -26,30 +26,22 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
-import javax.annotation.Nullable;
-
-import java.util.List;
-
/** A base interface to write {@link LogRecord} to Iceberg. */
public abstract class RecordWriter implements AutoCloseable {
protected final TaskWriter<Record> taskWriter;
protected final Schema icebergSchema;
protected final int bucket;
- @Nullable protected final String partition;
protected final FlussRecordAsIcebergRecord flussRecordAsIcebergRecord;
public RecordWriter(
TaskWriter<Record> taskWriter,
Schema icebergSchema,
RowType flussRowType,
- TableBucket tableBucket,
- @Nullable String partition,
- List<String> partitionKeys) {
+ TableBucket tableBucket) {
this.taskWriter = taskWriter;
this.icebergSchema = icebergSchema;
this.bucket = tableBucket.getBucket();
- this.partition = partition; // null for non-partitioned tables
this.flussRecordAsIcebergRecord =
new FlussRecordAsIcebergRecord(
tableBucket.getBucket(), icebergSchema, flussRowType);
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
similarity index 74%
rename from
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
rename to
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
index 0b8ace18c..59c05f7d6 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
@@ -18,9 +18,8 @@
package com.alibaba.fluss.lake.iceberg.tiering.writer;
import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
-import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
import com.alibaba.fluss.record.LogRecord;
-import com.alibaba.fluss.types.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
@@ -29,48 +28,40 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
-import org.apache.iceberg.io.UnpartitionedWriter;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
/** A {@link RecordWriter} to write to Iceberg's append-only table. */
-public class AppendOnlyWriter extends RecordWriter {
+public class AppendOnlyTaskWriter extends RecordWriter {
- public AppendOnlyWriter(
+ public AppendOnlyTaskWriter(
Table icebergTable,
- RowType flussRowType,
- TableBucket tableBucket,
- @Nullable String partition,
- List<String> partitionKeys,
+ WriterInitContext writerInitContext,
FileFormat format,
OutputFileFactory outputFileFactory,
long targetFileSize) {
super(
- createTaskWriter(icebergTable, format, outputFileFactory,
targetFileSize),
+ createTaskWriter(
+ icebergTable, writerInitContext, format,
outputFileFactory, targetFileSize),
icebergTable.schema(),
- flussRowType,
- tableBucket,
- partition,
- partitionKeys);
+ writerInitContext.schema().getRowType(),
+ writerInitContext.tableBucket());
}
private static TaskWriter<Record> createTaskWriter(
Table icebergTable,
+ WriterInitContext writerInitContext,
FileFormat format,
OutputFileFactory outputFileFactory,
long targetFileSize) {
FileAppenderFactory<Record> fileAppenderFactory =
- new GenericAppenderFactory(icebergTable.schema());
-
- return new UnpartitionedWriter<>(
- icebergTable.spec(),
+ new GenericAppenderFactory(icebergTable.schema(),
icebergTable.spec());
+ return new GenericRecordAppendOnlyWriter(
+ icebergTable,
format,
fileAppenderFactory,
outputFileFactory,
icebergTable.io(),
- targetFileSize);
+ targetFileSize,
+ writerInitContext);
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
index 4f16a5ce5..573ca1265 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
@@ -18,9 +18,8 @@
package com.alibaba.fluss.lake.iceberg.tiering.writer;
import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
-import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
import com.alibaba.fluss.record.LogRecord;
-import com.alibaba.fluss.types.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
@@ -31,8 +30,6 @@ import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.List;
@@ -41,27 +38,24 @@ public class DeltaTaskWriter extends RecordWriter {
public DeltaTaskWriter(
Table icebergTable,
- RowType flussRowType,
- TableBucket tableBucket,
- @Nullable String partition,
- List<String> partitionKeys,
+ WriterInitContext writerInitContext,
FileFormat format,
OutputFileFactory outputFileFactory,
long targetFileSize) {
super(
- createTaskWriter(icebergTable, format, outputFileFactory,
targetFileSize),
+ createTaskWriter(
+ icebergTable, format, outputFileFactory,
targetFileSize, writerInitContext),
icebergTable.schema(),
- flussRowType,
- tableBucket,
- partition,
- partitionKeys);
+ writerInitContext.schema().getRowType(),
+ writerInitContext.tableBucket());
}
private static TaskWriter<Record> createTaskWriter(
Table icebergTable,
FileFormat format,
OutputFileFactory outputFileFactory,
- long targetFileSize) {
+ long targetFileSize,
+ WriterInitContext writerInitContext) {
int[] equalityFieldIds =
icebergTable.schema().identifierFieldIds().stream()
.mapToInt(Integer::intValue)
@@ -79,20 +73,20 @@ public class DeltaTaskWriter extends RecordWriter {
columns.add(icebergTable.schema().findField(fieldId).name());
}
Schema deleteSchema = icebergTable.schema().select(columns);
- return new GenericTaskDeltaWriter(
- icebergTable.schema(),
+ return new GenericRecordDeltaWriter(
+ icebergTable,
deleteSchema,
- icebergTable.spec(),
format,
appenderFactory,
outputFileFactory,
icebergTable.io(),
- targetFileSize);
+ targetFileSize,
+ writerInitContext);
}
@Override
public void write(LogRecord record) throws Exception {
- GenericTaskDeltaWriter deltaWriter = (GenericTaskDeltaWriter)
taskWriter;
+ GenericRecordDeltaWriter deltaWriter = (GenericRecordDeltaWriter)
taskWriter;
flussRecordAsIcebergRecord.setFlussRecord(record);
switch (record.getChangeType()) {
case INSERT:
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
new file mode 100644
index 000000000..864414ee7
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
+
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+
+import java.io.IOException;
+
+import static
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toPartition;
+
+/** A generic task writer to write {@link Record} for append-only table. */
+public class GenericRecordAppendOnlyWriter extends BaseTaskWriter<Record> {
+
+ private final RollingFileWriter currentWriter;
+
+ protected GenericRecordAppendOnlyWriter(
+ Table icebergTable,
+ FileFormat format,
+ FileAppenderFactory<Record> appenderFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSize,
+ WriterInitContext writerInitContext) {
+ super(icebergTable.spec(), format, appenderFactory, fileFactory, io,
targetFileSize);
+ currentWriter =
+ new RollingFileWriter(
+ toPartition(
+ icebergTable,
+ writerInitContext.partition(),
+ writerInitContext.tableBucket().getBucket()));
+ }
+
+ @Override
+ public void write(Record record) throws IOException {
+ currentWriter.write(record);
+ }
+
+ @Override
+ public void close() throws IOException {
+ currentWriter.close();
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
similarity index 73%
rename from
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java
rename to
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
index 65ab644dd..5adae1a3a 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
@@ -17,11 +17,13 @@
package com.alibaba.fluss.lake.iceberg.tiering.writer;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
-import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
@@ -30,21 +32,30 @@ import org.apache.iceberg.io.OutputFileFactory;
import java.io.IOException;
+import static
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toPartition;
+
/** A generic task equality delta writer. */
-class GenericTaskDeltaWriter extends BaseTaskWriter<Record> {
+class GenericRecordDeltaWriter extends BaseTaskWriter<Record> {
private final GenericEqualityDeltaWriter deltaWriter;
- public GenericTaskDeltaWriter(
- Schema schema,
+ public GenericRecordDeltaWriter(
+ Table icebergTable,
Schema deleteSchema,
- PartitionSpec spec,
FileFormat format,
FileAppenderFactory<Record> appenderFactory,
OutputFileFactory fileFactory,
FileIO io,
- long targetFileSize) {
- super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
- this.deltaWriter = new GenericEqualityDeltaWriter(null, schema,
deleteSchema);
+ long targetFileSize,
+ WriterInitContext writerInitContext) {
+ super(icebergTable.spec(), format, appenderFactory, fileFactory, io,
targetFileSize);
+ this.deltaWriter =
+ new GenericEqualityDeltaWriter(
+ toPartition(
+ icebergTable,
+ writerInitContext.partition(),
+ writerInitContext.tableBucket().getBucket()),
+ icebergTable.schema(),
+ deleteSchema);
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
index de237c7bf..617f33959 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -19,8 +19,16 @@ package com.alibaba.fluss.lake.iceberg.utils;
import com.alibaba.fluss.metadata.TablePath;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
+import javax.annotation.Nullable;
+
+import static
com.alibaba.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
+
/** Utility class for static conversions between Fluss and Iceberg types. */
public class IcebergConversions {
@@ -28,4 +36,20 @@ public class IcebergConversions {
public static TableIdentifier toIceberg(TablePath tablePath) {
return TableIdentifier.of(tablePath.getDatabaseName(),
tablePath.getTableName());
}
+
+ public static PartitionKey toPartition(
+ Table table, @Nullable String partitionName, int bucket) {
+ PartitionSpec partitionSpec = table.spec();
+ Schema schema = table.schema();
+ PartitionKey partitionKey = new PartitionKey(partitionSpec, schema);
+ int pos = 0;
+ if (partitionName != null) {
+ String[] partitionArr = partitionName.split("\\" +
PARTITION_SPEC_SEPARATOR);
+ for (String partition : partitionArr) {
+ partitionKey.set(pos++, partition);
+ }
+ }
+ partitionKey.set(pos, bucket);
+ return partitionKey;
+ }
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
index 7e15f41e1..8a666b677 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
@@ -36,6 +36,7 @@ import com.alibaba.fluss.row.GenericRow;
import com.alibaba.fluss.types.DataTypes;
import com.alibaba.fluss.utils.types.Tuple2;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
@@ -44,7 +45,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
@@ -77,11 +77,14 @@ import static com.alibaba.fluss.record.ChangeType.DELETE;
import static com.alibaba.fluss.record.ChangeType.INSERT;
import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER;
import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE;
+import static org.apache.iceberg.expressions.Expressions.equal;
import static org.assertj.core.api.Assertions.assertThat;
/** Unit test for tiering to Iceberg via {@link IcebergLakeTieringFactory}. */
class IcebergTieringTest {
+ private static final int BUCKET_NUM = 3;
+
private @TempDir File tempWarehouseDir;
private IcebergLakeTieringFactory icebergLakeTieringFactory;
private Catalog icebergCatalog;
@@ -99,24 +102,40 @@ class IcebergTieringTest {
}
private static Stream<Arguments> tieringWriteArgs() {
- return Stream.of(Arguments.of(true), Arguments.of(false));
+ return Stream.of(
+ // isPrimaryKeyTable, isPartitionedTable
+ Arguments.of(true, true),
+ Arguments.of(true, false),
+ Arguments.of(false, true),
+ Arguments.of(false, false));
}
@ParameterizedTest
@MethodSource("tieringWriteArgs")
- void testTieringWriteTable(boolean isPrimaryKeyTable) throws Exception {
- int bucketNum = 3;
+ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean
isPartitionedTable)
+ throws Exception {
TablePath tablePath =
TablePath.of(
"iceberg",
String.format(
- "test_tiering_table_%s",
- isPrimaryKeyTable ? "primary_key" : "log"));
- createTable(tablePath, isPrimaryKeyTable);
+ "test_tiering_table_%s_%s",
+ isPrimaryKeyTable ? "pk" : "log",
+ isPartitionedTable ? "partitioned" :
"unpartitioned"));
+ createTable(tablePath, isPrimaryKeyTable, isPartitionedTable);
Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
- Map<Integer, List<LogRecord>> recordsByBucket = new HashMap<>();
+ Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new
HashMap<>();
+ Map<Long, String> partitionIdAndName =
+ isPartitionedTable
+ ? new HashMap<Long, String>() {
+ {
+ put(1L, "p1");
+ put(2L, "p2");
+ put(3L, "p3");
+ }
+ }
+ : Collections.singletonMap(null, null);
List<IcebergWriteResult> icebergWriteResults = new ArrayList<>();
SimpleVersionedSerializer<IcebergWriteResult> writeResultSerializer =
@@ -125,24 +144,30 @@ class IcebergTieringTest {
icebergLakeTieringFactory.getCommittableSerializer();
// first, write data
- for (int bucket = 0; bucket < bucketNum; bucket++) {
- try (LakeWriter<IcebergWriteResult> writer =
createLakeWriter(tablePath, bucket)) {
- Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords
=
- isPrimaryKeyTable
- ? genPrimaryKeyTableRecords(bucket)
- : genLogTableRecords(bucket, 10);
-
- List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
- List<LogRecord> expectRecords = writeAndExpectRecords.f1;
- recordsByBucket.put(bucket, expectRecords);
- for (LogRecord record : writtenRecords) {
- writer.write(record);
+ for (int bucket = 0; bucket < BUCKET_NUM; bucket++) {
+ for (Map.Entry<Long, String> entry :
partitionIdAndName.entrySet()) {
+ String partition = entry.getValue();
+ try (LakeWriter<IcebergWriteResult> writer =
+ createLakeWriter(tablePath, bucket, partition,
entry.getKey())) {
+ Tuple2<String, Integer> partitionBucket =
Tuple2.of(partition, bucket);
+ Tuple2<List<LogRecord>, List<LogRecord>>
writeAndExpectRecords =
+ isPrimaryKeyTable
+ ? genPrimaryKeyTableRecords(partition,
bucket)
+ : genLogTableRecords(partition, bucket,
10);
+
+ List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+ List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+ recordsByBucket.put(partitionBucket, expectRecords);
+
+ for (LogRecord record : writtenRecords) {
+ writer.write(record);
+ }
+ IcebergWriteResult result = writer.complete();
+ byte[] serialized =
writeResultSerializer.serialize(result);
+ icebergWriteResults.add(
+ writeResultSerializer.deserialize(
+ writeResultSerializer.getVersion(),
serialized));
}
- IcebergWriteResult result = writer.complete();
- byte[] serialized = writeResultSerializer.serialize(result);
- icebergWriteResults.add(
- writeResultSerializer.deserialize(
- writeResultSerializer.getVersion(),
serialized));
}
}
@@ -166,17 +191,18 @@ class IcebergTieringTest {
// then, check data
for (int bucket = 0; bucket < 3; bucket++) {
- List<LogRecord> expectRecords = recordsByBucket.get(bucket);
- CloseableIterator<Record> actualRecords =
getIcebergRows(icebergTable, bucket);
- if (isPrimaryKeyTable) {
- verifyTableRecords(actualRecords, expectRecords, bucket);
- } else {
- verifyTableRecords(actualRecords, expectRecords, bucket);
+ for (String partition : partitionIdAndName.values()) {
+ Tuple2<String, Integer> partitionBucket = Tuple2.of(partition,
bucket);
+ List<LogRecord> expectRecords =
recordsByBucket.get(partitionBucket);
+ CloseableIterator<Record> actualRecords =
+ getIcebergRows(icebergTable, partition, bucket);
+ verifyTableRecords(actualRecords, expectRecords, bucket,
partition);
}
}
}
- private LakeWriter<IcebergWriteResult> createLakeWriter(TablePath
tablePath, int bucket)
+ private LakeWriter<IcebergWriteResult> createLakeWriter(
+ TablePath tablePath, int bucket, @Nullable String partition,
@Nullable Long partitionId)
throws IOException {
return icebergLakeTieringFactory.createLakeWriter(
new WriterInitContext() {
@@ -187,13 +213,13 @@ class IcebergTieringTest {
@Override
public TableBucket tableBucket() {
- return new TableBucket(0, null, bucket);
+ return new TableBucket(0, partitionId, bucket);
}
@Nullable
@Override
public String partition() {
- return null;
+ return partition;
}
@Override
@@ -218,13 +244,17 @@ class IcebergTieringTest {
}
private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
- int bucket, int numRecords) {
+ @Nullable String partition, int bucket, int numRecords) {
List<LogRecord> logRecords = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
GenericRow genericRow = new GenericRow(3);
genericRow.setField(0, i);
genericRow.setField(1, BinaryString.fromString("bucket" + bucket +
"_" + i));
- genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
+ if (partition != null) {
+ genericRow.setField(2, BinaryString.fromString(partition));
+ } else {
+ genericRow.setField(2, BinaryString.fromString("bucket" +
bucket));
+ }
LogRecord logRecord =
new GenericRecord(
@@ -234,10 +264,11 @@ class IcebergTieringTest {
return Tuple2.of(logRecords, logRecords);
}
- private Tuple2<List<LogRecord>, List<LogRecord>>
genPrimaryKeyTableRecords(int bucket) {
+ private Tuple2<List<LogRecord>, List<LogRecord>> genPrimaryKeyTableRecords(
+ @Nullable String partition, int bucket) {
int offset = -1;
// gen +I, -U, +U, -D
- List<GenericRow> rows = genKvRow(bucket, 0, 0, 4);
+ List<GenericRow> rows = genKvRow(partition, bucket, 0, 0, 4);
List<LogRecord> writtenLogRecords =
new ArrayList<>(
Arrays.asList(
@@ -248,7 +279,7 @@ class IcebergTieringTest {
List<LogRecord> expectLogRecords = new ArrayList<>();
// gen +I, -U, +U
- rows = genKvRow(bucket, 1, 4, 7);
+ rows = genKvRow(partition, bucket, 1, 4, 7);
writtenLogRecords.addAll(
Arrays.asList(
toRecord(++offset, rows.get(0), INSERT),
@@ -257,7 +288,7 @@ class IcebergTieringTest {
expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() -
1));
// gen +I, +U
- rows = genKvRow(bucket, 2, 7, 9);
+ rows = genKvRow(partition, bucket, 2, 7, 9);
writtenLogRecords.addAll(
Arrays.asList(
toRecord(++offset, rows.get(0), INSERT),
@@ -265,23 +296,27 @@ class IcebergTieringTest {
expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() -
1));
// gen +I
- rows = genKvRow(bucket, 3, 9, 10);
+ rows = genKvRow(partition, bucket, 3, 9, 10);
writtenLogRecords.add(toRecord(++offset, rows.get(0), INSERT));
expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() -
1));
return Tuple2.of(writtenLogRecords, expectLogRecords);
}
- private List<GenericRow> genKvRow(int bucket, int key, int from, int to) {
+ private List<GenericRow> genKvRow(
+ @Nullable String partition, int bucket, int key, int from, int to)
{
List<GenericRow> rows = new ArrayList<>();
for (int i = from; i < to; i++) {
GenericRow genericRow;
- // Non-partitioned table
genericRow = new GenericRow(3);
genericRow.setField(0, key);
genericRow.setField(1, BinaryString.fromString("bucket" + bucket +
"_" + i));
- genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
+ if (partition != null) {
+ genericRow.setField(2, BinaryString.fromString(partition));
+ } else {
+ genericRow.setField(2, BinaryString.fromString("bucket" +
bucket));
+ }
rows.add(genericRow);
}
return rows;
@@ -291,7 +326,8 @@ class IcebergTieringTest {
return new GenericRecord(offset, System.currentTimeMillis(),
changeType, row);
}
- private void createTable(TablePath tablePath, boolean isPrimaryTable)
throws Exception {
+ private void createTable(
+ TablePath tablePath, boolean isPrimaryTable, boolean
isPartitionedTable) {
Namespace namespace = Namespace.of(tablePath.getDatabaseName());
if (icebergCatalog instanceof SupportsNamespaces) {
SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
@@ -303,6 +339,10 @@ class IcebergTieringTest {
Set<Integer> identifierFieldIds = new HashSet<>();
if (isPrimaryTable) {
identifierFieldIds.add(1);
+ if (isPartitionedTable) {
+                // we use c3 as the partition column, so c3 should also be
included in the pk
+ identifierFieldIds.add(3);
+ }
}
org.apache.iceberg.Schema schema =
@@ -310,7 +350,7 @@ class IcebergTieringTest {
Arrays.asList(
Types.NestedField.required(1, "c1",
Types.IntegerType.get()),
Types.NestedField.optional(2, "c2",
Types.StringType.get()),
- Types.NestedField.optional(3, "c3",
Types.StringType.get()),
+ Types.NestedField.required(3, "c3",
Types.StringType.get()),
Types.NestedField.required(
4, BUCKET_COLUMN_NAME,
Types.IntegerType.get()),
Types.NestedField.required(
@@ -319,22 +359,40 @@ class IcebergTieringTest {
6, TIMESTAMP_COLUMN_NAME,
Types.TimestampType.withZone())),
identifierFieldIds);
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+ if (isPartitionedTable) {
+ builder.identity("c3");
+ }
+
+ PartitionSpec partitionSpec;
+ if (isPrimaryTable) {
+ partitionSpec = builder.bucket("c1", BUCKET_NUM).build();
+ } else {
+ partitionSpec = builder.identity(BUCKET_COLUMN_NAME).build();
+ }
+
TableIdentifier tableId =
TableIdentifier.of(tablePath.getDatabaseName(),
tablePath.getTableName());
- icebergCatalog.createTable(tableId, schema);
+ icebergCatalog.createTable(tableId, schema, partitionSpec);
}
- private CloseableIterator<Record> getIcebergRows(Table table, int bucket) {
- return IcebergGenerics.read(table)
- .where(Expressions.equal(BUCKET_COLUMN_NAME, bucket))
- .build()
- .iterator();
+ private CloseableIterator<Record> getIcebergRows(
+ Table table, @Nullable String partition, int bucket) {
+ IcebergGenerics.ScanBuilder scanBuilder =
+ IcebergGenerics.read(table).where(equal(BUCKET_COLUMN_NAME,
bucket));
+ if (partition != null) {
+ String partitionCol = table.spec().fields().get(0).name();
+ scanBuilder = scanBuilder.where(equal(partitionCol, partition));
+ }
+
+ return scanBuilder.build().iterator();
}
private void verifyTableRecords(
CloseableIterator<Record> actualRecords,
List<LogRecord> expectRecords,
- int expectBucket) {
+ int expectBucket,
+ @Nullable String partition) {
for (LogRecord expectRecord : expectRecords) {
Record actualRecord = actualRecords.next();
// check business columns:
@@ -343,6 +401,10 @@ class IcebergTieringTest {
.isEqualTo(expectRecord.getRow().getString(1).toString());
assertThat(actualRecord.get(2, String.class))
.isEqualTo(expectRecord.getRow().getString(2).toString());
+ if (partition != null) {
+ assertThat(actualRecord.get(2,
String.class)).isEqualTo(partition);
+ }
+
// check system columns: __bucket, __offset, __timestamp
assertThat(actualRecord.get(3)).isEqualTo(expectBucket);
assertThat(actualRecord.get(4)).isEqualTo(expectRecord.logOffset());
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversionsTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversionsTest.java
new file mode 100644
index 000000000..9e294f924
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversionsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.utils;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.iceberg.conf.IcebergConfiguration;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Map;
+
+import static
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** UT for {@link IcebergConversions}. */
+class IcebergConversionsTest {
+
+ @Test
+ void testToPartition(@TempDir File tempWarehouseDir) {
+ Catalog catalog = getIcebergCatalog(tempWarehouseDir);
+
+ TablePath tablePath = TablePath.of("default",
"fluss_non_partitioned_table");
+ // for a non-partitioned table (partition spec contains only __bucket)
+ Table table = createIcebergTable(catalog, tablePath, false);
+ PartitionKey partitionKey = IcebergConversions.toPartition(table,
null, 1);
+ assertThat(partitionKey.toPath()).isEqualTo("__bucket=1");
+
+ // for a table partitioned by multiple partition columns
+ tablePath = TablePath.of("default", "fluss_partitioned_table");
+ table = createIcebergTable(catalog, tablePath, true);
+ partitionKey = IcebergConversions.toPartition(table, "china$region1",
2);
+
assertThat(partitionKey.toPath()).isEqualTo("country=china/region=region1/__bucket=2");
+ }
+
+ private Catalog getIcebergCatalog(File tempWarehouseDir) {
+ Configuration configuration = new Configuration();
+ configuration.setString("warehouse",
tempWarehouseDir.toURI().toString());
+ configuration.setString("catalog-impl",
"org.apache.iceberg.inmemory.InMemoryCatalog");
+ configuration.setString("name", "fluss_test_catalog");
+
+ Map<String, String> icebergProps = configuration.toMap();
+ String catalogName = icebergProps.getOrDefault("name",
"default_iceberg_catalog");
+ return buildIcebergCatalog(
+ catalogName, icebergProps,
IcebergConfiguration.from(configuration).get());
+ }
+
+ private Table createIcebergTable(
+ Catalog catalog, TablePath tablePath, boolean
isMultiplePartitionKeyTable) {
+ Schema schema =
+ new Schema(
+ required(1, "id", Types.LongType.get()),
+ optional(2, "name", Types.StringType.get()),
+ optional(3, "country", Types.StringType.get()),
+ optional(4, "region", Types.StringType.get()),
+ optional(5, "__bucket", Types.IntegerType.get()));
+ PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema);
+ if (isMultiplePartitionKeyTable) {
+ specBuilder.identity("country").identity("region");
+ }
+
+ // we always use __bucket as the last field of the partition spec
+ specBuilder.identity("__bucket");
+
+ TableIdentifier tableIdentifier = toIceberg(tablePath);
+
+ try {
+ ((SupportsNamespaces)
catalog).createNamespace(tableIdentifier.namespace());
+ } catch (AlreadyExistsException ignore) {
+ // ignore
+ }
+
+ catalog.createTable(tableIdentifier, schema, specBuilder.build());
+
+ return catalog.loadTable(tableIdentifier);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index b98f8aceb..c323a584f 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -197,12 +197,7 @@ class PaimonTieringTest {
List<LogRecord> expectRecords =
recordsByBucket.get(partitionBucket);
CloseableIterator<InternalRow> actualRecords =
getPaimonRows(tablePath, partition, isPrimaryKeyTable,
bucket);
- if (isPrimaryKeyTable) {
- verifyPrimaryKeyTableRecord(actualRecords, expectRecords,
bucket, partition);
- } else {
- verifyLogTableRecords(
- actualRecords, expectRecords, bucket,
isPartitioned, partition);
- }
+ verifyTableRecords(actualRecords, expectRecords, bucket,
partition);
}
}
@@ -365,43 +360,6 @@ class PaimonTieringTest {
}
}
- private void verifyLogTableRecords(
- CloseableIterator<InternalRow> actualRecords,
- List<LogRecord> expectRecords,
- int expectBucket,
- boolean isPartitioned,
- @Nullable String partition)
- throws Exception {
- for (LogRecord expectRecord : expectRecords) {
- InternalRow actualRow = actualRecords.next();
- // check business columns:
-
assertThat(actualRow.getInt(0)).isEqualTo(expectRecord.getRow().getInt(0));
- assertThat(actualRow.getString(1).toString())
- .isEqualTo(expectRecord.getRow().getString(1).toString());
-
- if (isPartitioned) {
- // For partitioned tables, partition field comes from metadata
-
assertThat(actualRow.getString(2).toString()).isEqualTo(partition);
- // check system columns: __bucket, __offset, __timestamp
- assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
-
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
- assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
- .isEqualTo(expectRecord.timestamp());
- } else {
- // For non-partitioned tables, c3 is business data
- assertThat(actualRow.getString(2).toString())
-
.isEqualTo(expectRecord.getRow().getString(2).toString());
- // check system columns: __bucket, __offset, __timestamp
- assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
-
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
- assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
- .isEqualTo(expectRecord.timestamp());
- }
- }
- assertThat(actualRecords.hasNext()).isFalse();
- actualRecords.close();
- }
-
private void verifyLogTableRecordsMultiPartition(
CloseableIterator<InternalRow> actualRecords,
List<LogRecord> expectRecords,
@@ -460,7 +418,7 @@ class PaimonTieringTest {
actualRecords.close();
}
- private void verifyPrimaryKeyTableRecord(
+ private void verifyTableRecords(
CloseableIterator<InternalRow> actualRecords,
List<LogRecord> expectRecords,
int expectBucket,
@@ -473,26 +431,16 @@ class PaimonTieringTest {
assertThat(actualRow.getString(1).toString())
.isEqualTo(expectRecord.getRow().getString(1).toString());
+ assertThat(actualRow.getString(2).toString())
+ .isEqualTo(expectRecord.getRow().getString(2).toString());
if (partition != null) {
- // For partitioned tables, partition field should match record
data
- assertThat(actualRow.getString(2).toString())
-
.isEqualTo(expectRecord.getRow().getString(2).toString());
- // check system columns: __bucket, __offset, __timestamp
- assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
-
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
- long actualTimestamp = actualRow.getTimestamp(5,
6).getMillisecond();
- long expectedTimestamp = expectRecord.timestamp();
- assertThat(Math.abs(actualTimestamp -
expectedTimestamp)).isLessThanOrEqualTo(10L);
- } else {
- // For non-partitioned tables
- assertThat(actualRow.getString(2).toString())
-
.isEqualTo(expectRecord.getRow().getString(2).toString());
- // check system columns: __bucket, __offset, __timestamp
- assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
-
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
- assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
- .isEqualTo(expectRecord.timestamp());
+
assertThat(actualRow.getString(2).toString()).isEqualTo(partition);
}
+ // check system columns: __bucket, __offset, __timestamp
+ assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
+
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
+ assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
+ .isEqualTo(expectRecord.timestamp());
}
assertThat(actualRecords.hasNext()).isFalse();
actualRecords.close();
@@ -503,17 +451,13 @@ class PaimonTieringTest {
List<LogRecord> logRecords = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
GenericRow genericRow;
+ // build the row: c1, c2 are common; c3 holds the partition value for
+ genericRow = new GenericRow(3); // c1, c2, c3(partition)
+ genericRow.setField(0, i);
+ genericRow.setField(1, BinaryString.fromString("bucket" + bucket +
"_" + i));
if (partition != null) {
- // Partitioned table: include partition field in data
- genericRow = new GenericRow(3); // c1, c2, c3(partition)
- genericRow.setField(0, i);
- genericRow.setField(1, BinaryString.fromString("bucket" +
bucket + "_" + i));
genericRow.setField(2, BinaryString.fromString(partition)); //
partition field
} else {
- // Non-partitioned table
- genericRow = new GenericRow(3);
- genericRow.setField(0, i);
- genericRow.setField(1, BinaryString.fromString("bucket" +
bucket + "_" + i));
genericRow.setField(2, BinaryString.fromString("bucket" +
bucket));
}
LogRecord logRecord =