This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new c5257fd23 [client] Get TableInfo via the Admin API instead of via
metadata updater (#2016)
c5257fd23 is described below
commit c5257fd23281a560469f06caac422a48796092a2
Author: yunhong <[email protected]>
AuthorDate: Sat Nov 29 23:33:12 2025 +0800
[client] Get TableInfo via the Admin API instead of via metadata updater
(#2016)
---
.../org/apache/fluss/client/FlussConnection.java | 24 +++++---
.../org/apache/fluss/client/admin/FlussAdmin.java | 5 +-
.../fluss/client/metadata/MetadataUpdater.java | 21 -------
.../client/table/writer/AppendWriterImpl.java | 6 +-
.../client/table/writer/UpsertWriterImpl.java | 12 +++-
.../fluss/client/write/ArrowLogWriteBatch.java | 5 ++
.../client/write/DynamicPartitionCreator.java | 10 ++--
.../fluss/client/write/IndexedLogWriteBatch.java | 5 ++
.../apache/fluss/client/write/KvWriteBatch.java | 5 ++
.../fluss/client/write/RecordAccumulator.java | 66 ++++++++++------------
.../java/org/apache/fluss/client/write/Sender.java | 26 ++++++---
.../org/apache/fluss/client/write/WriteBatch.java | 8 +++
.../org/apache/fluss/client/write/WriteRecord.java | 32 ++++++++++-
.../apache/fluss/client/write/WriterClient.java | 13 +++--
.../fluss/client/admin/FlussAdminITCase.java | 2 +-
.../fluss/client/write/ArrowLogWriteBatchTest.java | 2 +-
.../client/write/IndexedLogWriteBatchTest.java | 2 +-
.../fluss/client/write/KvWriteBatchTest.java | 7 ++-
.../fluss/client/write/RecordAccumulatorTest.java | 4 +-
.../org/apache/fluss/client/write/SenderTest.java | 2 +-
.../java/org/apache/fluss/cluster/Cluster.java | 54 +++++-------------
.../java/org/apache/fluss/cluster/ClusterTest.java | 14 -----
22 files changed, 170 insertions(+), 155 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
index e6a31b9f3..f29d90ba9 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
@@ -33,7 +33,6 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.fs.FileSystem;
-import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -61,6 +60,7 @@ public final class FlussConnection implements Connection {
private volatile LookupClient lookupClient;
private volatile RemoteFileDownloader remoteFileDownloader;
private volatile SecurityTokenManager securityTokenManager;
+ private volatile Admin admin;
FlussConnection(Configuration conf) {
this(conf, MetricRegistry.create(conf, null));
@@ -93,19 +93,16 @@ public final class FlussConnection implements Connection {
@Override
public Admin getAdmin() {
- return new FlussAdmin(rpcClient, metadataUpdater);
+ return getOrCreateAdmin();
}
@Override
public Table getTable(TablePath tablePath) {
- // force to update the table info from server to avoid stale data in
cache
+ // force to update the table info from server to avoid stale data in
cache.
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
- TableInfo tableInfo =
metadataUpdater.getTableInfoOrElseThrow(tablePath);
- return new FlussTable(this, tablePath, tableInfo);
- }
- public RpcClient getRpcClient() {
- return rpcClient;
+ Admin admin = getOrCreateAdmin();
+ return new FlussTable(this, tablePath,
admin.getTableInfo(tablePath).join());
}
public MetadataUpdater getMetadataUpdater() {
@@ -140,6 +137,17 @@ public final class FlussConnection implements Connection {
return lookupClient;
}
+ public Admin getOrCreateAdmin() {
+ if (admin == null) {
+ synchronized (this) {
+ if (admin == null) {
+ admin = new FlussAdmin(rpcClient, metadataUpdater);
+ }
+ }
+ }
+ return admin;
+ }
+
public RemoteFileDownloader getOrCreateRemoteFileDownloader() {
if (remoteFileDownloader == null) {
synchronized (this) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 3dc4c16c9..57dc52008 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -420,7 +420,8 @@ public class FlussAdmin implements Admin {
OffsetSpec offsetSpec) {
Long partitionId = null;
metadataUpdater.updateTableOrPartitionMetadata(physicalTablePath.getTablePath(),
null);
- long tableId =
metadataUpdater.getTableId(physicalTablePath.getTablePath());
+ TableInfo tableInfo =
getTableInfo(physicalTablePath.getTablePath()).join();
+
// if partition name is not null, we need to check and update
partition metadata
if (physicalTablePath.getPartitionName() != null) {
metadataUpdater.updatePhysicalTableMetadata(Collections.singleton(physicalTablePath));
@@ -428,7 +429,7 @@ public class FlussAdmin implements Admin {
}
Map<Integer, ListOffsetsRequest> requestMap =
prepareListOffsetsRequests(
- metadataUpdater, tableId, partitionId, buckets,
offsetSpec);
+ metadataUpdater, tableInfo.getTableId(), partitionId,
buckets, offsetSpec);
Map<Integer, CompletableFuture<Long>> bucketToOffsetMap =
MapUtils.newConcurrentHashMap();
for (int bucket : buckets) {
bucketToOffsetMap.put(bucket, new CompletableFuture<>());
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 831a82436..b47e709b8 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -32,7 +32,6 @@ import org.apache.fluss.exception.RetriableException;
import org.apache.fluss.exception.StaleMetadataException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePartition;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -87,10 +86,6 @@ public class MetadataUpdater {
return cluster.getCoordinatorServer();
}
- public long getTableId(TablePath tablePath) {
- return cluster.getTableId(tablePath);
- }
-
public Optional<Long> getPartitionId(PhysicalTablePath physicalTablePath) {
return cluster.getPartitionId(physicalTablePath);
}
@@ -99,26 +94,10 @@ public class MetadataUpdater {
return cluster.getPartitionIdOrElseThrow(physicalTablePath);
}
- public TableInfo getTableInfoOrElseThrow(TablePath tablePath) {
- return cluster.getTableOrElseThrow(tablePath);
- }
-
public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket)
{
return cluster.getBucketLocation(tableBucket);
}
- private Optional<TableInfo> getTableInfo(TablePath tablePath) {
- return cluster.getTable(tablePath);
- }
-
- public TableInfo getTableInfoOrElseThrow(long tableId) {
- return getTableInfo(cluster.getTablePathOrElseThrow(tableId))
- .orElseThrow(
- () ->
- new FlussRuntimeException(
- "Table not found for table id: " +
tableId));
- }
-
public int leaderFor(TableBucket tableBucket) {
Integer serverNode = cluster.leaderFor(tableBucket);
if (serverNode == null) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
index e2e4ac47e..d702e9621 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
@@ -45,6 +45,7 @@ class AppendWriterImpl extends AbstractTableWriter implements
AppendWriter {
private final LogFormat logFormat;
private final IndexedRowEncoder indexedRowEncoder;
private final FieldGetter[] fieldGetters;
+ private final TableInfo tableInfo;
AppendWriterImpl(TablePath tablePath, TableInfo tableInfo, WriterClient
writerClient) {
super(tablePath, tableInfo, writerClient);
@@ -60,6 +61,7 @@ class AppendWriterImpl extends AbstractTableWriter implements
AppendWriter {
this.logFormat = tableInfo.getTableConfig().getLogFormat();
this.indexedRowEncoder = new IndexedRowEncoder(tableInfo.getRowType());
this.fieldGetters =
InternalRow.createFieldGetters(tableInfo.getRowType());
+ this.tableInfo = tableInfo;
}
/**
@@ -77,10 +79,10 @@ class AppendWriterImpl extends AbstractTableWriter
implements AppendWriter {
final WriteRecord record;
if (logFormat == LogFormat.INDEXED) {
IndexedRow indexedRow = encodeIndexedRow(row);
- record = WriteRecord.forIndexedAppend(physicalPath, indexedRow,
bucketKey);
+ record = WriteRecord.forIndexedAppend(tableInfo, physicalPath,
indexedRow, bucketKey);
} else {
// ARROW format supports general internal row
- record = WriteRecord.forArrowAppend(physicalPath, row, bucketKey);
+ record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row,
bucketKey);
}
return send(record).thenApply(ignored -> APPEND_SUCCESS);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index bdfc48364..904b85c2a 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -52,6 +52,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements
UpsertWriter {
private final KvFormat kvFormat;
private final RowEncoder rowEncoder;
private final FieldGetter[] fieldGetters;
+ private final TableInfo tableInfo;
UpsertWriterImpl(
TablePath tablePath,
@@ -75,6 +76,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements
UpsertWriter {
this.kvFormat = tableInfo.getTableConfig().getKvFormat();
this.rowEncoder = RowEncoder.create(kvFormat, rowType);
this.fieldGetters = InternalRow.createFieldGetters(rowType);
+ this.tableInfo = tableInfo;
}
private static void sanityCheck(
@@ -129,7 +131,12 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
bucketKeyEncoder == primaryKeyEncoder ? key :
bucketKeyEncoder.encodeKey(row);
WriteRecord record =
WriteRecord.forUpsert(
- getPhysicalPath(row), encodeRow(row), key, bucketKey,
targetColumns);
+ tableInfo,
+ getPhysicalPath(row),
+ encodeRow(row),
+ key,
+ bucketKey,
+ targetColumns);
return send(record).thenApply(ignored -> UPSERT_SUCCESS);
}
@@ -146,7 +153,8 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
byte[] bucketKey =
bucketKeyEncoder == primaryKeyEncoder ? key :
bucketKeyEncoder.encodeKey(row);
WriteRecord record =
- WriteRecord.forDelete(getPhysicalPath(row), key, bucketKey,
targetColumns);
+ WriteRecord.forDelete(
+ tableInfo, getPhysicalPath(row), key, bucketKey,
targetColumns);
return send(record).thenApply(ignored -> DELETE_SUCCESS);
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
index dec1f55d1..578ff2a25 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
@@ -62,6 +62,11 @@ public class ArrowLogWriteBatch extends WriteBatch {
MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter,
outputView, true);
}
+ @Override
+ public boolean isLogBatch() {
+ return true;
+ }
+
@Override
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback)
throws Exception {
InternalRow row = writeRecord.getRow();
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
index d5cc14ff8..d1d23d39f 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
@@ -23,7 +23,6 @@ import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
-import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.ExceptionUtils;
@@ -64,7 +63,8 @@ public class DynamicPartitionCreator {
this.fatalErrorHandler = fatalErrorHandler;
}
- public void checkAndCreatePartitionAsync(PhysicalTablePath
physicalTablePath) {
+ public void checkAndCreatePartitionAsync(
+ PhysicalTablePath physicalTablePath, List<String> partitionKeys) {
String partitionName = physicalTablePath.getPartitionName();
if (partitionName == null) {
// no need to check and create partition
@@ -89,7 +89,7 @@ public class DynamicPartitionCreator {
// if the partition is not in inflightPartitionsToCreate,
we should create it.
// this means that the partition is not being created by
other threads.
LOG.info("Dynamically creating partition for
{}", physicalTablePath);
- createPartition(physicalTablePath);
+ createPartition(physicalTablePath, partitionKeys);
} else {
// if the partition is already in
inflightPartitionsToCreate, we should skip
// creating it.
@@ -121,12 +121,10 @@ public class DynamicPartitionCreator {
return idExist;
}
- private void createPartition(PhysicalTablePath physicalTablePath) {
+ private void createPartition(PhysicalTablePath physicalTablePath,
List<String> partitionKeys) {
String partitionName = physicalTablePath.getPartitionName();
TablePath tablePath = physicalTablePath.getTablePath();
checkArgument(partitionName != null, "Partition name shouldn't be
null.");
- TableInfo tableInfo =
metadataUpdater.getTableInfoOrElseThrow(tablePath);
- List<String> partitionKeys = tableInfo.getPartitionKeys();
ResolvedPartitionSpec resolvedPartitionSpec =
ResolvedPartitionSpec.fromPartitionName(partitionKeys,
partitionName);
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
index 5d1e27080..fb9655658 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
@@ -61,6 +61,11 @@ public final class IndexedLogWriteBatch extends WriteBatch {
MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit,
outputView, true);
}
+ @Override
+ public boolean isLogBatch() {
+ return true;
+ }
+
@Override
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback)
throws Exception {
checkNotNull(callback, "write callback must be not null");
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
index 63652db22..963e9866b 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
@@ -67,6 +67,11 @@ public class KvWriteBatch extends WriteBatch {
this.targetColumns = targetColumns;
}
+ @Override
+ public boolean isLogBatch() {
+ return false;
+ }
+
@Override
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback)
throws Exception {
// currently, we throw exception directly when the target columns of
the write record is
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 2cfd74da3..9dd8bade6 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -171,17 +171,16 @@ public final class RecordAccumulator {
boolean abortIfBatchFull)
throws Exception {
PhysicalTablePath physicalTablePath =
writeRecord.getPhysicalTablePath();
-
- TableInfo tableInfo =
cluster.getTableOrElseThrow(physicalTablePath.getTablePath());
+ TableInfo tableInfo = writeRecord.getTableInfo();
+ // The metadata may return null for the partition id, but it is fine
to pass null here,
+ // because we will fill the partitionId in bucketReady() before sending
the batch.
Optional<Long> partitionIdOpt =
cluster.getPartitionId(physicalTablePath);
BucketAndWriteBatches bucketAndWriteBatches =
writeBatches.computeIfAbsent(
physicalTablePath,
k ->
new BucketAndWriteBatches(
- tableInfo.getTableId(),
- partitionIdOpt.orElse(null),
- tableInfo.isPartitioned()));
+ partitionIdOpt.orElse(null),
tableInfo.isPartitioned()));
// We keep track of the number of appending thread to make sure we do
not miss batches in
// abortIncompleteBatches().
@@ -209,13 +208,7 @@ public final class RecordAccumulator {
synchronized (dq) {
RecordAppendResult appendResult =
appendNewBatch(
- writeRecord,
- callback,
- bucketId,
- tableInfo,
- dq,
- memorySegments,
- cluster);
+ writeRecord, callback, bucketId, tableInfo,
dq, memorySegments);
if (appendResult.newBatchCreated) {
memorySegments = Collections.emptyList();
}
@@ -333,7 +326,6 @@ public final class RecordAccumulator {
physicalTablePath,
k ->
new BucketAndWriteBatches(
- tableBucket.getTableId(),
tableBucket.getPartitionId(),
physicalTablePath.getPartitionName()
!= null));
return bucketAndWriteBatches.batches.computeIfAbsent(
@@ -507,22 +499,28 @@ public final class RecordAccumulator {
}
int bucketId = entry.getKey();
- TableBucket tableBucket =
cluster.getTableBucket(physicalTablePath, bucketId);
- Integer leader = cluster.leaderFor(tableBucket);
- if (leader == null) {
- // This is a bucket for which leader is not known, but
messages are
- // available to send. Note that entries are currently not
removed from
- // batches when deque is empty.
+ Optional<Long> tableIdOpt =
cluster.getTableId(physicalTablePath.getTablePath());
+ if (!tableIdOpt.isPresent()) {
unknownLeaderTables.add(physicalTablePath);
} else {
- nextReadyCheckDelayMs =
- batchReady(
- exhausted,
- leader,
- waitedTimeMs,
- full,
- readyNodes,
- nextReadyCheckDelayMs);
+ TableBucket tableBucket =
+ cluster.getTableBucket(tableIdOpt.get(),
physicalTablePath, bucketId);
+ Integer leader = cluster.leaderFor(tableBucket);
+ if (leader == null) {
+ // This is a bucket for which leader is not known, but
messages are
+ // available to send. Note that entries are currently not
removed from
+ // batches when deque is empty.
+ unknownLeaderTables.add(physicalTablePath);
+ } else {
+ nextReadyCheckDelayMs =
+ batchReady(
+ exhausted,
+ leader,
+ waitedTimeMs,
+ full,
+ readyNodes,
+ nextReadyCheckDelayMs);
+ }
}
}
@@ -570,8 +568,7 @@ public final class RecordAccumulator {
int bucketId,
TableInfo tableInfo,
Deque<WriteBatch> deque,
- List<MemorySegment> segments,
- Cluster cluster)
+ List<MemorySegment> segments)
throws Exception {
RecordAppendResult appendResult = tryAppend(writeRecord, callback,
deque);
if (appendResult != null) {
@@ -591,7 +588,7 @@ public final class RecordAccumulator {
new KvWriteBatch(
bucketId,
physicalTablePath,
- schemaId,
+ tableInfo.getSchemaId(),
tableInfo.getTableConfig().getKvFormat(),
outputView.getPreAllocatedSize(),
outputView,
@@ -609,7 +606,7 @@ public final class RecordAccumulator {
new ArrowLogWriteBatch(
bucketId,
physicalTablePath,
- schemaId,
+ tableInfo.getSchemaId(),
arrowWriter,
outputView,
clock.milliseconds());
@@ -618,7 +615,7 @@ public final class RecordAccumulator {
new IndexedLogWriteBatch(
bucketId,
physicalTablePath,
- schemaId,
+ tableInfo.getSchemaId(),
outputView.getPreAllocatedSize(),
outputView,
clock.milliseconds());
@@ -928,15 +925,12 @@ public final class RecordAccumulator {
/** Per table bucket and write batches. */
private static class BucketAndWriteBatches {
- public final long tableId;
public final boolean isPartitionedTable;
public volatile @Nullable Long partitionId;
// Write batches for each bucket in queue.
public final Map<Integer, Deque<WriteBatch>> batches = new
CopyOnWriteMap<>();
- public BucketAndWriteBatches(
- long tableId, @Nullable Long partitionId, boolean
isPartitionedTable) {
- this.tableId = tableId;
+ public BucketAndWriteBatches(@Nullable Long partitionId, boolean
isPartitionedTable) {
this.partitionId = partitionId;
this.isPartitionedTable = isPartitionedTable;
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
index a81c69ee7..e9c285306 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
@@ -30,7 +30,6 @@ import org.apache.fluss.exception.RetriableException;
import org.apache.fluss.exception.UnknownTableOrBucketException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.PbProduceLogRespForBucket;
import org.apache.fluss.rpc.messages.PbPutKvRespForBucket;
@@ -56,6 +55,7 @@ import java.util.Set;
import static
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeProduceLogRequest;
import static
org.apache.fluss.client.utils.ClientRpcMessageUtils.makePutKvRequest;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
@@ -377,18 +377,17 @@ public class Sender implements Runnable {
} else {
writeBatchByTable.forEach(
(tableId, writeBatches) -> {
- TableInfo tableInfo =
metadataUpdater.getTableInfoOrElseThrow(tableId);
- if (tableInfo.hasPrimaryKey()) {
- sendPutKvRequestAndHandleResponse(
+ if (isLogBatches(writeBatches)) {
+ sendProduceLogRequestAndHandleResponse(
gateway,
- makePutKvRequest(
+ makeProduceLogRequest(
tableId, acks,
maxRequestTimeoutMs, writeBatches),
tableId,
recordsByBucket);
} else {
- sendProduceLogRequestAndHandleResponse(
+ sendPutKvRequestAndHandleResponse(
gateway,
- makeProduceLogRequest(
+ makePutKvRequest(
tableId, acks,
maxRequestTimeoutMs, writeBatches),
tableId,
recordsByBucket);
@@ -397,6 +396,19 @@ public class Sender implements Runnable {
}
}
+ /**
+ * Check whether the given batches are log batches. We assume all the
batches are of the same
+ * type.
+ *
+ * @param batches the batches to check, must not be empty.
+ * @return true if the given batches are log batches, false if they are kv
batches.
+ */
+ private boolean isLogBatches(List<ReadyWriteBatch> batches) {
+ checkArgument(!batches.isEmpty(), "batches must not be empty");
+ ReadyWriteBatch batch = batches.get(0);
+ return batch.writeBatch().isLogBatch();
+ }
+
private void sendProduceLogRequestAndHandleResponse(
TabletServerGateway gateway,
ProduceLogRequest request,
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
index 49766612c..a35a9f58b 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
@@ -62,6 +62,14 @@ public abstract class WriteBatch {
this.recordCount = 0;
}
+ /**
+ * Check if the batch is a log batch, e.g., ArrowLogBatch or
IndexedLogBatch, and should use
+ * ProduceLog request. Otherwise, it is a kv batch, and should use PutKv
request.
+ *
+ * @return true if log batch, false if kv batch
+ */
+ public abstract boolean isLogBatch();
+
/**
* try to append one write record to the record batch.
*
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
index 0c1427e6f..e1ab2ec24 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
@@ -19,6 +19,7 @@ package org.apache.fluss.client.write;
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.record.DefaultKvRecord;
import org.apache.fluss.record.DefaultKvRecordBatch;
import org.apache.fluss.record.IndexedLogRecord;
@@ -41,6 +42,7 @@ public final class WriteRecord {
/** Create a write record for upsert operation and partial-upsert
operation. */
public static WriteRecord forUpsert(
+ TableInfo tableInfo,
PhysicalTablePath tablePath,
BinaryRow row,
byte[] key,
@@ -52,6 +54,7 @@ public final class WriteRecord {
int estimatedSizeInBytes =
DefaultKvRecord.sizeOf(key, row) +
DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
return new WriteRecord(
+ tableInfo,
tablePath,
key,
bucketKey,
@@ -63,6 +66,7 @@ public final class WriteRecord {
/** Create a write record for delete operation and partial-delete update.
*/
public static WriteRecord forDelete(
+ TableInfo tableInfo,
PhysicalTablePath tablePath,
byte[] key,
byte[] bucketKey,
@@ -72,6 +76,7 @@ public final class WriteRecord {
int estimatedSizeInBytes =
DefaultKvRecord.sizeOf(key, null) +
DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
return new WriteRecord(
+ tableInfo,
tablePath,
key,
bucketKey,
@@ -83,11 +88,15 @@ public final class WriteRecord {
/** Create a write record for append operation for indexed format. */
public static WriteRecord forIndexedAppend(
- PhysicalTablePath tablePath, IndexedRow row, @Nullable byte[]
bucketKey) {
+ TableInfo tableInfo,
+ PhysicalTablePath tablePath,
+ IndexedRow row,
+ @Nullable byte[] bucketKey) {
checkNotNull(row);
int estimatedSizeInBytes =
IndexedLogRecord.sizeOf(row) +
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
return new WriteRecord(
+ tableInfo,
tablePath,
null,
bucketKey,
@@ -99,13 +108,23 @@ public final class WriteRecord {
/** Creates a write record for append operation for Arrow format. */
public static WriteRecord forArrowAppend(
- PhysicalTablePath tablePath, InternalRow row, @Nullable byte[]
bucketKey) {
+ TableInfo tableInfo,
+ PhysicalTablePath tablePath,
+ InternalRow row,
+ @Nullable byte[] bucketKey) {
checkNotNull(row);
// the write row maybe GenericRow, can't estimate the size.
// it is not necessary to estimate size for Arrow format.
int estimatedSizeInBytes = -1;
return new WriteRecord(
- tablePath, null, bucketKey, row, WriteFormat.ARROW_LOG, null,
estimatedSizeInBytes);
+ tableInfo,
+ tablePath,
+ null,
+ bucketKey,
+ row,
+ WriteFormat.ARROW_LOG,
+ null,
+ estimatedSizeInBytes);
}
//
------------------------------------------------------------------------------------------
@@ -120,8 +139,10 @@ public final class WriteRecord {
// will be null if it's not for partial update
private final @Nullable int[] targetColumns;
private final int estimatedSizeInBytes;
+ private final TableInfo tableInfo;
private WriteRecord(
+ TableInfo tableInfo,
PhysicalTablePath physicalTablePath,
@Nullable byte[] key,
@Nullable byte[] bucketKey,
@@ -129,6 +150,7 @@ public final class WriteRecord {
WriteFormat writeFormat,
@Nullable int[] targetColumns,
int estimatedSizeInBytes) {
+ this.tableInfo = tableInfo;
this.physicalTablePath = physicalTablePath;
this.key = key;
this.bucketKey = bucketKey;
@@ -142,6 +164,10 @@ public final class WriteRecord {
return physicalTablePath;
}
+ public TableInfo getTableInfo() {
+ return tableInfo;
+ }
+
public @Nullable byte[] getKey() {
return key;
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index e91c04843..962369ef7 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -126,8 +126,7 @@ public class WriterClient {
"Failed to construct writer. Max request size: %d
bytes, Idempotence enabled: %b",
maxRequestSizeLocal,
idempotenceManagerLocal != null
- ?
idempotenceManagerLocal.idempotenceEnabled()
- : false),
+ &&
idempotenceManagerLocal.idempotenceEnabled()),
t);
}
}
@@ -174,15 +173,17 @@ public class WriterClient {
try {
throwIfWriterClosed();
+ TableInfo tableInfo = record.getTableInfo();
PhysicalTablePath physicalTablePath =
record.getPhysicalTablePath();
-
dynamicPartitionCreator.checkAndCreatePartitionAsync(physicalTablePath);
+ dynamicPartitionCreator.checkAndCreatePartitionAsync(
+ physicalTablePath, tableInfo.getPartitionKeys());
// maybe create bucket assigner.
Cluster cluster = metadataUpdater.getCluster();
BucketAssigner bucketAssigner =
bucketAssignerMap.computeIfAbsent(
physicalTablePath,
- k -> createBucketAssigner(physicalTablePath, conf,
cluster));
+ k -> createBucketAssigner(tableInfo,
physicalTablePath, conf));
// Append the record to the accumulator.
int bucketId = bucketAssigner.assignBucket(record.getBucketKey(),
cluster);
@@ -332,6 +333,7 @@ public class WriterClient {
if (sender != null) {
sender.forceClose();
}
+
LOG.info("Writer closed.");
}
@@ -340,8 +342,7 @@ public class WriterClient {
}
private BucketAssigner createBucketAssigner(
- PhysicalTablePath physicalTablePath, Configuration conf, Cluster
cluster) {
- TableInfo tableInfo =
cluster.getTableOrElseThrow(physicalTablePath.getTablePath());
+ TableInfo tableInfo, PhysicalTablePath physicalTablePath,
Configuration conf) {
int bucketNumber = tableInfo.getNumBuckets();
List<String> bucketKeys = tableInfo.getBucketKeys();
if (!bucketKeys.isEmpty()) {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 5273b5895..10890f947 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -129,7 +129,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
void testMultiClient() throws Exception {
Admin admin1 = conn.getAdmin();
Admin admin2 = conn.getAdmin();
- assertThat(admin1).isNotSameAs(admin2);
+ assertThat(admin1).isEqualTo(admin2);
TableInfo t1 = admin1.getTableInfo(DEFAULT_TABLE_PATH).get();
TableInfo t2 = admin2.getTableInfo(DEFAULT_TABLE_PATH).get();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
index 04ba2d8b7..6b7eb6fe7 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
@@ -288,7 +288,7 @@ public class ArrowLogWriteBatchTest {
}
private WriteRecord createWriteRecord(GenericRow row) {
- return WriteRecord.forArrowAppend(DATA1_PHYSICAL_TABLE_PATH, row,
null);
+ return WriteRecord.forArrowAppend(DATA1_TABLE_INFO,
DATA1_PHYSICAL_TABLE_PATH, row, null);
}
private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int
maxSizeInBytes) {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
index 07544e8da..3081814a5 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
@@ -199,7 +199,7 @@ public class IndexedLogWriteBatchTest {
}
private WriteRecord createWriteRecord() {
- return WriteRecord.forIndexedAppend(DATA1_PHYSICAL_TABLE_PATH, row,
null);
+ return WriteRecord.forIndexedAppend(DATA1_TABLE_INFO,
DATA1_PHYSICAL_TABLE_PATH, row, null);
}
private IndexedLogWriteBatch createLogWriteBatch(TableBucket tb, long
baseLogOffset)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index d9e7272b8..5321da0f2 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -200,7 +200,12 @@ class KvWriteBatchTest {
protected WriteRecord createWriteRecord() {
return WriteRecord.forUpsert(
- PhysicalTablePath.of(DATA1_TABLE_PATH_PK), row, key, key,
null);
+ DATA1_TABLE_INFO_PK,
+ PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+ row,
+ key,
+ key,
+ null);
}
private KvWriteBatch createKvWriteBatch(TableBucket tb) throws Exception {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index e5d71a927..c721e974c 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -242,7 +242,7 @@ class RecordAccumulatorTest {
while (true) {
GenericRow row = row(1, RandomStringUtils.random(10));
PhysicalTablePath tablePath =
PhysicalTablePath.of(ZSTD_TABLE_INFO.getTablePath());
- WriteRecord record = WriteRecord.forArrowAppend(tablePath, row,
null);
+ WriteRecord record = WriteRecord.forArrowAppend(ZSTD_TABLE_INFO,
tablePath, row, null);
// append until the batch is full
if (accum.append(record, writeCallback, cluster, bucketId,
false).batchIsFull) {
break;
@@ -543,7 +543,7 @@ class RecordAccumulatorTest {
* format, see {@link #updateCluster(List)}.
*/
private WriteRecord createRecord(IndexedRow row) {
- return WriteRecord.forIndexedAppend(DATA1_PHYSICAL_TABLE_PATH, row,
null);
+ return WriteRecord.forIndexedAppend(DATA1_TABLE_INFO,
DATA1_PHYSICAL_TABLE_PATH, row, null);
}
private Cluster updateCluster(List<BucketLocation> bucketLocations) {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index 7209a315d..5618d224d 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -707,7 +707,7 @@ final class SenderTest {
private void appendToAccumulator(TableBucket tb, GenericRow row,
WriteCallback writeCallback)
throws Exception {
accumulator.append(
- WriteRecord.forArrowAppend(DATA1_PHYSICAL_TABLE_PATH, row,
null),
+ WriteRecord.forArrowAppend(DATA1_TABLE_INFO,
DATA1_PHYSICAL_TABLE_PATH, row, null),
writeCallback,
metadataUpdater.getCluster(),
tb.getBucket(),
diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
index 9aef86261..ccd73e22c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
+++ b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
@@ -20,7 +20,6 @@ package org.apache.fluss.cluster;
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
@@ -159,17 +158,6 @@ public final class Cluster {
return aliveTabletServers;
}
- /**
- * Get the table id for this table.
- *
- * @param tablePath the table path
- * @return the table id, if metadata cache contains the table path, return
the table path,
- * otherwise return {@link TableInfo#UNKNOWN_TABLE_ID}
- */
- public long getTableId(TablePath tablePath) {
- return tableIdByPath.getOrDefault(tablePath,
TableInfo.UNKNOWN_TABLE_ID);
- }
-
/** Get the table path for this table id. */
public Optional<TablePath> getTablePath(long tableId) {
return Optional.ofNullable(pathByTableId.get(tableId));
@@ -224,41 +212,31 @@ public final class Cluster {
return availableLocationsByPath.getOrDefault(physicalTablePath,
Collections.emptyList());
}
- /** Get the table info for this table. */
+ /**
+ * Get the table info for this table.
+ *
+ * <p>TODO this method need to be remove, use Admin getTableInfo instead.
+ */
public Optional<TableInfo> getTable(TablePath tablePath) {
return Optional.ofNullable(tableInfoByPath.get(tablePath));
}
+ public Optional<Long> getTableId(TablePath tablePath) {
+ return Optional.ofNullable(tableIdByPath.get(tablePath));
+ }
+
/** Get the partition id for this partition. */
public Optional<Long> getPartitionId(PhysicalTablePath physicalTablePath) {
return Optional.ofNullable(partitionsIdByPath.get(physicalTablePath));
}
- /** Return whether the cluster contains the given physical table path or
not. */
- public boolean contains(PhysicalTablePath physicalTablePath) {
- if (physicalTablePath.getPartitionName() == null) {
- return getTable(physicalTablePath.getTablePath()).isPresent();
- } else {
- return getPartitionId(physicalTablePath).isPresent();
- }
- }
-
- public TableInfo getTableOrElseThrow(TablePath tablePath) {
- return getTable(tablePath)
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- String.format(
- "table: %s not found in
cluster", tablePath)));
- }
-
- public TableBucket getTableBucket(PhysicalTablePath physicalTablePath, int
bucketId) {
- TableInfo tableInfo =
getTableOrElseThrow(physicalTablePath.getTablePath());
+ public TableBucket getTableBucket(
+ long tableId, PhysicalTablePath physicalTablePath, int bucketId) {
if (physicalTablePath.getPartitionName() != null) {
Long partitionId = getPartitionIdOrElseThrow(physicalTablePath);
- return new TableBucket(tableInfo.getTableId(), partitionId,
bucketId);
+ return new TableBucket(tableId, partitionId, bucketId);
} else {
- return new TableBucket(tableInfo.getTableId(), bucketId);
+ return new TableBucket(tableId, bucketId);
}
}
@@ -286,12 +264,6 @@ public final class Cluster {
return Optional.ofNullable(partitionNameById.get(partitionId));
}
- /** Get the latest schema for the given table. */
- public Optional<SchemaInfo> getSchema(TablePath tablePath) {
- return getTable(tablePath)
- .map(tableInfo -> new SchemaInfo(tableInfo.getSchema(),
tableInfo.getSchemaId()));
- }
-
/** Get the table path to table id map. */
public Map<TablePath, Long> getTableIdByPath() {
return tableIdByPath;
diff --git
a/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
b/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
index 943163e44..458a79eb1 100644
--- a/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
@@ -18,7 +18,6 @@
package org.apache.fluss.cluster;
import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
@@ -34,11 +33,9 @@ import java.util.Map;
import java.util.Set;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
-import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
-import static org.apache.fluss.record.TestData.DATA2_SCHEMA;
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
@@ -87,17 +84,6 @@ class ClusterTest {
.isInstanceOf(UnsupportedOperationException.class);
}
- @Test
- void testGetTable() {
- Cluster cluster = createCluster(aliveTabletServersById);
-
assertThat(cluster.getTable(DATA1_TABLE_PATH).get()).isEqualTo(DATA1_TABLE_INFO);
-
assertThat(cluster.getTable(DATA2_TABLE_PATH).get()).isEqualTo(DATA2_TABLE_INFO);
- assertThat(cluster.getSchema(DATA1_TABLE_PATH).get())
- .isEqualTo(new SchemaInfo(DATA1_SCHEMA, 1));
- assertThat(cluster.getSchema(DATA2_TABLE_PATH).get())
- .isEqualTo(new SchemaInfo(DATA2_SCHEMA, 1));
- }
-
@Test
void testInvalidMetaAndUpdate() {
Cluster cluster = createCluster(aliveTabletServersById);