This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 9cdc9b7ef [lake/lance] Lance lake writer and committer implementation
(#1441)
9cdc9b7ef is described below
commit 9cdc9b7ef596996a86b45ec472a71d9dca42e68c
Author: xx789 <[email protected]>
AuthorDate: Mon Aug 25 10:51:19 2025 +0800
[lake/lance] Lance lake writer and committer implementation (#1441)
---
fluss-lake/fluss-lake-lance/pom.xml | 2 +-
.../com/alibaba/fluss/lake/lance/LanceConfig.java | 54 +---
.../alibaba/fluss/lake/lance/LanceLakeCatalog.java | 24 +-
.../alibaba/fluss/lake/lance/LanceLakeStorage.java | 3 +-
.../fluss/lake/lance/tiering/ArrowWriter.java | 74 +++++
.../fluss/lake/lance/tiering/LanceArrowWriter.java | 130 +++++++++
.../lance/tiering/LanceCommittableSerializer.java | 67 +++++
.../lake/lance/tiering/LanceLakeCommitter.java | 162 +++++++++++
.../lance/tiering/LanceLakeTieringFactory.java | 60 ++++
.../fluss/lake/lance/tiering/LanceLakeWriter.java | 94 ++++++
.../lance/tiering/LanceWriteResultSerializer.java | 55 ++++
.../fluss/lake/lance/utils/LanceArrowUtils.java | 88 ++++++
.../lake/lance/utils/LanceDatasetAdapter.java | 40 +++
.../lake/lance/writers/ArrowBigIntWriter.java | 52 ++++
.../lake/lance/writers/ArrowBinaryWriter.java | 57 ++++
.../lake/lance/writers/ArrowBooleanWriter.java | 54 ++++
.../fluss/lake/lance/writers/ArrowDateWriter.java | 54 ++++
.../lake/lance/writers/ArrowDecimalWriter.java | 70 +++++
.../lake/lance/writers/ArrowDoubleWriter.java | 54 ++++
.../fluss/lake/lance/writers/ArrowFieldWriter.java | 63 ++++
.../fluss/lake/lance/writers/ArrowFloatWriter.java | 53 ++++
.../fluss/lake/lance/writers/ArrowIntWriter.java | 54 ++++
.../lake/lance/writers/ArrowSmallIntWriter.java | 54 ++++
.../fluss/lake/lance/writers/ArrowTimeWriter.java | 90 ++++++
.../lance/writers/ArrowTimestampLtzWriter.java | 99 +++++++
.../lance/writers/ArrowTimestampNtzWriter.java | 97 +++++++
.../lake/lance/writers/ArrowTinyIntWriter.java | 56 ++++
.../lake/lance/writers/ArrowVarBinaryWriter.java | 52 ++++
.../lake/lance/writers/ArrowVarCharWriter.java | 56 ++++
.../src/main/resources/META-INF/NOTICE | 2 +-
.../resources/META-INF/licenses/LICENSE.zstd-jni | 26 --
.../lake/lance/LakeEnabledTableCreateITCase.java | 27 +-
.../lance/testutils/FlinkLanceTieringTestBase.java | 219 ++++++++++++++
.../lake/lance/tiering/LanceTieringITCase.java | 148 ++++++++++
.../fluss/lake/lance/tiering/LanceTieringTest.java | 318 +++++++++++++++++++++
.../testutils/FlinkPaimonTieringTestBase.java | 1 -
36 files changed, 2497 insertions(+), 112 deletions(-)
diff --git a/fluss-lake/fluss-lake-lance/pom.xml
b/fluss-lake/fluss-lake-lance/pom.xml
index 65715230d..960a3d4db 100644
--- a/fluss-lake/fluss-lake-lance/pom.xml
+++ b/fluss-lake/fluss-lake-lance/pom.xml
@@ -33,7 +33,7 @@
<packaging>jar</packaging>
<properties>
- <lance.version>0.26.1</lance.version>
+ <lance.version>0.33.0</lance.version>
</properties>
<dependencies>
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java
index 5d7218d89..240b387e6 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceConfig.java
@@ -32,17 +32,9 @@ public class LanceConfig implements Serializable {
private static final String block_size = "block_size";
private static final String version = "version";
- private static final String index_cache_size = "index_cache_size";
- private static final String metadata_cache_size = "metadata_cache_size";
private static final String max_row_per_file = "max_row_per_file";
private static final String max_rows_per_group = "max_rows_per_group";
private static final String max_bytes_per_file = "max_bytes_per_file";
- private static final String ak = "access_key_id";
- private static final String sk = "secret_access_key";
- private static final String endpoint = "aws_endpoint";
- private static final String region = "aws_region";
- private static final String virtual_hosted_style =
"virtual_hosted_style_request";
- private static final String allow_http = "allow_http";
private static final String batch_size = "batch_size";
private static final String warehouse = "warehouse";
@@ -64,11 +56,17 @@ public class LanceConfig implements Serializable {
}
public static LanceConfig from(
- Map<String, String> properties, String databaseName, String
tableName) {
- if (!properties.containsKey(warehouse)) {
+ Map<String, String> clusterConf,
+ Map<String, String> tableCustomProperties,
+ String databaseName,
+ String tableName) {
+ if (!clusterConf.containsKey(warehouse)) {
throw new IllegalArgumentException("Missing required option " +
warehouse);
}
- return new LanceConfig(databaseName, tableName,
properties.get(warehouse), properties);
+ Map<String, String> options = new HashMap<>();
+ options.putAll(clusterConf);
+ options.putAll(tableCustomProperties);
+ return new LanceConfig(databaseName, tableName,
clusterConf.get(warehouse), options);
}
public static int getBatchSize(LanceConfig config) {
@@ -83,14 +81,6 @@ public class LanceConfig implements Serializable {
return options;
}
- public String getDatabaseName() {
- return databaseName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
public String getDatasetUri() {
return datasetUri;
}
@@ -104,12 +94,6 @@ public class LanceConfig implements Serializable {
if (maps.containsKey(version)) {
builder.setVersion(Integer.parseInt(maps.get(version)));
}
- if (maps.containsKey(index_cache_size)) {
-
builder.setIndexCacheSize(Integer.parseInt(maps.get(index_cache_size)));
- }
- if (maps.containsKey(metadata_cache_size)) {
-
builder.setMetadataCacheSize(Integer.parseInt(maps.get(metadata_cache_size)));
- }
builder.setStorageOptions(genStorageOptions(config));
return builder.build();
}
@@ -130,24 +114,8 @@ public class LanceConfig implements Serializable {
return builder.build();
}
- private static Map<String, String> genStorageOptions(LanceConfig config) {
- Map<String, String> storageOptions = new HashMap<>();
- Map<String, String> maps = config.getOptions();
- if (maps.containsKey(ak) && maps.containsKey(sk) &&
maps.containsKey(endpoint)) {
- storageOptions.put(ak, maps.get(ak));
- storageOptions.put(sk, maps.get(sk));
- storageOptions.put(endpoint, maps.get(endpoint));
- }
- if (maps.containsKey(region)) {
- storageOptions.put(region, maps.get(region));
- }
- if (maps.containsKey(virtual_hosted_style)) {
- storageOptions.put(virtual_hosted_style,
maps.get(virtual_hosted_style));
- }
- if (maps.containsKey(allow_http)) {
- storageOptions.put(allow_http, maps.get(allow_http));
- }
- return storageOptions;
+ public static Map<String, String> genStorageOptions(LanceConfig config) {
+ return config.getOptions();
}
@Override
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java
index 827c9582d..bb2df9389 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeCatalog.java
@@ -26,31 +26,14 @@ import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;
import com.lancedb.lance.WriteParams;
-import org.apache.arrow.vector.types.TimeUnit;
-import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import java.util.ArrayList;
import java.util.List;
-import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
-
/** A Lance implementation of {@link LakeCatalog}. */
public class LanceLakeCatalog implements LakeCatalog {
- private static final List<Field> SYSTEM_COLUMNS = new ArrayList<>();
-
- static {
- SYSTEM_COLUMNS.add(Field.nullable(BUCKET_COLUMN_NAME, new
ArrowType.Int(32, true)));
- SYSTEM_COLUMNS.add(Field.nullable(OFFSET_COLUMN_NAME, new
ArrowType.Int(64, true)));
- SYSTEM_COLUMNS.add(
- Field.nullable(
- TIMESTAMP_COLUMN_NAME,
- new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)));
- }
-
private final Configuration options;
public LanceLakeCatalog(Configuration config) {
@@ -67,7 +50,10 @@ public class LanceLakeCatalog implements LakeCatalog {
LanceConfig config =
LanceConfig.from(
- options.toMap(), tablePath.getDatabaseName(),
tablePath.getTableName());
+ options.toMap(),
+ tableDescriptor.getCustomProperties(),
+ tablePath.getDatabaseName(),
+ tablePath.getTableName());
WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
List<Field> fields = new ArrayList<>();
@@ -75,8 +61,6 @@ public class LanceLakeCatalog implements LakeCatalog {
fields.addAll(
LanceArrowUtils.toArrowSchema(tableDescriptor.getSchema().getRowType())
.getFields());
- // add system metadata columns to schema
- fields.addAll(SYSTEM_COLUMNS);
try {
LanceDatasetAdapter.createDataset(config.getDatasetUri(), new
Schema(fields), params);
} catch (RuntimeException e) {
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java
index 5864ccf77..c4b51cd9c 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/LanceLakeStorage.java
@@ -20,6 +20,7 @@ package com.alibaba.fluss.lake.lance;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lake.lakestorage.LakeStorage;
import com.alibaba.fluss.lake.lance.tiering.LanceCommittable;
+import com.alibaba.fluss.lake.lance.tiering.LanceLakeTieringFactory;
import com.alibaba.fluss.lake.lance.tiering.LanceWriteResult;
import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
@@ -35,7 +36,7 @@ public class LanceLakeStorage implements LakeStorage {
@Override
public LakeTieringFactory<LanceWriteResult, LanceCommittable>
createLakeTieringFactory() {
- throw new UnsupportedOperationException("Not implemented");
+ return new LanceLakeTieringFactory(config);
}
@Override
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java
new file mode 100644
index 000000000..053b348b3
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/ArrowWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/** An Arrow writer for {@link InternalRow}. */
+public class ArrowWriter {
+ private final VectorSchemaRoot root;
+
+ private final ArrowFieldWriter<InternalRow>[] fieldWriters;
+
+ private int recordsCount;
+
+ /**
+ * Writer which serializes the Fluss rows to Arrow record batches.
+ *
+ * @param fieldWriters An array of writers which are responsible for the
serialization of each
+ * column of the rows
+ * @param root Container that holds a set of vectors for the rows
+ */
+ public ArrowWriter(ArrowFieldWriter<InternalRow>[] fieldWriters,
VectorSchemaRoot root) {
+ this.fieldWriters = fieldWriters;
+ this.root = root;
+ }
+
+ public static ArrowWriter create(VectorSchemaRoot root, RowType rowType) {
+ ArrowFieldWriter<InternalRow>[] fieldWriters =
+ new ArrowFieldWriter[root.getFieldVectors().size()];
+ for (int i = 0; i < fieldWriters.length; i++) {
+ FieldVector fieldVector = root.getVector(i);
+
+ fieldWriters[i] =
+ LanceArrowUtils.createArrowFieldWriter(fieldVector,
rowType.getTypeAt(i));
+ }
+ return new ArrowWriter(fieldWriters, root);
+ }
+
+ /** Writes the specified row which is serialized into Arrow format. */
+ public void writeRow(InternalRow row) {
+ for (int i = 0; i < fieldWriters.length; i++) {
+ fieldWriters[i].write(row, i, true);
+ }
+ recordsCount++;
+ }
+
+ public void finish() {
+ root.setRowCount(recordsCount);
+ for (ArrowFieldWriter<InternalRow> fieldWriter : fieldWriters) {
+ fieldWriter.finish();
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java
new file mode 100644
index 000000000..e2d0f4944
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceArrowWriter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.fluss.utils.Preconditions.checkArgument;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** A custom arrow reader that supports writes Fluss internal rows while
reading data in batches. */
+public class LanceArrowWriter extends ArrowReader {
+ private final Schema schema;
+ private final RowType rowType;
+ private final int batchSize;
+
+ private volatile boolean finished;
+
+ private final AtomicLong totalBytesRead = new AtomicLong();
+ private ArrowWriter arrowWriter = null;
+ private final AtomicInteger count = new AtomicInteger(0);
+ private final Semaphore writeToken;
+ private final Semaphore loadToken;
+
+ public LanceArrowWriter(
+ BufferAllocator allocator, Schema schema, int batchSize, RowType
rowType) {
+ super(allocator);
+ checkNotNull(schema);
+ checkArgument(batchSize > 0);
+ this.schema = schema;
+ this.rowType = rowType;
+ this.batchSize = batchSize;
+ this.writeToken = new Semaphore(0);
+ this.loadToken = new Semaphore(0);
+ }
+
+ void write(LogRecord row) {
+ checkNotNull(row);
+ try {
+ // wait util prepareLoadNextBatch to release write token,
+ writeToken.acquire();
+ arrowWriter.writeRow(row.getRow());
+ if (count.incrementAndGet() == batchSize) {
+ // notify loadNextBatch to take the batch
+ loadToken.release();
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ void setFinished() {
+ loadToken.release();
+ finished = true;
+ }
+
+ @Override
+ public void prepareLoadNextBatch() throws IOException {
+ super.prepareLoadNextBatch();
+ arrowWriter = ArrowWriter.create(this.getVectorSchemaRoot(), rowType);
+ // release batch size token for write
+ writeToken.release(batchSize);
+ }
+
+ @Override
+ public boolean loadNextBatch() throws IOException {
+ prepareLoadNextBatch();
+ try {
+ if (finished && count.get() == 0) {
+ return false;
+ }
+ // wait util batch if full or finished
+ loadToken.acquire();
+ arrowWriter.finish();
+ if (!finished) {
+ count.set(0);
+ return true;
+ } else {
+ // true if it has some rows and return false if there is no
record
+ if (count.get() > 0) {
+ count.set(0);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public long bytesRead() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected synchronized void closeReadSource() throws IOException {
+ // Implement if needed
+ }
+
+ @Override
+ protected Schema readSchema() {
+ return this.schema;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java
new file mode 100644
index 000000000..abe7efc83
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceCommittableSerializer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import com.lancedb.lance.FragmentMetadata;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/** The serializer of {@link LanceCommittable}. */
+public class LanceCommittableSerializer implements
SimpleVersionedSerializer<LanceCommittable> {
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(LanceCommittable lanceCommittable) throws
IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(lanceCommittable.committable());
+ return baos.toByteArray();
+ }
+ }
+
+ @Override
+ public LanceCommittable deserialize(int version, byte[] serialized) throws
IOException {
+ if (version != CURRENT_VERSION) {
+ throw new UnsupportedOperationException(
+ "Expecting LanceCommittable version to be "
+ + CURRENT_VERSION
+ + ", but found "
+ + version
+ + ".");
+ }
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ ObjectInputStream ois = new ObjectInputStream(bais)) {
+ //noinspection unchecked
+ return new LanceCommittable((List<FragmentMetadata>)
ois.readObject());
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Couldn't deserialize LanceCommittable", e);
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java
new file mode 100644
index 000000000..3eae2c792
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeCommitter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.metadata.TablePath;
+import
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
+import com.lancedb.lance.Version;
+import org.apache.arrow.memory.RootAllocator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static
com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+
+/** Implementation of {@link LakeCommitter} for Lance. */
+public class LanceLakeCommitter implements LakeCommitter<LanceWriteResult,
LanceCommittable> {
+ private final LanceConfig config;
+ private static final String committerName = "commit-user";
+ private final RootAllocator allocator = new RootAllocator();
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public LanceLakeCommitter(Configuration options, TablePath tablePath) {
+ this.config =
+ LanceConfig.from(
+ options.toMap(),
+ Collections.emptyMap(),
+ tablePath.getDatabaseName(),
+ tablePath.getTableName());
+ }
+
+ @Override
+ public LanceCommittable toCommittable(List<LanceWriteResult>
lanceWriteResults)
+ throws IOException {
+ List<FragmentMetadata> fragments =
+ lanceWriteResults.stream()
+ .map(LanceWriteResult::commitMessage)
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ return new LanceCommittable(fragments);
+ }
+
+ @Override
+ public long commit(LanceCommittable committable, Map<String, String>
snapshotProperties)
+ throws IOException {
+ Map<String, String> properties = new HashMap<>(snapshotProperties);
+ properties.put(committerName, FLUSS_LAKE_TIERING_COMMIT_USER);
+ return LanceDatasetAdapter.commitAppend(config,
committable.committable(), properties);
+ }
+
+ @Override
+ public void abort(LanceCommittable committable) throws IOException {
+ // TODO lance does not have the API to proactively delete the written
files yet, see
+ // https://github.com/lancedb/lance/issues/4508
+ }
+
+ @SuppressWarnings("checkstyle:LocalVariableName")
+ @Nullable
+ @Override
+ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long
latestLakeSnapshotIdOfFluss)
+ throws IOException {
+ Tuple2<Version, Transaction> latestLakeSnapshotIdOfLake =
+
getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
+
+ if (latestLakeSnapshotIdOfLake == null) {
+ return null;
+ }
+
+ // we get the latest snapshot committed by fluss,
+ // but the latest snapshot is not greater than
latestLakeSnapshotIdOfFluss, no any missing
+ // snapshot, return directly
+ if (latestLakeSnapshotIdOfFluss != null
+ && latestLakeSnapshotIdOfLake.f0.getId() <=
latestLakeSnapshotIdOfFluss) {
+ return null;
+ }
+
+ CommittedLakeSnapshot committedLakeSnapshot =
+ new
CommittedLakeSnapshot(latestLakeSnapshotIdOfLake.f0.getId());
+ String flussOffsetProperties =
+ latestLakeSnapshotIdOfLake
+ .f1
+ .transactionProperties()
+ .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+ for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
+ BucketOffset bucketOffset =
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+ if (bucketOffset.getPartitionId() != null) {
+ committedLakeSnapshot.addPartitionBucket(
+ bucketOffset.getPartitionId(),
+ bucketOffset.getPartitionQualifiedName(),
+ bucketOffset.getBucket(),
+ bucketOffset.getLogOffset());
+ } else {
+ committedLakeSnapshot.addBucket(
+ bucketOffset.getBucket(), bucketOffset.getLogOffset());
+ }
+ }
+ return committedLakeSnapshot;
+ }
+
+ @Nullable
+ private Tuple2<Version, Transaction>
getCommittedLatestSnapshotOfLake(String commitUser) {
+ ReadOptions.Builder builder = new ReadOptions.Builder();
+ builder.setStorageOptions(LanceConfig.genStorageOptions(config));
+ try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(),
builder.build())) {
+ List<Version> versions = dataset.listVersions();
+ for (int i = versions.size() - 1; i >= 0; i--) {
+ Version version = versions.get(i);
+ builder.setVersion((int) version.getId());
+ try (Dataset datasetRead =
+ Dataset.open(allocator, config.getDatasetUri(),
builder.build())) {
+ Transaction transaction =
datasetRead.readTransaction().orElse(null);
+ if (transaction != null
+ && commitUser.equals(
+
transaction.transactionProperties().get(committerName))) {
+ return Tuple2.of(version, transaction);
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ allocator.close();
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeTieringFactory.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeTieringFactory.java
new file mode 100644
index 000000000..1bd0894b2
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeTieringFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.CommitterInitContext;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.writer.LakeTieringFactory;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+
+import java.io.IOException;
+
+/** Implementation of {@link LakeTieringFactory} for Lance . */
+public class LanceLakeTieringFactory
+ implements LakeTieringFactory<LanceWriteResult, LanceCommittable> {
+ private final Configuration config;
+
+ public LanceLakeTieringFactory(Configuration config) {
+ this.config = config;
+ }
+
+ @Override
+ public LakeWriter<LanceWriteResult> createLakeWriter(WriterInitContext
writerInitContext)
+ throws IOException {
+ return new LanceLakeWriter(config, writerInitContext);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<LanceWriteResult>
getWriteResultSerializer() {
+ return new LanceWriteResultSerializer();
+ }
+
+ @Override
+ public LakeCommitter<LanceWriteResult, LanceCommittable>
createLakeCommitter(
+ CommitterInitContext committerInitContext) throws IOException {
+ return new LanceLakeCommitter(config,
committerInitContext.tablePath());
+ }
+
+ @Override
+ public SimpleVersionedSerializer<LanceCommittable>
getCommittableSerializer() {
+ return new LanceCommittableSerializer();
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java
new file mode 100644
index 000000000..73c0fdfdb
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceLakeWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.record.LogRecord;
+
+import com.lancedb.lance.FragmentMetadata;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+
+/** Implementation of {@link LakeWriter} for Lance. */
+public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
+ private final LanceArrowWriter arrowWriter;
+ private final FutureTask<List<FragmentMetadata>> fragmentCreationTask;
+
+ public LanceLakeWriter(Configuration options, WriterInitContext
writerInitContext)
+ throws IOException {
+ LanceConfig config =
+ LanceConfig.from(
+ options.toMap(),
+ writerInitContext.customProperties(),
+ writerInitContext.tablePath().getDatabaseName(),
+ writerInitContext.tablePath().getTableName());
+ int batchSize = LanceConfig.getBatchSize(config);
+ Optional<Schema> schema = LanceDatasetAdapter.getSchema(config);
+ if (!schema.isPresent()) {
+ throw new IOException("Fail to get dataset " +
config.getDatasetUri() + " in Lance.");
+ }
+
+ this.arrowWriter =
+ LanceDatasetAdapter.getArrowWriter(
+ schema.get(), batchSize,
writerInitContext.schema().getRowType());
+
+ WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+ Callable<List<FragmentMetadata>> fragmentCreator =
+ () ->
+ LanceDatasetAdapter.createFragment(
+ config.getDatasetUri(), arrowWriter, params);
+ fragmentCreationTask = new FutureTask<>(fragmentCreator);
+ Thread fragmentCreationThread = new Thread(fragmentCreationTask);
+ fragmentCreationThread.start();
+ }
+
+ @Override
+ public void write(LogRecord record) throws IOException {
+ arrowWriter.write(record);
+ }
+
+ @Override
+ public LanceWriteResult complete() throws IOException {
+ arrowWriter.setFinished();
+ try {
+ List<FragmentMetadata> fragmentMetadata =
fragmentCreationTask.get();
+ return new LanceWriteResult(fragmentMetadata);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for reader thread
to finish", e);
+ } catch (ExecutionException e) {
+ throw new IOException("Exception in reader thread", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ arrowWriter.close();
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java
new file mode 100644
index 000000000..81c1b8450
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/tiering/LanceWriteResultSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/** The {@link SimpleVersionedSerializer} for {@link LanceWriteResult}. */
+public class LanceWriteResultSerializer implements
SimpleVersionedSerializer<LanceWriteResult> {
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(LanceWriteResult lanceWriteResult) throws
IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos); ) {
+ oos.writeObject(lanceWriteResult);
+ return baos.toByteArray();
+ }
+ }
+
+ @Override
+ public LanceWriteResult deserialize(int version, byte[] serialized) throws
IOException {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ ObjectInputStream ois = new ObjectInputStream(bais)) {
+ return (LanceWriteResult) ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Couldn't deserialize LanceWriteResult", e);
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java
index cf92952c9..4de499ffe 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceArrowUtils.java
@@ -17,6 +17,23 @@
package com.alibaba.fluss.lake.lance.utils;
+import com.alibaba.fluss.lake.lance.writers.ArrowBigIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowBinaryWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowBooleanWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowDateWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowDecimalWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowDoubleWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowFieldWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowFloatWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowSmallIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTimeWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTimestampLtzWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTimestampNtzWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowTinyIntWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowVarBinaryWriter;
+import com.alibaba.fluss.lake.lance.writers.ArrowVarCharWriter;
+import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.types.BigIntType;
import com.alibaba.fluss.types.BinaryType;
import com.alibaba.fluss.types.BooleanType;
@@ -37,6 +54,24 @@ import com.alibaba.fluss.types.TimeType;
import com.alibaba.fluss.types.TimestampType;
import com.alibaba.fluss.types.TinyIntType;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
@@ -187,4 +222,57 @@ public class LanceArrowUtils {
"Unsupported data type %s currently.",
dataType.asSummaryString()));
}
}
+
+ private static int getPrecision(DecimalVector decimalVector) {
+ return decimalVector.getPrecision();
+ }
+
+ public static ArrowFieldWriter<InternalRow> createArrowFieldWriter(
+ ValueVector vector, DataType dataType) {
+ if (vector instanceof TinyIntVector) {
+ return ArrowTinyIntWriter.forField((TinyIntVector) vector);
+ } else if (vector instanceof SmallIntVector) {
+ return ArrowSmallIntWriter.forField((SmallIntVector) vector);
+ } else if (vector instanceof IntVector) {
+ return ArrowIntWriter.forField((IntVector) vector);
+ } else if (vector instanceof BigIntVector) {
+ return ArrowBigIntWriter.forField((BigIntVector) vector);
+ } else if (vector instanceof BitVector) {
+ return ArrowBooleanWriter.forField((BitVector) vector);
+ } else if (vector instanceof Float4Vector) {
+ return ArrowFloatWriter.forField((Float4Vector) vector);
+ } else if (vector instanceof Float8Vector) {
+ return ArrowDoubleWriter.forField((Float8Vector) vector);
+ } else if (vector instanceof VarCharVector) {
+ return ArrowVarCharWriter.forField((VarCharVector) vector);
+ } else if (vector instanceof FixedSizeBinaryVector) {
+ return ArrowBinaryWriter.forField((FixedSizeBinaryVector) vector);
+ } else if (vector instanceof VarBinaryVector) {
+ return ArrowVarBinaryWriter.forField((VarBinaryVector) vector);
+ } else if (vector instanceof DecimalVector) {
+ DecimalVector decimalVector = (DecimalVector) vector;
+ return ArrowDecimalWriter.forField(
+ decimalVector, getPrecision(decimalVector),
decimalVector.getScale());
+ } else if (vector instanceof DateDayVector) {
+ return ArrowDateWriter.forField((DateDayVector) vector);
+ } else if (vector instanceof TimeSecVector
+ || vector instanceof TimeMilliVector
+ || vector instanceof TimeMicroVector
+ || vector instanceof TimeNanoVector) {
+ return ArrowTimeWriter.forField(vector);
+ } else if (vector instanceof TimeStampVector
+ && ((ArrowType.Timestamp)
vector.getField().getType()).getTimezone() == null) {
+ int precision;
+ if (dataType instanceof LocalZonedTimestampType) {
+ precision = ((LocalZonedTimestampType)
dataType).getPrecision();
+ return ArrowTimestampLtzWriter.forField(vector, precision);
+ } else {
+ precision = ((TimestampType) dataType).getPrecision();
+ return ArrowTimestampNtzWriter.forField(vector, precision);
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Unsupported type %s.", dataType));
+ }
+ }
}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
index ed50abcc9..54ddefbef 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/utils/LanceDatasetAdapter.java
@@ -18,14 +18,25 @@
package com.alibaba.fluss.lake.lance.utils;
import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.tiering.LanceArrowWriter;
+import com.alibaba.fluss.types.RowType;
import com.lancedb.lance.Dataset;
+import com.lancedb.lance.Fragment;
+import com.lancedb.lance.FragmentMetadata;
import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.operation.Append;
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
/** Lance dataset API adapter. */
@@ -46,4 +57,33 @@ public class LanceDatasetAdapter {
return Optional.empty();
}
}
+
+ public static long commitAppend(
+ LanceConfig config, List<FragmentMetadata> fragments, Map<String,
String> properties) {
+ String uri = config.getDatasetUri();
+ ReadOptions options = LanceConfig.genReadOptionFromConfig(config);
+ try (Dataset dataset = Dataset.open(allocator, uri, options)) {
+ Transaction transaction =
+ dataset.newTransactionBuilder()
+
.operation(Append.builder().fragments(fragments).build())
+ .transactionProperties(properties)
+ .build();
+ try (Dataset appendedDataset = transaction.commit()) {
+ // note: lance dataset version starts from 1
+ return appendedDataset.version();
+ }
+ }
+ }
+
+ public static LanceArrowWriter getArrowWriter(Schema schema, int
batchSize, RowType rowType) {
+ return new LanceArrowWriter(allocator, schema, batchSize, rowType);
+ }
+
+ public static List<FragmentMetadata> createFragment(
+ String datasetUri, ArrowReader reader, WriteParams params) {
+ try (ArrowArrayStream arrowStream =
ArrowArrayStream.allocateNew(allocator)) {
+ Data.exportArrayStream(allocator, reader, arrowStream);
+ return Fragment.create(datasetUri, arrowStream, params);
+ }
+ }
}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBigIntWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBigIntWriter.java
new file mode 100644
index 000000000..55026de5b
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBigIntWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.BigIntVector;
+
+/** {@link ArrowFieldWriter} for BigInt. */
+public class ArrowBigIntWriter extends ArrowFieldWriter<InternalRow> {
+
+ public static ArrowBigIntWriter forField(BigIntVector bigIntVector) {
+ return new ArrowBigIntWriter(bigIntVector);
+ }
+
+ private ArrowBigIntWriter(BigIntVector bigIntVector) {
+ super(bigIntVector);
+ }
+
+ @Override
+ public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+ BigIntVector vector = (BigIntVector) getValueVector();
+ if (isNullAt(row, ordinal)) {
+ vector.setNull(getCount());
+ } else if (handleSafe) {
+ vector.setSafe(getCount(), readLong(row, ordinal));
+ }
+ }
+
+ private boolean isNullAt(InternalRow row, int ordinal) {
+ return row.isNullAt(ordinal);
+ }
+
+ private long readLong(InternalRow row, int ordinal) {
+ return row.getLong(ordinal);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBinaryWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBinaryWriter.java
new file mode 100644
index 000000000..b33fd83f4
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBinaryWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+
+/** {@link ArrowFieldWriter} for Binary. */
+public class ArrowBinaryWriter extends ArrowFieldWriter<InternalRow> {
+
+ public static ArrowBinaryWriter forField(FixedSizeBinaryVector
binaryVector) {
+ return new ArrowBinaryWriter(binaryVector);
+ }
+
+ private final int byteWidth;
+
+ private ArrowBinaryWriter(FixedSizeBinaryVector binaryVector) {
+ super(binaryVector);
+ this.byteWidth = binaryVector.getByteWidth();
+ }
+
+ @Override
+ public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+ FixedSizeBinaryVector vector = (FixedSizeBinaryVector)
getValueVector();
+ if (isNullAt(row, ordinal)) {
+ vector.setNull(getCount());
+ } else if (handleSafe) {
+ vector.setSafe(getCount(), readBinary(row, ordinal));
+ } else {
+ vector.set(getCount(), readBinary(row, ordinal));
+ }
+ }
+
+ private boolean isNullAt(InternalRow row, int ordinal) {
+ return row.isNullAt(ordinal);
+ }
+
+ private byte[] readBinary(InternalRow row, int ordinal) {
+ return row.getBinary(ordinal, byteWidth);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBooleanWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBooleanWriter.java
new file mode 100644
index 000000000..80b8e79d8
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowBooleanWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.BitVector;
+
+/** {@link ArrowFieldWriter} for Boolean. */
+public class ArrowBooleanWriter extends ArrowFieldWriter<InternalRow> {
+
+ public static ArrowBooleanWriter forField(BitVector booleanVector) {
+ return new ArrowBooleanWriter(booleanVector);
+ }
+
+ private ArrowBooleanWriter(BitVector bitVector) {
+ super(bitVector);
+ }
+
+ @Override
+ public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+ BitVector vector = (BitVector) getValueVector();
+ if (isNullAt(row, ordinal)) {
+ vector.setNull(getCount());
+ } else if (handleSafe) {
+ vector.setSafe(getCount(), readBoolean(row, ordinal) ? 1 : 0);
+ } else {
+ vector.set(getCount(), readBoolean(row, ordinal) ? 1 : 0);
+ }
+ }
+
+ private boolean isNullAt(InternalRow row, int ordinal) {
+ return row.isNullAt(ordinal);
+ }
+
+ private boolean readBoolean(InternalRow row, int ordinal) {
+ return row.getBoolean(ordinal);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDateWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDateWriter.java
new file mode 100644
index 000000000..141df39a8
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDateWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.DateDayVector;
+
+/** {@link ArrowFieldWriter} for Date. */
+public class ArrowDateWriter extends ArrowFieldWriter<InternalRow> {
+
+ public static ArrowDateWriter forField(DateDayVector dateDayVector) {
+ return new ArrowDateWriter(dateDayVector);
+ }
+
+ private ArrowDateWriter(DateDayVector dateDayVector) {
+ super(dateDayVector);
+ }
+
+ @Override
+ public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+ DateDayVector vector = (DateDayVector) getValueVector();
+ if (isNullAt(row, ordinal)) {
+ vector.setNull(getCount());
+ } else if (handleSafe) {
+ vector.setSafe(getCount(), readDate(row, ordinal));
+ } else {
+ vector.set(getCount(), readDate(row, ordinal));
+ }
+ }
+
+ private boolean isNullAt(InternalRow row, int ordinal) {
+ return row.isNullAt(ordinal);
+ }
+
+ private int readDate(InternalRow row, int ordinal) {
+ return row.getInt(ordinal);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDecimalWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDecimalWriter.java
new file mode 100644
index 000000000..f4e2e4ab9
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDecimalWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.DecimalVector;
+
+import java.math.BigDecimal;
+
+/** {@link ArrowFieldWriter} for Decimal. */
+public class ArrowDecimalWriter extends ArrowFieldWriter<InternalRow> {
+
+ public static ArrowDecimalWriter forField(
+ DecimalVector decimalVector, int precision, int scale) {
+ return new ArrowDecimalWriter(decimalVector, precision, scale);
+ }
+
+ private final int precision;
+ private final int scale;
+
+ private ArrowDecimalWriter(DecimalVector decimalVector, int precision, int
scale) {
+ super(decimalVector);
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ @Override
+ public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+ DecimalVector vector = (DecimalVector) getValueVector();
+ if (isNullAt(row, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ BigDecimal bigDecimal = readDecimal(row, ordinal).toBigDecimal();
+ if (bigDecimal == null) {
+ vector.setNull(getCount());
+ } else {
+ if (handleSafe) {
+ vector.setSafe(getCount(), bigDecimal);
+ } else {
+ vector.set(getCount(), bigDecimal);
+ }
+ }
+ }
+ }
+
+ private boolean isNullAt(InternalRow row, int ordinal) {
+ return row.isNullAt(ordinal);
+ }
+
+ private Decimal readDecimal(InternalRow row, int ordinal) {
+ return row.getDecimal(ordinal, precision, scale);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDoubleWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDoubleWriter.java
new file mode 100644
index 000000000..f3d54bdf3
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowDoubleWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.Float8Vector;
+
+/** {@link ArrowFieldWriter} for Double. */
+public class ArrowDoubleWriter extends ArrowFieldWriter<InternalRow> {
+
+ public static ArrowDoubleWriter forField(Float8Vector doubleVector) {
+ return new ArrowDoubleWriter(doubleVector);
+ }
+
+ private ArrowDoubleWriter(Float8Vector doubleVector) {
+ super(doubleVector);
+ }
+
+ @Override
+ public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+ Float8Vector vector = (Float8Vector) getValueVector();
+ if (isNullAt(row, ordinal)) {
+ vector.setNull(getCount());
+ } else if (handleSafe) {
+ vector.setSafe(getCount(), readDouble(row, ordinal));
+ } else {
+ vector.set(getCount(), readDouble(row, ordinal));
+ }
+ }
+
+ private boolean isNullAt(InternalRow row, int ordinal) {
+ return row.isNullAt(ordinal);
+ }
+
+ private double readDouble(InternalRow row, int ordinal) {
+ return row.getDouble(ordinal);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFieldWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFieldWriter.java
new file mode 100644
index 000000000..0278d2b29
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFieldWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import org.apache.arrow.vector.ValueVector;
+
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Base class for arrow field writer which is used to convert a field to an
Arrow format.
+ *
+ * @param <IN> Type of the input to write. Currently, it's always InternalRow,
may support
+ * InternalArray in the future.
+ */
+public abstract class ArrowFieldWriter<IN> {
+
+ /** Container which is used to store the written sequence of values of a
column. */
+ private final ValueVector valueVector;
+
+ private int count;
+
+ public ArrowFieldWriter(ValueVector valueVector) {
+ this.valueVector = checkNotNull(valueVector);
+ }
+
+ /** Returns the underlying container which stores the sequence of values
of a column. */
+ public ValueVector getValueVector() {
+ return valueVector;
+ }
+
+ /** Returns the current count of elements written. */
+ public int getCount() {
+ return count;
+ }
+
+ /** Sets the field value as the field at the specified ordinal of the
specified row. */
+ public abstract void doWrite(IN row, int ordinal, boolean handleSafe);
+
+ /** Writes the specified ordinal of the specified row. */
+ public void write(IN row, int ordinal, boolean handleSafe) {
+ doWrite(row, ordinal, handleSafe);
+ count++;
+ }
+
+ public void finish() {
+ valueVector.setValueCount(count);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFloatWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFloatWriter.java
new file mode 100644
index 000000000..e971e7735
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowFloatWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.Float4Vector;
+
+/** {@link ArrowFieldWriter} for Float. */
+public class ArrowFloatWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Creates a writer that fills the given {@link Float4Vector}. */
+    public static ArrowFloatWriter forField(Float4Vector float4Vector) {
+        return new ArrowFloatWriter(float4Vector);
+    }
+
+    private ArrowFloatWriter(Float4Vector float4Vector) {
+        super(float4Vector);
+    }
+
+    /**
+     * Writes the FLOAT at {@code ordinal} of {@code row} into slot {@link #getCount()}, or
+     * marks that slot null; {@code handleSafe} chooses {@code setSafe} over {@code set}.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        final Float4Vector target = (Float4Vector) getValueVector();
+        final int slot = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(slot);
+            return;
+        }
+        final float value = row.getFloat(ordinal);
+        if (handleSafe) {
+            target.setSafe(slot, value);
+        } else {
+            target.set(slot, value);
+        }
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowIntWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowIntWriter.java
new file mode 100644
index 000000000..06f397b8e
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowIntWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.IntVector;
+
+/** {@link ArrowFieldWriter} for Int. */
+public class ArrowIntWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Creates a writer that fills the given {@link IntVector}. */
+    public static ArrowIntWriter forField(IntVector intVector) {
+        return new ArrowIntWriter(intVector);
+    }
+
+    private ArrowIntWriter(IntVector intVector) {
+        super(intVector);
+    }
+
+    /**
+     * Writes the INT at {@code ordinal} of {@code row} into slot {@link #getCount()}, or
+     * marks that slot null; {@code handleSafe} chooses {@code setSafe} over {@code set}.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        final IntVector target = (IntVector) getValueVector();
+        final int slot = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(slot);
+        } else if (!handleSafe) {
+            target.set(slot, readInt(row, ordinal));
+        } else {
+            target.setSafe(slot, readInt(row, ordinal));
+        }
+    }
+
+    /** Reads the INT field at {@code ordinal} of {@code row}. */
+    int readInt(InternalRow row, int ordinal) {
+        return row.getInt(ordinal);
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowSmallIntWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowSmallIntWriter.java
new file mode 100644
index 000000000..3b410ed6a
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowSmallIntWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.SmallIntVector;
+
+/** {@link ArrowFieldWriter} for SmallInt. */
+public class ArrowSmallIntWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Creates a writer that fills the given {@link SmallIntVector}. */
+    public static ArrowSmallIntWriter forField(SmallIntVector smallIntVector) {
+        return new ArrowSmallIntWriter(smallIntVector);
+    }
+
+    private ArrowSmallIntWriter(SmallIntVector smallIntVector) {
+        super(smallIntVector);
+    }
+
+    /**
+     * Writes the SMALLINT at {@code ordinal} of {@code row} into slot {@link #getCount()}, or
+     * marks that slot null; {@code handleSafe} chooses {@code setSafe} over {@code set}.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        final SmallIntVector target = (SmallIntVector) getValueVector();
+        final int slot = getCount();
+        if (isNullAt(row, ordinal)) {
+            target.setNull(slot);
+            return;
+        }
+        final short value = readShort(row, ordinal);
+        if (handleSafe) {
+            target.setSafe(slot, value);
+        } else {
+            target.set(slot, value);
+        }
+    }
+
+    /** Returns whether the field at {@code ordinal} of {@code row} is null. */
+    public boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    /** Reads the SMALLINT field at {@code ordinal} of {@code row}. */
+    public short readShort(InternalRow row, int ordinal) {
+        return row.getShort(ordinal);
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimeWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimeWriter.java
new file mode 100644
index 000000000..fcfe9fecb
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimeWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.ValueVector;
+
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/**
+ * {@link ArrowFieldWriter} for Time.
+ *
+ * <p>The field is read with {@link InternalRow#getInt} as milliseconds and converted to the
+ * unit of the target vector: seconds, milliseconds, microseconds or nanoseconds.
+ */
+public class ArrowTimeWriter extends ArrowFieldWriter<InternalRow> {
+
+    /**
+     * Creates a writer for {@code valueVector}, which must be a {@link TimeSecVector}, {@link
+     * TimeMilliVector}, {@link TimeMicroVector} or {@link TimeNanoVector}.
+     *
+     * @throws IllegalStateException if {@code valueVector} is of any other type
+     */
+    public static ArrowTimeWriter forField(ValueVector valueVector) {
+        return new ArrowTimeWriter(valueVector);
+    }
+
+    private ArrowTimeWriter(ValueVector valueVector) {
+        super(valueVector);
+        checkState(
+                valueVector instanceof TimeSecVector
+                        || valueVector instanceof TimeMilliVector
+                        || valueVector instanceof TimeMicroVector
+                        || valueVector instanceof TimeNanoVector,
+                "Unsupported vector type for TIME field: %s",
+                valueVector.getClass().getName());
+    }
+
+    /**
+     * Writes the TIME value at {@code ordinal} of {@code row} into slot {@link #getCount()}, or
+     * marks that slot null. The millisecond input is converted to the vector's unit, and
+     * {@code handleSafe} chooses {@code setSafe} over {@code set}.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        ValueVector valueVector = getValueVector();
+        int index = getCount();
+        if (isNullAt(row, ordinal)) {
+            // Every accepted Time*Vector extends BaseFixedWidthVector.
+            ((BaseFixedWidthVector) valueVector).setNull(index);
+            return;
+        }
+        int millis = readTime(row, ordinal);
+        if (valueVector instanceof TimeSecVector) {
+            int sec = millis / 1000;
+            if (handleSafe) {
+                ((TimeSecVector) valueVector).setSafe(index, sec);
+            } else {
+                ((TimeSecVector) valueVector).set(index, sec);
+            }
+        } else if (valueVector instanceof TimeMilliVector) {
+            if (handleSafe) {
+                ((TimeMilliVector) valueVector).setSafe(index, millis);
+            } else {
+                ((TimeMilliVector) valueVector).set(index, millis);
+            }
+        } else if (valueVector instanceof TimeMicroVector) {
+            long microSec = millis * 1000L;
+            if (handleSafe) {
+                ((TimeMicroVector) valueVector).setSafe(index, microSec);
+            } else {
+                ((TimeMicroVector) valueVector).set(index, microSec);
+            }
+        } else {
+            // Only TimeNanoVector remains after the constructor check.
+            long nanoSec = millis * 1_000_000L;
+            if (handleSafe) {
+                ((TimeNanoVector) valueVector).setSafe(index, nanoSec);
+            } else {
+                ((TimeNanoVector) valueVector).set(index, nanoSec);
+            }
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private int readTime(InternalRow row, int ordinal) {
+        return row.getInt(ordinal);
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
new file mode 100644
index 000000000..3ea10d399
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampLtzWriter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/**
+ * {@link ArrowFieldWriter} for TimestampLtz.
+ *
+ * <p>Each {@link TimestampLtz} is converted from its epoch milliseconds and nano-of-millisecond
+ * to the unit of the target {@link TimeStampVector}.
+ */
+public class ArrowTimestampLtzWriter extends ArrowFieldWriter<InternalRow> {
+
+    /**
+     * Creates a writer for {@code valueVector}, which must be a {@link TimeStampVector} whose
+     * Arrow type carries no timezone.
+     *
+     * @param precision the timestamp precision passed to {@link InternalRow#getTimestampLtz}
+     * @throws IllegalStateException if {@code valueVector} is not such a vector
+     */
+    public static ArrowTimestampLtzWriter forField(ValueVector valueVector, int precision) {
+        return new ArrowTimestampLtzWriter(valueVector, precision);
+    }
+
+    private final int precision;
+
+    private ArrowTimestampLtzWriter(ValueVector valueVector, int precision) {
+        super(valueVector);
+        checkState(
+                valueVector instanceof TimeStampVector
+                        && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone()
+                                == null,
+                "Expected a TimeStampVector without timezone for TIMESTAMP_LTZ, but got %s",
+                valueVector.getField());
+        this.precision = precision;
+    }
+
+    /**
+     * Writes the timestamp at {@code ordinal} of {@code row} into slot {@link #getCount()}, or
+     * marks that slot null. The value is converted to the vector's unit, and {@code handleSafe}
+     * chooses {@code setSafe} over {@code set}.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        TimeStampVector vector = (TimeStampVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+            return;
+        }
+        TimestampLtz timestamp = readTimestamp(row, ordinal);
+        long value;
+        if (vector instanceof TimeStampSecVector) {
+            // Floor division: plain '/' truncates towards zero, which would round pre-epoch
+            // instants up to the next second.
+            value = Math.floorDiv(timestamp.getEpochMillisecond(), 1000L);
+        } else if (vector instanceof TimeStampMilliVector) {
+            value = timestamp.getEpochMillisecond();
+        } else if (vector instanceof TimeStampMicroVector) {
+            value =
+                    timestamp.getEpochMillisecond() * 1000
+                            + timestamp.getNanoOfMillisecond() / 1000;
+        } else {
+            value = timestamp.getEpochMillisecond() * 1_000_000 + timestamp.getNanoOfMillisecond();
+        }
+        if (handleSafe) {
+            vector.setSafe(getCount(), value);
+        } else {
+            vector.set(getCount(), value);
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private TimestampLtz readTimestamp(InternalRow row, int ordinal) {
+        return row.getTimestampLtz(ordinal, precision);
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
new file mode 100644
index 000000000..3d129be98
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTimestampNtzWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampNtz;
+
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/**
+ * {@link ArrowFieldWriter} for TimestampNtz.
+ *
+ * <p>Each {@link TimestampNtz} is converted from its milliseconds and nano-of-millisecond to
+ * the unit of the target {@link TimeStampVector}.
+ */
+public class ArrowTimestampNtzWriter extends ArrowFieldWriter<InternalRow> {
+
+    /**
+     * Creates a writer for {@code valueVector}, which must be a {@link TimeStampVector} whose
+     * Arrow type carries no timezone.
+     *
+     * @param precision the timestamp precision passed to {@link InternalRow#getTimestampNtz}
+     * @throws IllegalStateException if {@code valueVector} is not such a vector
+     */
+    public static ArrowTimestampNtzWriter forField(ValueVector valueVector, int precision) {
+        return new ArrowTimestampNtzWriter(valueVector, precision);
+    }
+
+    private final int precision;
+
+    private ArrowTimestampNtzWriter(ValueVector valueVector, int precision) {
+        super(valueVector);
+        checkState(
+                valueVector instanceof TimeStampVector
+                        && ((ArrowType.Timestamp) valueVector.getField().getType()).getTimezone()
+                                == null,
+                "Expected a TimeStampVector without timezone for TIMESTAMP, but got %s",
+                valueVector.getField());
+        this.precision = precision;
+    }
+
+    /**
+     * Writes the timestamp at {@code ordinal} of {@code row} into slot {@link #getCount()}, or
+     * marks that slot null. The value is converted to the vector's unit, and {@code handleSafe}
+     * chooses {@code setSafe} over {@code set}.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        TimeStampVector vector = (TimeStampVector) getValueVector();
+        if (isNullAt(row, ordinal)) {
+            vector.setNull(getCount());
+            return;
+        }
+        TimestampNtz timestamp = readTimestamp(row, ordinal);
+        long value;
+        if (vector instanceof TimeStampSecVector) {
+            // Floor division: plain '/' truncates towards zero, which would round pre-epoch
+            // values up to the next second.
+            value = Math.floorDiv(timestamp.getMillisecond(), 1000L);
+        } else if (vector instanceof TimeStampMilliVector) {
+            value = timestamp.getMillisecond();
+        } else if (vector instanceof TimeStampMicroVector) {
+            value = timestamp.getMillisecond() * 1000 + timestamp.getNanoOfMillisecond() / 1000;
+        } else {
+            value = timestamp.getMillisecond() * 1_000_000 + timestamp.getNanoOfMillisecond();
+        }
+        if (handleSafe) {
+            vector.setSafe(getCount(), value);
+        } else {
+            vector.set(getCount(), value);
+        }
+    }
+
+    private boolean isNullAt(InternalRow row, int ordinal) {
+        return row.isNullAt(ordinal);
+    }
+
+    private TimestampNtz readTimestamp(InternalRow row, int ordinal) {
+        return row.getTimestampNtz(ordinal, precision);
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTinyIntWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTinyIntWriter.java
new file mode 100644
index 000000000..805d24afe
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowTinyIntWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.TinyIntVector;
+
+/** {@link ArrowFieldWriter} for TinyInt. */
+public class ArrowTinyIntWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Creates a writer that fills the given {@link TinyIntVector}. */
+    public static ArrowTinyIntWriter forField(TinyIntVector tinyIntVector) {
+        return new ArrowTinyIntWriter(tinyIntVector);
+    }
+
+    private ArrowTinyIntWriter(TinyIntVector tinyIntVector) {
+        super(tinyIntVector);
+    }
+
+    /**
+     * Writes the TINYINT at {@code ordinal} of {@code row} into slot {@link #getCount()}, or
+     * marks that slot null; {@code handleSafe} chooses {@code setSafe} over {@code set}.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        final TinyIntVector target = (TinyIntVector) getValueVector();
+        final int slot = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(slot);
+            return;
+        }
+        final byte value = row.getByte(ordinal);
+        if (handleSafe) {
+            target.setSafe(slot, value);
+        } else {
+            target.set(slot, value);
+        }
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
new file mode 100644
index 000000000..d3be0a653
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarBinaryWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.VarBinaryVector;
+
+/** {@link ArrowFieldWriter} for VarBinary. */
+public class ArrowVarBinaryWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Creates a writer that fills the given {@link VarBinaryVector}. */
+    public static ArrowVarBinaryWriter forField(VarBinaryVector varBinaryVector) {
+        return new ArrowVarBinaryWriter(varBinaryVector);
+    }
+
+    private ArrowVarBinaryWriter(VarBinaryVector varBinaryVector) {
+        super(varBinaryVector);
+    }
+
+    /**
+     * Writes the bytes at {@code ordinal} of {@code row} into slot {@link #getCount()}, or
+     * marks that slot null.
+     *
+     * <p>{@code handleSafe} is ignored: non-null values always go through {@code setSafe}.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        final VarBinaryVector target = (VarBinaryVector) getValueVector();
+        final int slot = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(slot);
+        } else {
+            target.setSafe(slot, row.getBytes(ordinal));
+        }
+    }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarCharWriter.java
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarCharWriter.java
new file mode 100644
index 000000000..05b2d442c
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/main/java/com/alibaba/fluss/lake/lance/writers/ArrowVarCharWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.writers;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.arrow.vector.VarCharVector;
+
+import java.nio.ByteBuffer;
+
+/** {@link ArrowFieldWriter} for VarChar. */
+public class ArrowVarCharWriter extends ArrowFieldWriter<InternalRow> {
+
+    /** Creates a writer that fills the given {@link VarCharVector}. */
+    public static ArrowVarCharWriter forField(VarCharVector varCharVector) {
+        return new ArrowVarCharWriter(varCharVector);
+    }
+
+    private ArrowVarCharWriter(VarCharVector varCharVector) {
+        super(varCharVector);
+    }
+
+    /**
+     * Writes the string at {@code ordinal} of {@code row} into slot {@link #getCount()}, or
+     * marks that slot null. The bytes come from {@link BinaryString#wrapByteBuffer()}.
+     *
+     * <p>{@code handleSafe} is ignored: non-null values always go through {@code setSafe}.
+     */
+    @Override
+    public void doWrite(InternalRow row, int ordinal, boolean handleSafe) {
+        final VarCharVector target = (VarCharVector) getValueVector();
+        final int slot = getCount();
+        if (row.isNullAt(ordinal)) {
+            target.setNull(slot);
+            return;
+        }
+        final BinaryString text = row.getString(ordinal);
+        final ByteBuffer bytes = text.wrapByteBuffer();
+        target.setSafe(slot, bytes, bytes.position(), bytes.remaining());
+    }
+}
diff --git a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE
b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE
index 786813d32..36053bfbc 100644
--- a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE
+++ b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/NOTICE
@@ -21,7 +21,7 @@ This project bundles the following dependencies under the
Apache Software Licens
- org.apache.arrow:arrow-memory-netty:15.0.0
- org.apache.arrow:arrow-vector:15.0.0
- org.questdb:jar-jni:1.1.1
-- com.lancedb:lance-core:0.26.1
+- com.lancedb:lance-core:0.33.0
- commons-codec:commons-codec:1.4
- com.google.flatbuffers:flatbuffers-java:23.5.26
diff --git
a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/licenses/LICENSE.zstd-jni
b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/licenses/LICENSE.zstd-jni
deleted file mode 100644
index 66abb8ae7..000000000
---
a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/licenses/LICENSE.zstd-jni
+++ /dev/null
@@ -1,26 +0,0 @@
-Zstd-jni: JNI bindings to Zstd Library
-
-Copyright (c) 2015-present, Luben Karavelov/ All rights reserved.
-
-BSD License
-
-Redistribution and use in source and binary forms, with or without
modification,
-are permitted provided that the following conditions are met:
-
-* Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above copyright notice,
this
- list of conditions and the following disclaimer in the documentation and/or
- other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR
-ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
-ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java
index 1b071e867..2be42fff4 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/LakeEnabledTableCreateITCase.java
@@ -45,10 +45,9 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import java.nio.file.Files;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
-import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -111,6 +110,8 @@ class LakeEnabledTableCreateITCase {
@Test
void testLogTable() throws Exception {
+ Map<String, String> customProperties = new HashMap<>();
+ customProperties.put("lance.batch_size", "256");
// test bucket key log table
TableDescriptor logTable =
TableDescriptor.builder()
@@ -134,11 +135,13 @@ class LakeEnabledTableCreateITCase {
.column("log_c16",
DataTypes.TIMESTAMP())
.build())
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+ .customProperties(customProperties)
.distributedBy(BUCKET_NUM, "log_c1", "log_c2")
.build();
TablePath logTablePath = TablePath.of(DATABASE, "log_table");
admin.createTable(logTablePath, logTable, false).get();
- LanceConfig config = LanceConfig.from(lanceConf.toMap(), DATABASE,
"log_table");
+ LanceConfig config =
+ LanceConfig.from(lanceConf.toMap(), customProperties,
DATABASE, "log_table");
// check the gotten log table
Field logC1 = new Field("log_c1", FieldType.nullable(new
ArrowType.Int(4 * 8, true)), null);
@@ -182,25 +185,11 @@ class LakeEnabledTableCreateITCase {
FieldType.nullable(new
ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
null);
- // for __bucket, __offset, __timestamp
- Field logC17 =
- new Field(
- BUCKET_COLUMN_NAME, FieldType.nullable(new
ArrowType.Int(32, true)), null);
- Field logC18 =
- new Field(
- OFFSET_COLUMN_NAME, FieldType.nullable(new
ArrowType.Int(64, true)), null);
- Field logC19 =
- new Field(
- TIMESTAMP_COLUMN_NAME,
- FieldType.nullable(new
ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
- null);
-
org.apache.arrow.vector.types.pojo.Schema expectedSchema =
new org.apache.arrow.vector.types.pojo.Schema(
Arrays.asList(
logC1, logC2, logC3, logC4, logC5, logC6,
logC7, logC8, logC9,
- logC10, logC11, logC12, logC13, logC14,
logC15, logC16, logC17,
- logC18, logC19));
+ logC10, logC11, logC12, logC13, logC14,
logC15, logC16));
assertThat(expectedSchema).isEqualTo(LanceDatasetAdapter.getSchema(config).get());
}
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
new file mode 100644
index 000000000..728e4a8a9
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.testutils;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.writer.AppendWriter;
+import com.alibaba.fluss.client.table.writer.TableWriter;
+import com.alibaba.fluss.client.table.writer.UpsertWriter;
+import com.alibaba.fluss.config.AutoPartitionTimeUnit;
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.exception.FlussRuntimeException;
+import com.alibaba.fluss.flink.tiering.LakeTieringJobBuilder;
+import com.alibaba.fluss.metadata.DataLakeFormat;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.server.replica.Replica;
+import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import com.alibaba.fluss.types.DataTypes;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test base for sync to lance by Flink. */
+public class FlinkLanceTieringTestBase {
+
+ protected static final String DEFAULT_DB = "fluss";
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setClusterConf(initConfig())
+ .setNumOfTabletServers(3)
+ .build();
+
+ protected StreamExecutionEnvironment execEnv;
+
+ protected static Connection conn;
+ protected static Admin admin;
+ protected static Configuration clientConf;
+ private static String warehousePath;
+
+ private static Configuration initConfig() {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+ // not to clean snapshots for test purpose
+ .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
Integer.MAX_VALUE);
+ conf.setString("datalake.format", "lance");
+ try {
+ warehousePath =
+ Files.createTempDirectory("fluss-testing-datalake-tiered")
+ .resolve("warehouse")
+ .toString();
+ } catch (Exception e) {
+ throw new FlussRuntimeException("Failed to create warehouse path");
+ }
+ conf.setString("datalake.lance.warehouse", warehousePath);
+ return conf;
+ }
+
+ @BeforeAll
+ protected static void beforeAll() {
+ clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+ conn = ConnectionFactory.createConnection(clientConf);
+ admin = conn.getAdmin();
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ execEnv.setParallelism(2);
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ @AfterAll
+ static void afterAll() throws Exception {
+ if (admin != null) {
+ admin.close();
+ admin = null;
+ }
+ if (conn != null) {
+ conn.close();
+ conn = null;
+ }
+ }
+
+ protected static Map<String, String> getLanceCatalogConf() {
+ Map<String, String> lanceConf = new HashMap<>();
+ lanceConf.put("warehouse", warehousePath);
+ return lanceConf;
+ }
+
+ protected long createTable(TablePath tablePath, TableDescriptor
tableDescriptor)
+ throws Exception {
+ admin.createTable(tablePath, tableDescriptor, true).get();
+ return admin.getTableInfo(tablePath).get().getTableId();
+ }
+
+ protected long createLogTable(TablePath tablePath) throws Exception {
+ return createLogTable(tablePath, 1);
+ }
+
+ protected long createLogTable(TablePath tablePath, int bucketNum) throws
Exception {
+ return createLogTable(tablePath, bucketNum, false);
+ }
+
+ protected Replica getLeaderReplica(TableBucket tableBucket) {
+ return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+ }
+
+ protected void assertReplicaStatus(TableBucket tb, long
expectedLogEndOffset) {
+ retry(
+ Duration.ofMinutes(1),
+ () -> {
+ Replica replica = getLeaderReplica(tb);
+ // datalake snapshot id should be updated
+ assertThat(replica.getLogTablet().getLakeTableSnapshotId())
+ .isGreaterThanOrEqualTo(0);
+
assertThat(replica.getLakeLogEndOffset()).isEqualTo(expectedLogEndOffset);
+ });
+ }
+
+ protected long createLogTable(TablePath tablePath, int bucketNum, boolean
isPartitioned)
+ throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder().column("a", DataTypes.INT()).column("b",
DataTypes.STRING());
+
+ TableDescriptor.Builder tableBuilder =
+ TableDescriptor.builder()
+ .distributedBy(bucketNum, "a")
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+
+ if (isPartitioned) {
+ schemaBuilder.column("c", DataTypes.STRING());
+ tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED,
true);
+ tableBuilder.partitionedBy("c");
+ tableBuilder.property(
+ ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
AutoPartitionTimeUnit.YEAR);
+ }
+ tableBuilder.schema(schemaBuilder.build());
+ return createTable(tablePath, tableBuilder.build());
+ }
+
+ protected void writeRows(TablePath tablePath, List<InternalRow> rows,
boolean append)
+ throws Exception {
+ try (Table table = conn.getTable(tablePath)) {
+ TableWriter tableWriter;
+ if (append) {
+ tableWriter = table.newAppend().createWriter();
+ } else {
+ tableWriter = table.newUpsert().createWriter();
+ }
+ for (InternalRow row : rows) {
+ if (tableWriter instanceof AppendWriter) {
+ ((AppendWriter) tableWriter).append(row);
+ } else {
+ ((UpsertWriter) tableWriter).upsert(row);
+ }
+ }
+ tableWriter.flush();
+ }
+ }
+
+ protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv)
throws Exception {
+ Configuration flussConfig = new Configuration(clientConf);
+ flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));
+ return LakeTieringJobBuilder.newBuilder(
+ execEnv,
+ flussConfig,
+ Configuration.fromMap(getLanceCatalogConf()),
+ DataLakeFormat.LANCE.toString())
+ .build();
+ }
+
+ protected void waitUntilSnapshot(long tableId, int bucketNum, long
snapshotId) {
+ for (int i = 0; i < bucketNum; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, i);
+ FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java
new file mode 100644
index 000000000..ee54dfd92
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringITCase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.testutils.FlinkLanceTieringTestBase;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.ReadOptions;
+import com.lancedb.lance.Transaction;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static
com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for tiering tables to lance. */
+class LanceTieringITCase extends FlinkLanceTieringTestBase {
+ protected static final String DEFAULT_DB = "fluss";
+ private static StreamExecutionEnvironment execEnv;
+ private static Configuration lanceConf;
+ private static final RootAllocator allocator = new RootAllocator();
+
+ @BeforeAll
+ protected static void beforeAll() {
+ FlinkLanceTieringTestBase.beforeAll();
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setParallelism(2);
+ execEnv.enableCheckpointing(1000);
+ lanceConf = Configuration.fromMap(getLanceCatalogConf());
+ }
+
+ @Test
+ void testTiering() throws Exception {
+ // create log table
+ TablePath t1 = TablePath.of(DEFAULT_DB, "logTable");
+ long t1Id = createLogTable(t1);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+ List<InternalRow> flussRows = new ArrayList<>();
+ // write records
+ for (int i = 0; i < 10; i++) {
+ List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"),
row(3, "v3"));
+ flussRows.addAll(rows);
+ // write records
+ writeRows(t1, rows, true);
+ }
+
+ // then start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ // check the status of replica after synced;
+ // note: we can't update log start offset for unaware bucket mode log
table
+ assertReplicaStatus(t1Bucket, 30);
+
+ LanceConfig config =
+ LanceConfig.from(
+ lanceConf.toMap(),
+ Collections.emptyMap(),
+ t1.getDatabaseName(),
+ t1.getTableName());
+
+ // check data in lance
+ checkDataInLanceAppendOnlyTable(config, flussRows);
+ // check snapshot property in lance
+ Map<String, String> properties =
+ new HashMap<String, String>() {
+ {
+ put(
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ "[{\"bucket_id\":0,\"log_offset\":30}]");
+ put("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+ }
+ };
+ checkSnapshotPropertyInLance(config, properties);
+
+ jobClient.cancel().get();
+ }
+
+ private void checkSnapshotPropertyInLance(
+ LanceConfig config, Map<String, String> expectedProperties) throws
Exception {
+ ReadOptions.Builder builder = new ReadOptions.Builder();
+ builder.setStorageOptions(LanceConfig.genStorageOptions(config));
+ try (Dataset dataset = Dataset.open(allocator, config.getDatasetUri(),
builder.build())) {
+ Transaction transaction = dataset.readTransaction().orElse(null);
+ assertThat(transaction).isNotNull();
+
assertThat(transaction.transactionProperties()).isEqualTo(expectedProperties);
+ }
+ }
+
+ private void checkDataInLanceAppendOnlyTable(LanceConfig config,
List<InternalRow> expectedRows)
+ throws Exception {
+ try (Dataset dataset =
+ Dataset.open(
+ allocator,
+ config.getDatasetUri(),
+ LanceConfig.genReadOptionFromConfig(config))) {
+ ArrowReader reader = dataset.newScan().scanBatches();
+ VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+ reader.loadNextBatch();
+ Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
+ int rowCount = readerRoot.getRowCount();
+ for (int i = 0; i < rowCount; i++) {
+ InternalRow flussRow = flussRowIterator.next();
+ assertThat((int) (readerRoot.getVector(0).getObject(i)))
+ .isEqualTo(flussRow.getInt(0));
+ assertThat(((VarCharVector)
readerRoot.getVector(1)).getObject(i).toString())
+ .isEqualTo(flussRow.getString(1).toString());
+ }
+ assertThat(reader.loadNextBatch()).isFalse();
+ assertThat(flussRowIterator.hasNext()).isFalse();
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
new file mode 100644
index 000000000..60b7a330e
--- /dev/null
+++
b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.lance.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.lance.LanceConfig;
+import com.alibaba.fluss.lake.lance.utils.LanceArrowUtils;
+import com.alibaba.fluss.lake.lance.utils.LanceDatasetAdapter;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import com.lancedb.lance.Dataset;
+import com.lancedb.lance.WriteParams;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static
com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for tiering to Lance via {@link LanceLakeTieringFactory}. */
+class LanceTieringTest {
+    private @TempDir File tempWarehouseDir;
+    private LanceLakeTieringFactory lanceLakeTieringFactory;
+    private Configuration configuration;
+
+    @BeforeEach
+    void beforeEach() {
+        configuration = new Configuration();
+        configuration.setString("warehouse", tempWarehouseDir.toString());
+        lanceLakeTieringFactory = new LanceLakeTieringFactory(configuration);
+    }
+
+    private static Stream<Arguments> tieringWriteArgs() {
+        return Stream.of(Arguments.of(false), Arguments.of(true));
+    }
+
+    /**
+     * Writes 10 records per (partition, bucket) through the lake writer, round-trips the write
+     * results and committable through their serializers, commits them, then verifies the stored
+     * data and the {@code getMissingLakeSnapshot} behavior.
+     */
+    @ParameterizedTest
+    @MethodSource("tieringWriteArgs")
+    void testTieringWriteTable(boolean isPartitioned) throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath = TablePath.of("lance", "logTable");
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("lance.batch_size", "256");
+        LanceConfig config =
+                LanceConfig.from(
+                        configuration.toMap(),
+                        customProperties,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName());
+        Schema schema = createTable(config);
+
+        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
+                lanceLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
+                lanceLakeTieringFactory.getCommittableSerializer();
+
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // there should be no missing snapshot before anything is committed
+            assertThat(lakeCommitter.getMissingLakeSnapshot(2L)).isNull();
+        }
+
+        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
+        Map<Long, String> partitionIdAndName;
+        if (isPartitioned) {
+            partitionIdAndName = new HashMap<>();
+            partitionIdAndName.put(1L, "p1");
+            partitionIdAndName.put(2L, "p2");
+            partitionIdAndName.put(3L, "p3");
+        } else {
+            // a single (null id, null name) entry stands for the non-partitioned table
+            partitionIdAndName = Collections.singletonMap(null, null);
+        }
+        List<String> partitionKeys = isPartitioned ? Arrays.asList("c3") : null;
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        // first, write data
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<LanceWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, schema, customProperties)) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            genLogTableRecords(partition, bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    LanceWriteResult lanceWriteResult = lakeWriter.complete();
+                    byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
+                    lanceWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
+                }
+            }
+        }
+
+        // second, commit data
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // serialize/deserialize committable
+            LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
+            byte[] serialized = committableSerializer.serialize(lanceCommittable);
+            lanceCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            long snapshot =
+                    lakeCommitter.commit(
+                            lanceCommittable,
+                            toBucketOffsetsProperty(
+                                    tableBucketOffsets, partitionIdAndName, partitionKeys));
+            // lance dataset version starts from 1
+            assertThat(snapshot).isEqualTo(2);
+        }
+
+        // then, check data; the allocator, dataset and reader are all closed afterwards
+        try (RootAllocator allocator = new RootAllocator();
+                Dataset dataset =
+                        Dataset.open(
+                                allocator,
+                                config.getDatasetUri(),
+                                LanceConfig.genReadOptionFromConfig(config));
+                ArrowReader reader = dataset.newScan().scanBatches()) {
+            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();
+            // the checks expect one batch per written (bucket, partition), in write order
+            for (int bucket = 0; bucket < bucketNum; bucket++) {
+                for (String partition : partitionIdAndName.values()) {
+                    assertThat(reader.loadNextBatch()).isTrue();
+                    List<LogRecord> expectRecords =
+                            recordsByBucket.get(Tuple2.of(partition, bucket));
+                    verifyLogTableRecords(readerRoot, expectRecords);
+                }
+            }
+            assertThat(reader.loadNextBatch()).isFalse();
+        }
+
+        // then, let's verify getMissingLakeSnapshot works
+        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // use snapshot id 1 as the known snapshot id
+            CommittedLakeSnapshot committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(1L);
+            assertThat(committedLakeSnapshot).isNotNull();
+            Map<Tuple2<Long, Integer>, Long> offsets = committedLakeSnapshot.getLogEndOffsets();
+            for (int bucket = 0; bucket < bucketNum; bucket++) {
+                for (Long partitionId : partitionIdAndName.keySet()) {
+                    // we only write 10 records, so expected log offset should be 10
+                    assertThat(offsets.get(Tuple2.of(partitionId, bucket))).isEqualTo(10);
+                }
+            }
+            assertThat(committedLakeSnapshot.getLakeSnapshotId()).isEqualTo(2L);
+
+            // use null as the known snapshot id
+            CommittedLakeSnapshot committedLakeSnapshot2 =
+                    lakeCommitter.getMissingLakeSnapshot(null);
+            assertThat(committedLakeSnapshot2).isEqualTo(committedLakeSnapshot);
+
+            // use snapshot id 2 as the known snapshot id
+            committedLakeSnapshot = lakeCommitter.getMissingLakeSnapshot(2L);
+            // no any missing committed offset since the latest snapshot is 2L
+            assertThat(committedLakeSnapshot).isNull();
+        }
+    }
+
+    /**
+     * Asserts the current batch in {@code root} matches {@code expectRecords} row by row: column
+     * 0 as int, columns 1 and 2 as strings.
+     */
+    private void verifyLogTableRecords(VectorSchemaRoot root, List<LogRecord> expectRecords) {
+        assertThat(root.getRowCount()).isEqualTo(expectRecords.size());
+        for (int i = 0; i < expectRecords.size(); i++) {
+            LogRecord expectRecord = expectRecords.get(i);
+            // check business columns:
+            assertThat((int) (root.getVector(0).getObject(i)))
+                    .isEqualTo(expectRecord.getRow().getInt(0));
+            assertThat(((VarCharVector) root.getVector(1)).getObject(i).toString())
+                    .isEqualTo(expectRecord.getRow().getString(1).toString());
+            assertThat(((VarCharVector) root.getVector(2)).getObject(i).toString())
+                    .isEqualTo(expectRecord.getRow().getString(2).toString());
+        }
+    }
+
+    private LakeCommitter<LanceWriteResult, LanceCommittable> createLakeCommitter(
+            TablePath tablePath) throws IOException {
+        return lanceLakeTieringFactory.createLakeCommitter(() -> tablePath);
+    }
+
+    /** Creates a lake writer whose init context exposes the given table, bucket and schema. */
+    private LakeWriter<LanceWriteResult> createLakeWriter(
+            TablePath tablePath,
+            int bucket,
+            @Nullable String partition,
+            Schema schema,
+            Map<String, String> customProperties)
+            throws IOException {
+        return lanceLakeTieringFactory.createLakeWriter(
+                new WriterInitContext() {
+                    @Override
+                    public TablePath tablePath() {
+                        return tablePath;
+                    }
+
+                    @Override
+                    public TableBucket tableBucket() {
+                        // don't care about tableId & partitionId
+                        return new TableBucket(0, 0L, bucket);
+                    }
+
+                    @Nullable
+                    @Override
+                    public String partition() {
+                        return partition;
+                    }
+
+                    @Override
+                    public Schema schema() {
+                        return schema;
+                    }
+
+                    @Override
+                    public Map<String, String> customProperties() {
+                        return customProperties;
+                    }
+                });
+    }
+
+    /**
+     * Generates {@code numRecords} append-only records (c1 = i, c2 = "bucket{bucket}_{i}", c3 =
+     * partition name, or "bucket{bucket}" for a non-partitioned table).
+     *
+     * @return (records to write, expected records); identical lists for append-only tables
+     */
+    private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
+            @Nullable String partition, int bucket, int numRecords) {
+        List<LogRecord> logRecords = new ArrayList<>();
+        // c3 carries the partition value when partitioned, otherwise a per-bucket marker
+        String c3 = partition != null ? partition : "bucket" + bucket;
+        for (int i = 0; i < numRecords; i++) {
+            GenericRow genericRow = new GenericRow(3);
+            genericRow.setField(0, i);
+            genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));
+            genericRow.setField(2, BinaryString.fromString(c3));
+            logRecords.add(
+                    new GenericRecord(
+                            i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow));
+        }
+        return Tuple2.of(logRecords, logRecords);
+    }
+
+    /** Creates the Lance dataset with schema (c1 INT, c2 STRING, c3 STRING) and returns it. */
+    private Schema createTable(LanceConfig config) throws Exception {
+        List<Schema.Column> columns = new ArrayList<>();
+        columns.add(new Schema.Column("c1", DataTypes.INT()));
+        columns.add(new Schema.Column("c2", DataTypes.STRING()));
+        columns.add(new Schema.Column("c3", DataTypes.STRING()));
+        Schema schema = Schema.newBuilder().fromColumns(columns).build();
+        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
+        LanceDatasetAdapter.createDataset(
+                config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);
+        return schema;
+    }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 3dcfc826d..fe2933906 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -126,7 +126,6 @@ public class FlinkPaimonTieringTestBase {
execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
execEnv.setParallelism(2);
- execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
}
protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv)
throws Exception {