This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 72b10bbbf [common] Remove TableInfo from Cluster (#2068)
72b10bbbf is described below
commit 72b10bbbf77c8d662db5b610597f5cc0727f8f04
Author: yunhong <[email protected]>
AuthorDate: Tue Dec 16 19:40:43 2025 +0800
[common] Remove TableInfo from Cluster (#2068)
---
.../fluss/client/metadata/MetadataUpdater.java | 6 ++---
.../apache/fluss/client/utils/MetadataUtils.java | 31 ++--------------------
.../client/metadata/MetadataUpdaterITCase.java | 1 -
.../client/metadata/TestingMetadataUpdater.java | 7 ++---
.../client/table/scanner/log/LogFetcherTest.java | 3 +--
.../fluss/client/write/RecordAccumulatorTest.java | 23 +---------------
.../org/apache/fluss/client/write/SenderTest.java | 3 +--
.../write/StickyStaticBucketAssignerTest.java | 19 +------------
.../java/org/apache/fluss/cluster/Cluster.java | 26 ++----------------
.../java/org/apache/fluss/cluster/ClusterTest.java | 13 ++-------
fluss-rpc/src/main/proto/FlussApi.proto | 3 +++
11 files changed, 18 insertions(+), 117 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 00e80e463..5cb714044 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -166,7 +166,7 @@ public class MetadataUpdater {
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
Set<TablePath> needUpdateTablePaths =
tablePaths.stream()
- .filter(tablePath ->
!cluster.getTable(tablePath).isPresent())
+ .filter(tablePath ->
!cluster.getTableId(tablePath).isPresent())
.collect(Collectors.toSet());
if (!needUpdateTablePaths.isEmpty()) {
updateMetadata(needUpdateTablePaths, null, null);
@@ -188,8 +188,8 @@ public class MetadataUpdater {
}
/**
- * Check the table/partition info for the given table bucket exist in
metadata cache, if not,
- * try to update the metadata cache.
+ * Check whether the table/partition bucket info for the given table bucket exists
in the metadata cache; if
+ * not, try to update the metadata cache.
*/
public void checkAndUpdateMetadata(TablePath tablePath, TableBucket
tableBucket) {
if (tableBucket.getPartitionId() == null) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
index 14aa480da..299005499 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java
@@ -24,8 +24,6 @@ import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.exception.StaleMetadataException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TableDescriptor;
-import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.GatewayClientProxy;
import org.apache.fluss.rpc.RpcClient;
@@ -121,7 +119,6 @@ public class MetadataUtils {
ServerNode coordinatorServer =
getCoordinatorServer(response);
Map<TablePath, Long> newTablePathToTableId;
- Map<TablePath, TableInfo> newTablePathToTableInfo;
Map<PhysicalTablePath, List<BucketLocation>>
newBucketLocations;
Map<PhysicalTablePath, Long> newPartitionIdByPath;
@@ -133,16 +130,12 @@ public class MetadataUtils {
// the origin cluster.
newTablePathToTableId =
new
HashMap<>(originCluster.getTableIdByPath());
- newTablePathToTableInfo =
- new
HashMap<>(originCluster.getTableInfoByPath());
newBucketLocations =
new
HashMap<>(originCluster.getBucketLocationsByPath());
newPartitionIdByPath =
new
HashMap<>(originCluster.getPartitionIdByPath());
newTablePathToTableId.putAll(newTableMetadata.tablePathToTableId);
- newTablePathToTableInfo.putAll(
- newTableMetadata.tablePathToTableInfo);
newBucketLocations.putAll(newTableMetadata.bucketLocations);
newPartitionIdByPath.putAll(newTableMetadata.partitionIdByPath);
@@ -150,7 +143,6 @@ public class MetadataUtils {
// If full update, we will clear all tables
info out ot the origin
// cluster.
newTablePathToTableId =
newTableMetadata.tablePathToTableId;
- newTablePathToTableInfo =
newTableMetadata.tablePathToTableInfo;
newBucketLocations =
newTableMetadata.bucketLocations;
newPartitionIdByPath =
newTableMetadata.partitionIdByPath;
}
@@ -160,8 +152,7 @@ public class MetadataUtils {
coordinatorServer,
newBucketLocations,
newTablePathToTableId,
- newPartitionIdByPath,
- newTablePathToTableInfo);
+ newPartitionIdByPath);
})
.get(30, TimeUnit.SECONDS); // TODO currently, we don't have
timeout logic in
// RpcClient, it will let the get() block forever. So we
@@ -171,7 +162,6 @@ public class MetadataUtils {
private static NewTableMetadata getTableMetadataToUpdate(
Cluster cluster, MetadataResponse metadataResponse) {
Map<TablePath, Long> newTablePathToTableId = new HashMap<>();
- Map<TablePath, TableInfo> newTablePathToTableInfo = new HashMap<>();
Map<PhysicalTablePath, List<BucketLocation>> newBucketLocations = new
HashMap<>();
Map<PhysicalTablePath, Long> newPartitionIdByPath = new HashMap<>();
@@ -187,17 +177,6 @@ public class MetadataUtils {
protoTablePath.getDatabaseName(),
protoTablePath.getTableName());
newTablePathToTableId.put(tablePath, tableId);
- TableDescriptor tableDescriptor =
-
TableDescriptor.fromJsonBytes(pbTableMetadata.getTableJson());
- newTablePathToTableInfo.put(
- tablePath,
- TableInfo.of(
- tablePath,
- pbTableMetadata.getTableId(),
- pbTableMetadata.getSchemaId(),
- tableDescriptor,
- pbTableMetadata.getCreatedTime(),
- pbTableMetadata.getModifiedTime()));
// Get all buckets for the table.
List<PbBucketMetadata> pbBucketMetadataList =
@@ -232,25 +211,19 @@ public class MetadataUtils {
});
return new NewTableMetadata(
- newTablePathToTableId,
- newTablePathToTableInfo,
- newBucketLocations,
- newPartitionIdByPath);
+ newTablePathToTableId, newBucketLocations,
newPartitionIdByPath);
}
private static final class NewTableMetadata {
private final Map<TablePath, Long> tablePathToTableId;
- private final Map<TablePath, TableInfo> tablePathToTableInfo;
private final Map<PhysicalTablePath, List<BucketLocation>>
bucketLocations;
private final Map<PhysicalTablePath, Long> partitionIdByPath;
public NewTableMetadata(
Map<TablePath, Long> tablePathToTableId,
- Map<TablePath, TableInfo> tablePathToTableInfo,
Map<PhysicalTablePath, List<BucketLocation>> bucketLocations,
Map<PhysicalTablePath, Long> partitionIdByPath) {
this.tablePathToTableId = tablePathToTableId;
- this.tablePathToTableInfo = tablePathToTableInfo;
this.bucketLocations = bucketLocations;
this.partitionIdByPath = partitionIdByPath;
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
index caecbe545..6d099a24e 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
@@ -108,7 +108,6 @@ class MetadataUpdaterITCase {
null,
Collections.emptyMap(),
Collections.emptyMap(),
- Collections.emptyMap(),
Collections.emptyMap());
metadataUpdater = new MetadataUpdater(rpcClient, new Configuration(),
newCluster);
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 4d863a29a..1c8f825a9 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -145,7 +145,7 @@ public class TestingMetadataUpdater extends MetadataUpdater
{
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
Set<TablePath> needUpdateTablePaths =
tablePaths.stream()
- .filter(tablePath ->
!cluster.getTable(tablePath).isPresent())
+ .filter(tablePath ->
!cluster.getTableId(tablePath).isPresent())
.collect(Collectors.toSet());
if (!needUpdateTablePaths.isEmpty()) {
throw new IllegalStateException(
@@ -189,7 +189,6 @@ public class TestingMetadataUpdater extends MetadataUpdater
{
Map<PhysicalTablePath, List<BucketLocation>>
tablePathToBucketLocations = new HashMap<>();
Map<TablePath, Long> tableIdByPath = new HashMap<>();
- Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
tableInfos.forEach(
(tablePath, tableInfo) -> {
long tableId = tableInfo.getTableId();
@@ -216,7 +215,6 @@ public class TestingMetadataUpdater extends MetadataUpdater
{
tabletServers.get(2).id(),
replicas)));
tableIdByPath.put(tablePath, tableId);
- tableInfoByPath.put(tablePath, tableInfo);
});
cluster =
new Cluster(
@@ -224,7 +222,6 @@ public class TestingMetadataUpdater extends MetadataUpdater
{
coordinatorServer,
tablePathToBucketLocations,
tableIdByPath,
- Collections.emptyMap(),
- tableInfoByPath);
+ Collections.emptyMap());
}
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index 3393a37a3..bb36cb7f1 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -260,8 +260,7 @@ public class LogFetcherTest extends
ClientToServerITCaseBase {
oldCluster.getCoordinatorServer(),
oldCluster.getBucketLocationsByPath(),
oldCluster.getTableIdByPath(),
- oldCluster.getPartitionIdByPath(),
- oldCluster.getTableInfoByPath());
+ oldCluster.getPartitionIdByPath());
metadataUpdater = new MetadataUpdater(rpcClient, clientConf,
newCluster);
LogScannerStatus logScannerStatus = new LogScannerStatus();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index 47a9c2414..24f264c92 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -25,7 +25,6 @@ import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
-import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.SchemaGetter;
import org.apache.fluss.metadata.SchemaInfo;
@@ -563,32 +562,12 @@ class RecordAccumulatorTest {
Map<TablePath, Long> tableIdByPath = new HashMap<>();
tableIdByPath.put(DATA1_TABLE_PATH, DATA1_TABLE_ID);
-
- TableInfo data1NonPkTableInfo =
- TableInfo.of(
- DATA1_TABLE_PATH,
- DATA1_TABLE_ID,
- 1,
- TableDescriptor.builder()
- // use INDEXED format better memory control
- // to test RecordAccumulator
- .logFormat(LogFormat.INDEXED)
- .schema(DATA1_SCHEMA)
- .distributedBy(3)
- .build(),
- System.currentTimeMillis(),
- System.currentTimeMillis());
- Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
- tableInfoByPath.put(DATA1_TABLE_PATH, data1NonPkTableInfo);
- tableInfoByPath.put(ZSTD_TABLE_INFO.getTablePath(), ZSTD_TABLE_INFO);
-
return new Cluster(
aliveTabletServersById,
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
bucketsByPath,
tableIdByPath,
- Collections.emptyMap(),
- tableInfoByPath);
+ Collections.emptyMap());
}
private void delayedInterrupt(final Thread thread, final long delayMs) {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index 2d06057e8..f1d9481a6 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -671,8 +671,7 @@ final class SenderTest {
oldCluster.getCoordinatorServer(),
oldCluster.getBucketLocationsByPath(),
oldCluster.getTableIdByPath(),
- oldCluster.getPartitionIdByPath(),
- oldCluster.getTableInfoByPath());
+ oldCluster.getPartitionIdByPath());
metadataUpdater.updateCluster(newCluster);
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
index 5ac05b0fb..f82890633 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
@@ -22,8 +22,6 @@ import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.metadata.TableDescriptor;
-import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.junit.jupiter.api.Test;
@@ -39,7 +37,6 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
-import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.assertj.core.api.Assertions.assertThat;
@@ -202,7 +199,6 @@ class StickyStaticBucketAssignerTest {
Map<PhysicalTablePath, List<BucketLocation>> bucketsByPath = new
HashMap<>();
Map<TablePath, Long> tableIdByPath = new HashMap<>();
- Map<TablePath, TableInfo> tableInfoByPath = new HashMap<>();
bucketLocations.forEach(
bucketLocation -> {
PhysicalTablePath physicalTablePath =
bucketLocation.getPhysicalTablePath();
@@ -212,18 +208,6 @@ class StickyStaticBucketAssignerTest {
tableIdByPath.put(
bucketLocation.getPhysicalTablePath().getTablePath(),
bucketLocation.getTableBucket().getTableId());
- tableInfoByPath.put(
- physicalTablePath.getTablePath(),
- TableInfo.of(
- physicalTablePath.getTablePath(),
-
bucketLocation.getTableBucket().getTableId(),
- 1,
- TableDescriptor.builder()
- .schema(DATA1_SCHEMA)
- .distributedBy(3)
- .build(),
- System.currentTimeMillis(),
- System.currentTimeMillis()));
});
return new Cluster(
@@ -231,7 +215,6 @@ class StickyStaticBucketAssignerTest {
new ServerNode(0, "localhost", 89, ServerType.COORDINATOR),
bucketsByPath,
tableIdByPath,
- Collections.emptyMap(),
- tableInfoByPath);
+ Collections.emptyMap());
}
}
diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
index 1cec4540d..d5690326a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
+++ b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
@@ -21,7 +21,6 @@ import org.apache.fluss.annotation.Internal;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import javax.annotation.Nullable;
@@ -54,22 +53,17 @@ public final class Cluster {
private final Map<PhysicalTablePath, Long> partitionsIdByPath;
private final Map<Long, String> partitionNameById;
- /** Only latest schema of table will be put in it. */
- private final Map<TablePath, TableInfo> tableInfoByPath;
-
public Cluster(
Map<Integer, ServerNode> aliveTabletServersById,
@Nullable ServerNode coordinatorServer,
Map<PhysicalTablePath, List<BucketLocation>> bucketLocationsByPath,
Map<TablePath, Long> tableIdByPath,
- Map<PhysicalTablePath, Long> partitionsIdByPath,
- Map<TablePath, TableInfo> tableInfoByPath) {
+ Map<PhysicalTablePath, Long> partitionsIdByPath) {
this.coordinatorServer = coordinatorServer;
this.aliveTabletServersById =
Collections.unmodifiableMap(aliveTabletServersById);
this.aliveTabletServers =
Collections.unmodifiableList(new
ArrayList<>(aliveTabletServersById.values()));
this.tableIdByPath = Collections.unmodifiableMap(tableIdByPath);
- this.tableInfoByPath = Collections.unmodifiableMap(tableInfoByPath);
this.partitionsIdByPath =
Collections.unmodifiableMap(partitionsIdByPath);
// Index the bucket locations by table path, and index bucket location
by bucket.
@@ -138,8 +132,7 @@ public final class Cluster {
coordinatorServer,
newBucketLocationsByPath,
new HashMap<>(tableIdByPath),
- new HashMap<>(partitionsIdByPath),
- new HashMap<>(tableInfoByPath));
+ new HashMap<>(partitionsIdByPath));
}
@Nullable
@@ -208,15 +201,6 @@ public final class Cluster {
return availableLocationsByPath.getOrDefault(physicalTablePath,
Collections.emptyList());
}
- /**
- * Get the table info for this table.
- *
- * <p>TODO this method need to be remove, use Admin getTableInfo instead.
- */
- public Optional<TableInfo> getTable(TablePath tablePath) {
- return Optional.ofNullable(tableInfoByPath.get(tablePath));
- }
-
public Optional<Long> getTableId(TablePath tablePath) {
return Optional.ofNullable(tableIdByPath.get(tablePath));
}
@@ -265,11 +249,6 @@ public final class Cluster {
return tableIdByPath;
}
- /** Get the table info by table. */
- public Map<TablePath, TableInfo> getTableInfoByPath() {
- return tableInfoByPath;
- }
-
/** Get the bucket by a physical table path. */
public Map<PhysicalTablePath, List<BucketLocation>>
getBucketLocationsByPath() {
return availableLocationsByPath;
@@ -286,7 +265,6 @@ public final class Cluster {
null,
Collections.emptyMap(),
Collections.emptyMap(),
- Collections.emptyMap(),
Collections.emptyMap());
}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
b/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
index 458a79eb1..ee23c9109 100644
--- a/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
@@ -18,7 +18,6 @@
package org.apache.fluss.cluster;
import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.junit.jupiter.api.BeforeEach;
@@ -34,10 +33,8 @@ import java.util.Set;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
-import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
-import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -99,8 +96,7 @@ class ClusterTest {
COORDINATOR_SERVER,
new HashMap<>(cluster.getBucketLocationsByPath()),
new HashMap<>(cluster.getTableIdByPath()),
- Collections.emptyMap(),
- new HashMap<>(cluster.getTableInfoByPath()));
+ Collections.emptyMap());
}
// verify available buckets
@@ -176,16 +172,11 @@ class ClusterTest {
tablePathToTableId.put(DATA1_TABLE_PATH, DATA1_TABLE_ID);
tablePathToTableId.put(DATA2_TABLE_PATH, DATA2_TABLE_ID);
- Map<TablePath, TableInfo> tablePathToTableInfo = new HashMap<>();
- tablePathToTableInfo.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO);
- tablePathToTableInfo.put(DATA2_TABLE_PATH, DATA2_TABLE_INFO);
-
return new Cluster(
aliveTabletServersById,
COORDINATOR_SERVER,
tablePathToBucketLocations,
tablePathToTableId,
- Collections.emptyMap(),
- tablePathToTableInfo);
+ Collections.emptyMap());
}
}
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index a08eeaba4..e6cb83b04 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -609,6 +609,9 @@ message PbTableMetadata {
required PbTablePath table_path = 1;
required int64 table_id = 2;
required int32 schema_id = 3;
+ // TODO Starting from version 0.9, fluss-client no longer gets table info
from the metadata response. However, for backward
+ // compatibility, we will still retain the field in the response. Once
metadataV2 is introduced, table info will be
+ // officially omitted from the new metadata response.
required bytes table_json = 4;
repeated PbBucketMetadata bucket_metadata = 5;
required int64 created_time = 6;