This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new ea2a1fab3e [#7269] feat(storage): Add storage for statistics (#7690)
ea2a1fab3e is described below

commit ea2a1fab3ee3b6f331765c06ba26e77fdc19e747
Author: roryqi <[email protected]>
AuthorDate: Tue Aug 12 10:44:46 2025 +0800

    [#7269] feat(storage): Add storage for statistics (#7690)
    
    ### What changes were proposed in this pull request?
    
    Add storage for statistics
    
    ### Why are the changes needed?
    
    Fix: #7269
    
    ### Does this PR introduce _any_ user-facing change?
    
    Add storage for statistics
    
    ### How was this patch tested?
    
    Add UT
---
 .../java/org/apache/gravitino/json/JsonUtils.java  |  96 ++++-
 .../org/apache/gravitino/json/TestJsonUtils.java   |  62 ++++
 .../src/main/java/org/apache/gravitino/Entity.java |   1 +
 .../org/apache/gravitino/meta/StatisticEntity.java | 127 +++++++
 .../gravitino/storage/relational/JDBCBackend.java  |   6 +
 .../relational/mapper/StatisticMetaMapper.java     |  72 ++++
 .../mapper/StatisticSQLProviderFactory.java        |  94 +++++
 .../provider/base/StatisticBaseSQLProvider.java    | 185 ++++++++++
 .../SecurableObjectPostgreSQLProvider.java         |   4 +-
 .../postgresql/StatisticPostgresSQLProvider.java   |  62 ++++
 .../storage/relational/po/StatisticPO.java         | 214 +++++++++++
 .../relational/service/CatalogMetaService.java     |  11 +-
 .../relational/service/FilesetMetaService.java     |   5 +
 .../relational/service/MetalakeMetaService.java    |   9 +
 .../relational/service/ModelMetaService.java       |   5 +
 .../relational/service/SchemaMetaService.java      |  11 +-
 .../relational/service/StatisticMetaService.java   | 103 ++++++
 .../relational/service/TableMetaService.java       |   5 +
 .../relational/service/TopicMetaService.java       |   5 +
 .../apache/gravitino/utils/NameIdentifierUtil.java |  13 +
 .../storage/relational/TestJDBCBackend.java        |  65 ++++
 .../service/TestStatisticMetaService.java          | 402 +++++++++++++++++++++
 .../storage/relational/utils/TestPOConverters.java |  53 +++
 scripts/h2/schema-1.0.0-h2.sql                     |  18 +
 scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql           |  18 +
 scripts/mysql/schema-1.0.0-mysql.sql               |  18 +
 scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql     |  18 +
 scripts/postgresql/schema-1.0.0-postgresql.sql     |  30 ++
 .../upgrade-0.9.0-to-1.0.0-postgresql.sql          |  26 ++
 29 files changed, 1732 insertions(+), 6 deletions(-)

diff --git a/common/src/main/java/org/apache/gravitino/json/JsonUtils.java 
b/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
index 68b66220ad..5fd2ab3f8f 100644
--- a/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
+++ b/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
@@ -35,6 +35,7 @@ import com.fasterxml.jackson.databind.cfg.EnumFeature;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import com.google.common.annotations.VisibleForTesting;
@@ -44,6 +45,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
@@ -85,6 +87,8 @@ import 
org.apache.gravitino.rel.expressions.sorts.SortDirection;
 import org.apache.gravitino.rel.indexes.Index;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
 
 /** Utility class for working with JSON data. */
 public class JsonUtils {
@@ -301,7 +305,9 @@ public class JsonUtils {
                     .addDeserializer(Type.class, new TypeDeserializer())
                     .addSerializer(Type.class, new TypeSerializer())
                     .addDeserializer(Expression.class, new 
ColumnDefaultValueDeserializer())
-                    .addSerializer(Expression.class, new 
ColumnDefaultValueSerializer()));
+                    .addSerializer(Expression.class, new 
ColumnDefaultValueSerializer())
+                    .addDeserializer(StatisticValue.class, new 
StatisticValueDeserializer())
+                    .addSerializer(StatisticValue.class, new 
StatisticValueSerializer()));
   }
 
   /**
@@ -1343,6 +1349,94 @@ public class JsonUtils {
     }
   }
 
+  /**
+   * Jackson deserializer that builds a {@link StatisticValue} from a JSON value.
+   *
+   * <p>The parser content is read as a JSON tree and then converted by {@code getStatisticValue}.
+   */
+  public static class StatisticValueDeserializer extends JsonDeserializer<StatisticValue> {
+    @Override
+    public StatisticValue<?> deserialize(JsonParser p, DeserializationContext ctxt)
+        throws IOException {
+      // Read the whole value as a tree and convert it in one step.
+      return getStatisticValue(p.getCodec().readTree(p));
+    }
+  }
+
+  /**
+   * Converts a parsed JSON node into the matching {@link StatisticValue}.
+   *
+   * <p>Integral numbers become long values, floating point numbers double values, text string
+   * values, booleans boolean values, arrays list values and objects object values. Array elements
+   * and object fields are converted recursively.
+   *
+   * @param node the JSON node to convert; must be neither {@code null} nor a JSON null
+   * @return the statistic value described by {@code node}
+   * @throws IOException if an integer does not fit into a long or the node type is unsupported
+   * @throws IllegalArgumentException if {@code node} is {@code null} or a JSON null
+   */
+  private static StatisticValue<?> getStatisticValue(JsonNode node) throws IOException {
+    Preconditions.checkArgument(
+        node != null && !node.isNull(), "Cannot parse statistic value from invalid JSON: %s", node);
+    if (node.isIntegralNumber()) {
+      // asLong() silently truncates integers outside the long range; fail instead of storing a
+      // corrupted statistic.
+      if (!node.canConvertToLong()) {
+        throw new IOException(
+            String.format("Statistic value %s does not fit into a long", node.asText()));
+      }
+      return StatisticValues.longValue(node.asLong());
+    } else if (node.isFloatingPointNumber()) {
+      return StatisticValues.doubleValue(node.asDouble());
+    } else if (node.isTextual()) {
+      return StatisticValues.stringValue(node.asText());
+    } else if (node.isBoolean()) {
+      return StatisticValues.booleanValue(node.asBoolean());
+    } else if (node.isArray()) {
+      ArrayNode arrayNode = (ArrayNode) node;
+      List<StatisticValue<Object>> values = Lists.newArrayListWithCapacity(arrayNode.size());
+      for (JsonNode arrayElement : arrayNode) {
+        // Every branch of this method returns a freshly built value or throws, so the result
+        // needs no null check.
+        values.add((StatisticValue<Object>) getStatisticValue(arrayElement));
+      }
+      return StatisticValues.listValue(values);
+    } else if (node.isObject()) {
+      ObjectNode objectNode = (ObjectNode) node;
+      Map<String, StatisticValue<?>> map = Maps.newHashMap();
+      // Use a plain loop instead of a lambda so that an IOException raised for a nested field
+      // reaches the caller as is, exactly like it does for array elements, rather than being
+      // wrapped in a RuntimeException.
+      Iterator<Map.Entry<String, JsonNode>> fields = objectNode.fields();
+      while (fields.hasNext()) {
+        Map.Entry<String, JsonNode> field = fields.next();
+        map.put(field.getKey(), getStatisticValue(field.getValue()));
+      }
+      return StatisticValues.objectValue(map);
+    } else {
+      throw new UnsupportedEncodingException(
+          String.format("Don't support json node type %s", node.getNodeType()));
+    }
+  }
+
+  /**
+   * Jackson serializer that writes a {@link StatisticValue} as plain JSON.
+   *
+   * <p>Boolean, string, double and long values are written as JSON scalars, list values as JSON
+   * arrays and struct values as JSON objects. Nested values are serialized recursively. Any other
+   * data type is rejected with an {@link IOException}.
+   */
+  public static class StatisticValueSerializer extends JsonSerializer<StatisticValue> {
+
+    @Override
+    public void serialize(StatisticValue value, JsonGenerator gen, SerializerProvider serializers)
+        throws IOException {
+      Type.Name typeName = value.dataType().name();
+
+      // Scalars map directly onto the JSON primitives.
+      if (typeName == Type.Name.BOOLEAN) {
+        gen.writeBoolean((Boolean) value.value());
+        return;
+      }
+      if (typeName == Type.Name.STRING) {
+        gen.writeString((String) value.value());
+        return;
+      }
+      if (typeName == Type.Name.DOUBLE) {
+        gen.writeNumber((Double) value.value());
+        return;
+      }
+      if (typeName == Type.Name.LONG) {
+        gen.writeNumber((Long) value.value());
+        return;
+      }
+
+      // Containers recurse into this serializer for every nested value.
+      if (typeName == Type.Name.LIST) {
+        gen.writeStartArray();
+        List<StatisticValue<?>> elements = (List<StatisticValue<?>>) value.value();
+        for (StatisticValue<?> item : elements) {
+          serialize(item, gen, serializers);
+        }
+        gen.writeEndArray();
+        return;
+      }
+      if (typeName == Type.Name.STRUCT) {
+        gen.writeStartObject();
+        Map<String, StatisticValue<?>> members = (Map<String, StatisticValue<?>>) value.value();
+        for (Map.Entry<String, StatisticValue<?>> member : members.entrySet()) {
+          gen.writeFieldName(member.getKey());
+          serialize(member.getValue(), gen, serializers);
+        }
+        gen.writeEndObject();
+        return;
+      }
+
+      throw new IOException("Unsupported statistic value type: " + value.dataType());
+    }
+  }
+
   /** Custom JSON serializer for PartitionDTO objects. */
   public static class PartitionDTOSerializer extends 
JsonSerializer<PartitionDTO> {
     @Override
diff --git a/common/src/test/java/org/apache/gravitino/json/TestJsonUtils.java 
b/common/src/test/java/org/apache/gravitino/json/TestJsonUtils.java
index ff07635e20..c99af454e3 100644
--- a/common/src/test/java/org/apache/gravitino/json/TestJsonUtils.java
+++ b/common/src/test/java/org/apache/gravitino/json/TestJsonUtils.java
@@ -26,7 +26,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.util.List;
+import java.util.Map;
 import org.apache.gravitino.dto.rel.expressions.LiteralDTO;
 import org.apache.gravitino.dto.rel.indexes.IndexDTO;
 import org.apache.gravitino.dto.rel.partitions.IdentityPartitionDTO;
@@ -36,6 +39,8 @@ import 
org.apache.gravitino.dto.rel.partitions.RangePartitionDTO;
 import org.apache.gravitino.rel.indexes.Index;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -50,6 +55,8 @@ public class TestJsonUtils {
     SimpleModule module = new SimpleModule();
     module.addSerializer(Type.class, new JsonUtils.TypeSerializer());
     module.addDeserializer(Type.class, new JsonUtils.TypeDeserializer());
+    module.addSerializer(StatisticValue.class, new 
JsonUtils.StatisticValueSerializer());
+    module.addDeserializer(StatisticValue.class, new 
JsonUtils.StatisticValueDeserializer());
     objectMapper.registerModule(module);
   }
 
@@ -458,4 +465,59 @@ public class TestJsonUtils {
         objectMapper.readValue(expected, IndexDTO.class),
         objectMapper.readValue(jsonValue, IndexDTO.class));
   }
+
+  /**
+   * Verifies that every kind of statistic value is written in the expected JSON form and that
+   * the JSON can be read back without losing information.
+   */
+  @Test
+  void testStatisticValueSerde() throws JsonProcessingException {
+    assertStatisticValueSerde("\"1\"", StatisticValues.stringValue("1"));
+    assertStatisticValueSerde("1", StatisticValues.longValue(1));
+    assertStatisticValueSerde("1.0", StatisticValues.doubleValue(1.0));
+    assertStatisticValueSerde("true", StatisticValues.booleanValue(true));
+
+    assertStatisticValueSerde(
+        "[\"1\",\"2\"]",
+        StatisticValues.listValue(
+            Lists.newArrayList(
+                StatisticValues.stringValue("1"), StatisticValues.stringValue("2"))));
+
+    Map<String, StatisticValue<?>> values = Maps.newHashMap();
+    values.put("key1", StatisticValues.stringValue("value1"));
+    values.put("key2", StatisticValues.longValue(2L));
+    assertStatisticValueSerde(
+        "{\"key1\":\"value1\",\"key2\":2}", StatisticValues.objectValue(values));
+  }
+
+  /**
+   * Serializes {@code value}, checks the JSON against {@code expectedJson}, then parses that JSON
+   * and serializes the parsed value again. Comparing the second serialization with the expected
+   * JSON exercises the deserializer for real, instead of comparing two parses of the same string.
+   */
+  private void assertStatisticValueSerde(String expectedJson, StatisticValue<?> value)
+      throws JsonProcessingException {
+    String actualJson = objectMapper.writeValueAsString(value);
+    // JUnit expects (expected, actual); keeping that order makes failure messages readable.
+    Assertions.assertEquals(
+        expectedJson, actualJson, "Serialized statistic value should match the expected JSON");
+
+    StatisticValue<?> parsed = objectMapper.readValue(actualJson, StatisticValue.class);
+    Assertions.assertEquals(
+        expectedJson,
+        objectMapper.writeValueAsString(parsed),
+        "Statistic value should survive a JSON round trip");
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/Entity.java 
b/core/src/main/java/org/apache/gravitino/Entity.java
index c8d973f8c5..d08ddd0011 100644
--- a/core/src/main/java/org/apache/gravitino/Entity.java
+++ b/core/src/main/java/org/apache/gravitino/Entity.java
@@ -77,6 +77,7 @@ public interface Entity extends Serializable {
     MODEL,
     MODEL_VERSION,
     POLICY,
+    STATISTIC,
     JOB_TEMPLATE,
     JOB,
     AUDIT;
diff --git a/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
new file mode 100644
index 0000000000..284b347a73
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.meta;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.Auditable;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Field;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.stats.StatisticValue;
+
+/**
+ * Entity that describes a single statistic: its id, name, value, audit information and the
+ * namespace it lives in. Instances are created through {@link #builder()}.
+ */
+public class StatisticEntity implements Entity, HasIdentifier, Auditable {
+  public static final Field ID =
+      Field.required("id", Long.class, "The unique identifier of the statistic entity.");
+  public static final Field NAME =
+      Field.required("name", String.class, "The name of the statistic entity.");
+  public static final Field VALUE =
+      Field.required("value", StatisticValue.class, "The value of the statistic entity.");
+  public static final Field AUDIT_INFO =
+      Field.required("audit_info", Audit.class, "The audit details of the statistic entity.");
+
+  private Long id;
+  private String name;
+  private StatisticValue<?> value;
+  private AuditInfo auditInfo;
+  private Namespace namespace;
+
+  /**
+   * Creates a builder for a new statistic entity.
+   *
+   * @return a fresh builder
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /** @return the entity type, always {@link EntityType#STATISTIC} */
+  @Override
+  public EntityType type() {
+    return EntityType.STATISTIC;
+  }
+
+  /** @return the id of the statistic */
+  @Override
+  public Long id() {
+    return this.id;
+  }
+
+  /** @return the name of the statistic */
+  @Override
+  public String name() {
+    return this.name;
+  }
+
+  /** @return the namespace of the statistic */
+  @Override
+  public Namespace namespace() {
+    return this.namespace;
+  }
+
+  /** @return the value of the statistic */
+  public StatisticValue<?> value() {
+    return this.value;
+  }
+
+  /** @return the audit information of the statistic */
+  @Override
+  public Audit auditInfo() {
+    return this.auditInfo;
+  }
+
+  /**
+   * Collects the declared fields of this entity together with their current values. The namespace
+   * has no field definition and is therefore not part of the result.
+   *
+   * @return a new map from field definition to field value
+   */
+  @Override
+  public Map<Field, Object> fields() {
+    Map<Field, Object> fieldValues = Maps.newHashMap();
+    fieldValues.put(ID, this.id);
+    fieldValues.put(NAME, this.name);
+    fieldValues.put(VALUE, this.value);
+    fieldValues.put(AUDIT_INFO, this.auditInfo);
+    return fieldValues;
+  }
+
+  /** Builder that populates a {@link StatisticEntity} step by step. */
+  public static class Builder {
+    // The instance under construction; build() hands out this very object.
+    private final StatisticEntity entity = new StatisticEntity();
+
+    private Builder() {}
+
+    /**
+     * @param id the id of the statistic
+     * @return this builder
+     */
+    public Builder withId(Long id) {
+      entity.id = id;
+      return this;
+    }
+
+    /**
+     * @param name the name of the statistic
+     * @return this builder
+     */
+    public Builder withName(String name) {
+      entity.name = name;
+      return this;
+    }
+
+    /**
+     * @param value the value of the statistic
+     * @return this builder
+     */
+    public Builder withValue(StatisticValue<?> value) {
+      entity.value = value;
+      return this;
+    }
+
+    /**
+     * @param auditInfo the audit information of the statistic
+     * @return this builder
+     */
+    public Builder withAuditInfo(AuditInfo auditInfo) {
+      entity.auditInfo = auditInfo;
+      return this;
+    }
+
+    /**
+     * @param namespace the namespace of the statistic
+     * @return this builder
+     */
+    public Builder withNamespace(Namespace namespace) {
+      entity.namespace = namespace;
+      return this;
+    }
+
+    /**
+     * Runs {@code validate()} on the configured entity and returns it.
+     *
+     * @return the configured statistic entity
+     */
+    public StatisticEntity build() {
+      entity.validate();
+      return entity;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 7c92e88a2b..bfe3ac72a9 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -67,6 +67,7 @@ import 
org.apache.gravitino.storage.relational.service.OwnerMetaService;
 import org.apache.gravitino.storage.relational.service.PolicyMetaService;
 import org.apache.gravitino.storage.relational.service.RoleMetaService;
 import org.apache.gravitino.storage.relational.service.SchemaMetaService;
+import org.apache.gravitino.storage.relational.service.StatisticMetaService;
 import org.apache.gravitino.storage.relational.service.TableColumnMetaService;
 import org.apache.gravitino.storage.relational.service.TableMetaService;
 import org.apache.gravitino.storage.relational.service.TagMetaService;
@@ -374,6 +375,10 @@ public class JDBCBackend implements RelationalBackend {
         return ModelVersionMetaService.getInstance()
             .deleteModelVersionMetasByLegacyTimeline(
                 legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
+      case STATISTIC:
+        return StatisticMetaService.getInstance()
+            .deleteStatisticsByLegacyTimeline(
+                legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case JOB_TEMPLATE:
         return JobTemplateMetaService.getInstance()
             .deleteJobTemplatesByLegacyTimeline(
@@ -408,6 +413,7 @@ public class JDBCBackend implements RelationalBackend {
       case TAG:
       case MODEL:
       case MODEL_VERSION:
+      case STATISTIC:
       case JOB_TEMPLATE:
       case JOB:
         // These entity types have not implemented multi-versions, so we can 
skip.
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticMetaMapper.java
new file mode 100644
index 0000000000..0e422e5d6f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticMetaMapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+/**
+ * MyBatis mapper for statistic rows. Every method obtains its SQL from the method of the same
+ * name in {@link StatisticSQLProviderFactory}.
+ */
+public interface StatisticMetaMapper {
+
+  /** Name of the table used by the statistic SQL providers. */
+  String STATISTIC_META_TABLE_NAME = "statistic_meta";
+
+  /**
+   * Lists the statistics attached to one entity of a metalake.
+   *
+   * <p>NOTE(review): {@code entityId} is a primitive {@code long} here, while the provider
+   * factory method and every other mapper method use {@code Long}; consider aligning them.
+   *
+   * @param metalakeId bound to the SQL as {@code metalakeId}
+   * @param entityId bound to the SQL as {@code entityId}
+   * @return the matching statistic rows
+   */
+  @SelectProvider(type = StatisticSQLProviderFactory.class, method = 
"listStatisticPOsByEntityId")
+  List<StatisticPO> listStatisticPOsByEntityId(
+      @Param("metalakeId") Long metalakeId, @Param("entityId") long entityId);
+
+  /**
+   * Inserts the given statistic rows in one statement; judging by the name, rows that collide
+   * with an existing key are updated instead.
+   *
+   * @param statisticPOs bound to the SQL as {@code statisticPOs}
+   */
+  @InsertProvider(
+      type = StatisticSQLProviderFactory.class,
+      method = "batchInsertStatisticPOsOnDuplicateKeyUpdate")
+  void batchInsertStatisticPOsOnDuplicateKeyUpdate(
+      @Param("statisticPOs") List<StatisticPO> statisticPOs);
+
+  /**
+   * Deletes the named statistics of one entity. The statement is registered as an update, which
+   * suggests a soft delete - confirm against the SQL provider.
+   *
+   * @param entityId bound to the SQL as {@code entityId}
+   * @param statisticNames bound to the SQL as {@code statisticNames}
+   * @return the value reported by MyBatis for the update statement
+   */
+  @UpdateProvider(type = StatisticSQLProviderFactory.class, method = 
"batchDeleteStatisticPOs")
+  Integer batchDeleteStatisticPOs(
+      @Param("entityId") Long entityId, @Param("statisticNames") List<String> 
statisticNames);
+
+  /**
+   * Soft-deletes the statistics selected by entity id.
+   *
+   * @param entityId bound to the SQL as {@code entityId}
+   * @return the value reported by MyBatis for the update statement
+   */
+  @UpdateProvider(
+      type = StatisticSQLProviderFactory.class,
+      method = "softDeleteStatisticsByEntityId")
+  Integer softDeleteStatisticsByEntityId(@Param("entityId") Long entityId);
+
+  /**
+   * Soft-deletes the statistics selected by metalake id.
+   *
+   * @param metalakeId bound to the SQL as {@code metalakeId}
+   * @return the value reported by MyBatis for the update statement
+   */
+  @UpdateProvider(
+      type = StatisticSQLProviderFactory.class,
+      method = "softDeleteStatisticsByMetalakeId")
+  Integer softDeleteStatisticsByMetalakeId(@Param("metalakeId") Long 
metalakeId);
+
+  /**
+   * Soft-deletes the statistics selected by catalog id.
+   *
+   * @param catalogId bound to the SQL as {@code catalogId}
+   * @return the value reported by MyBatis for the update statement
+   */
+  @UpdateProvider(
+      type = StatisticSQLProviderFactory.class,
+      method = "softDeleteStatisticsByCatalogId")
+  Integer softDeleteStatisticsByCatalogId(@Param("catalogId") Long catalogId);
+
+  /**
+   * Soft-deletes the statistics selected by schema id.
+   *
+   * @param schemaId bound to the SQL as {@code schemaId}
+   * @return the value reported by MyBatis for the update statement
+   */
+  @UpdateProvider(
+      type = StatisticSQLProviderFactory.class,
+      method = "softDeleteStatisticsBySchemaId")
+  Integer softDeleteStatisticsBySchemaId(@Param("schemaId") Long schemaId);
+
+  /**
+   * Deletes statistic rows selected by a legacy timeline, limited to {@code limit} rows per call.
+   *
+   * @param legacyTimeline bound to the SQL as {@code legacyTimeline}
+   * @param limit bound to the SQL as {@code limit}
+   * @return the value reported by MyBatis for the delete statement
+   */
+  @DeleteProvider(
+      type = StatisticSQLProviderFactory.class,
+      method = "deleteStatisticsByLegacyTimeline")
+  Integer deleteStatisticsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticSQLProviderFactory.java
new file mode 100644
index 0000000000..393ecd8618
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticSQLProviderFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.StatisticBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.StatisticPostgresSQLProvider;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * Entry point used by the statistic mapper annotations to obtain SQL. Each static method forwards
+ * to the {@link StatisticBaseSQLProvider} registered for the configured JDBC backend.
+ */
+public class StatisticSQLProviderFactory {
+
+  /** MySQL uses the base SQL unchanged. */
+  static class StatisticMySQLProvider extends StatisticBaseSQLProvider {}
+
+  /** H2 uses the base SQL unchanged. */
+  static class StatisticH2Provider extends StatisticBaseSQLProvider {}
+
+  private static final Map<JDBCBackend.JDBCBackendType, StatisticBaseSQLProvider>
+      STATISTIC_SQL_PROVIDERS =
+          ImmutableMap.of(
+              JDBCBackend.JDBCBackendType.H2,
+              new StatisticH2Provider(),
+              JDBCBackend.JDBCBackendType.MYSQL,
+              new StatisticMySQLProvider(),
+              JDBCBackend.JDBCBackendType.POSTGRESQL,
+              new StatisticPostgresSQLProvider());
+
+  /**
+   * Resolves the SQL provider for the backend type derived from the database id of the MyBatis
+   * configuration.
+   *
+   * @return the provider registered for that backend type
+   */
+  public static StatisticBaseSQLProvider getProvider() {
+    JDBCBackend.JDBCBackendType backendType =
+        JDBCBackend.JDBCBackendType.fromString(
+            SqlSessionFactoryHelper.getInstance()
+                .getSqlSessionFactory()
+                .getConfiguration()
+                .getDatabaseId());
+    return STATISTIC_SQL_PROVIDERS.get(backendType);
+  }
+
+  /** Forwards to {@code listStatisticPOsByEntityId} of the active provider. */
+  public static String listStatisticPOsByEntityId(
+      @Param("metalakeId") Long metalakeId, @Param("entityId") Long entityId) {
+    return getProvider().listStatisticPOsByEntityId(metalakeId, entityId);
+  }
+
+  /** Forwards to {@code batchInsertStatisticPOsOnDuplicateKeyUpdate} of the active provider. */
+  public static String batchInsertStatisticPOsOnDuplicateKeyUpdate(
+      @Param("statisticPOs") List<StatisticPO> statisticPOs) {
+    return getProvider().batchInsertStatisticPOsOnDuplicateKeyUpdate(statisticPOs);
+  }
+
+  /** Forwards to {@code batchDeleteStatisticPOs} of the active provider. */
+  public static String batchDeleteStatisticPOs(
+      @Param("entityId") Long entityId, @Param("statisticNames") List<String> statisticNames) {
+    return getProvider().batchDeleteStatisticPOs(entityId, statisticNames);
+  }
+
+  /** Forwards to {@code softDeleteStatisticsByEntityId} of the active provider. */
+  public static String softDeleteStatisticsByEntityId(@Param("entityId") Long entityId) {
+    return getProvider().softDeleteStatisticsByEntityId(entityId);
+  }
+
+  /** Forwards to {@code softDeleteStatisticsByMetalakeId} of the active provider. */
+  public static String softDeleteStatisticsByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    return getProvider().softDeleteStatisticsByMetalakeId(metalakeId);
+  }
+
+  /** Forwards to {@code softDeleteStatisticsByCatalogId} of the active provider. */
+  public static String softDeleteStatisticsByCatalogId(@Param("catalogId") Long catalogId) {
+    return getProvider().softDeleteStatisticsByCatalogId(catalogId);
+  }
+
+  /** Forwards to {@code softDeleteStatisticsBySchemaId} of the active provider. */
+  public static String softDeleteStatisticsBySchemaId(@Param("schemaId") Long schemaId) {
+    return getProvider().softDeleteStatisticsBySchemaId(schemaId);
+  }
+
+  /** Forwards to {@code deleteStatisticsByLegacyTimeline} of the active provider. */
+  public static String deleteStatisticsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
+    return getProvider().deleteStatisticsByLegacyTimeline(legacyTimeline, limit);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/StatisticBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/StatisticBaseSQLProvider.java
new file mode 100644
index 0000000000..a3ffce5254
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/StatisticBaseSQLProvider.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static 
org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper.STATISTIC_META_TABLE_NAME;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * Builds the SQL text for every statement issued against the statistic meta
+ * table. Statements wrapped in {@code <script>} use MyBatis dynamic SQL
+ * ({@code <foreach>}); the others are plain parameterized strings.
+ * Subclasses can override individual statements or {@link #softDeleteSQL()}.
+ */
+public class StatisticBaseSQLProvider {
+
+  /**
+   * Builds a multi-row INSERT that falls back to an update through
+   * {@code ON DUPLICATE KEY UPDATE}: on a key collision the value, audit info,
+   * versions and deleted_at of the existing row are overwritten.
+   *
+   * @param statisticPOs the rows iterated by the {@code <foreach>}
+   * @return the dynamic SQL script
+   */
+  public String batchInsertStatisticPOsOnDuplicateKeyUpdate(
+      @Param("statisticPOs") List<StatisticPO> statisticPOs) {
+    return "<script>"
+        + "INSERT INTO "
+        + STATISTIC_META_TABLE_NAME
+        + " (statistic_id, statistic_name, statistic_value, metalake_id, 
metadata_object_id,"
+        + " metadata_object_type, audit_info, current_version, last_version, 
deleted_at) VALUES "
+        + "<foreach collection='statisticPOs' item='item' separator=','>"
+        + "(#{item.statisticId}, "
+        + "#{item.statisticName}, "
+        + "#{item.statisticValue}, "
+        + "#{item.metalakeId}, "
+        + "#{item.metadataObjectId}, "
+        + "#{item.metadataObjectType}, "
+        + "#{item.auditInfo}, "
+        + "#{item.currentVersion}, "
+        + "#{item.lastVersion}, "
+        + "#{item.deletedAt})"
+        + "</foreach>"
+        + " ON DUPLICATE KEY UPDATE "
+        + "  statistic_value = VALUES(statistic_value),"
+        + "  audit_info = VALUES(audit_info),"
+        + "  current_version = VALUES(current_version),"
+        + "  last_version = VALUES(last_version),"
+        + "  deleted_at = VALUES(deleted_at)"
+        + "</script>";
+  }
+
+  /**
+   * Builds the statement that soft-deletes (an UPDATE of deleted_at, not a
+   * DELETE) the live statistics of one object whose names are in the list.
+   *
+   * @param entityId matched against metadata_object_id
+   * @param statisticNames the names expanded into the IN (...) list
+   * @return the dynamic SQL script
+   */
+  public String batchDeleteStatisticPOs(
+      @Param("entityId") Long entityId, @Param("statisticNames") List<String> 
statisticNames) {
+    return "<script>"
+        + "UPDATE "
+        + STATISTIC_META_TABLE_NAME
+        + softDeleteSQL()
+        + " WHERE "
+        + " statistic_name IN  ("
+        + "<foreach collection='statisticNames' item='item' separator=','>"
+        + " #{item}"
+        + "</foreach>"
+        + " ) AND deleted_at = 0 AND metadata_object_id = #{entityId}"
+        + "</script>";
+  }
+
+  /**
+   * Builds the statement that soft-deletes every live statistic whose
+   * metadata_object_id equals the given id.
+   *
+   * @param entityId matched against metadata_object_id
+   * @return the SQL text
+   */
+  public String softDeleteStatisticsByEntityId(@Param("entityId") Long 
entityId) {
+    return "UPDATE "
+        + STATISTIC_META_TABLE_NAME
+        + softDeleteSQL()
+        + " WHERE metadata_object_id = #{entityId} AND deleted_at = 0";
+  }
+
+  /**
+   * Builds the SELECT that returns the live statistics of one object within
+   * one metalake. Columns are aliased to the camelCase names of
+   * {@link StatisticPO}'s fields.
+   *
+   * @param metalakeId matched against metalake_id
+   * @param entityId matched against metadata_object_id
+   * @return the SQL text
+   */
+  public String listStatisticPOsByEntityId(
+      @Param("metalakeId") Long metalakeId, @Param("entityId") Long entityId) {
+    return "SELECT statistic_id as statisticId, statistic_name as 
statisticName, metalake_id as metalakeId,"
+        + " statistic_value as statisticValue, metadata_object_id as 
metadataObjectId,"
+        + "metadata_object_type as metadataObjectType, audit_info as 
auditInfo,"
+        + "current_version as currentVersion, last_version as lastVersion, 
deleted_at as deletedAt FROM "
+        + STATISTIC_META_TABLE_NAME
+        + " WHERE metadata_object_id = #{entityId} AND deleted_at = 0 AND 
metalake_id = #{metalakeId}";
+  }
+
+  /**
+   * Builds the statement that soft-deletes every live statistic whose
+   * metalake_id equals the given id.
+   *
+   * @param metalakeId matched against metalake_id
+   * @return the SQL text
+   */
+  public String softDeleteStatisticsByMetalakeId(@Param("metalakeId") Long 
metalakeId) {
+    return "UPDATE "
+        + STATISTIC_META_TABLE_NAME
+        + " stat "
+        + softDeleteSQL()
+        + " WHERE  stat.metalake_id = #{metalakeId} AND stat.deleted_at = 0";
+  }
+
+  /**
+   * Builds the statement that soft-deletes the live statistics attached to a
+   * catalog or to anything inside it. The EXISTS sub-query is a UNION with one
+   * branch per object type (CATALOG, SCHEMA, TOPIC, TABLE, FILESET, MODEL);
+   * each branch joins the statistic row to that type's meta table and keeps
+   * rows whose catalog_id matches.
+   *
+   * @param catalogId matched against catalog_id in every branch
+   * @return the SQL text
+   */
+  public String softDeleteStatisticsByCatalogId(@Param("catalogId") Long 
catalogId) {
+    return "UPDATE "
+        + STATISTIC_META_TABLE_NAME
+        + " stat "
+        + softDeleteSQL()
+        + " WHERE stat.deleted_at = 0 AND EXISTS ("
+        + " SELECT ct.catalog_id FROM "
+        + CatalogMetaMapper.TABLE_NAME
+        + " ct WHERE ct.catalog_id = #{catalogId}  AND "
+        + " ct.catalog_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'CATALOG'"
+        + " UNION "
+        + " SELECT st.catalog_id FROM "
+        + SchemaMetaMapper.TABLE_NAME
+        + " st WHERE st.catalog_id = #{catalogId} AND "
+        + " st.schema_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'SCHEMA'"
+        + " UNION "
+        + " SELECT tt.catalog_id FROM "
+        + TopicMetaMapper.TABLE_NAME
+        + " tt WHERE tt.catalog_id = #{catalogId} AND "
+        + " tt.topic_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'TOPIC'"
+        + " UNION "
+        + " SELECT tat.catalog_id FROM "
+        + TableMetaMapper.TABLE_NAME
+        + " tat WHERE tat.catalog_id = #{catalogId} AND "
+        + " tat.table_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'TABLE'"
+        + " UNION "
+        + " SELECT ft.catalog_id FROM "
+        + FilesetMetaMapper.META_TABLE_NAME
+        + " ft WHERE ft.catalog_id = #{catalogId} AND"
+        + " ft.fileset_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'FILESET'"
+        + " UNION "
+        + " SELECT mt.catalog_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " mt WHERE mt.catalog_id = #{catalogId} AND"
+        + " mt.model_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'MODEL'"
+        + ")";
+  }
+
+  /**
+   * Builds the statement that soft-deletes the live statistics attached to a
+   * schema or to anything inside it. Same shape as the catalog variant, with
+   * one UNION branch per object type (SCHEMA, TOPIC, TABLE, FILESET, MODEL)
+   * filtered on schema_id.
+   *
+   * @param schemaId matched against schema_id in every branch
+   * @return the SQL text
+   */
+  public String softDeleteStatisticsBySchemaId(@Param("schemaId") Long 
schemaId) {
+    return "UPDATE "
+        + STATISTIC_META_TABLE_NAME
+        + " stat"
+        + softDeleteSQL()
+        + " WHERE stat.deleted_at = 0 AND EXISTS ("
+        + " SELECT st.schema_id FROM "
+        + SchemaMetaMapper.TABLE_NAME
+        + " st WHERE st.schema_id = #{schemaId} "
+        + " AND st.schema_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'SCHEMA'"
+        + " UNION "
+        + " SELECT tt.schema_id FROM "
+        + TopicMetaMapper.TABLE_NAME
+        + " tt WHERE tt.schema_id = #{schemaId} AND "
+        + " tt.topic_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'TOPIC'"
+        + " UNION "
+        + " SELECT tat.schema_id FROM "
+        + TableMetaMapper.TABLE_NAME
+        + " tat WHERE tat.schema_id = #{schemaId} AND "
+        + " tat.table_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'TABLE'"
+        + " UNION "
+        + " SELECT ft.schema_id FROM "
+        + FilesetMetaMapper.META_TABLE_NAME
+        + " ft WHERE ft.schema_id = #{schemaId} AND "
+        + " ft.fileset_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'FILESET'"
+        + " UNION "
+        + " SELECT mt.schema_id FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " mt WHERE mt.schema_id = #{schemaId} AND "
+        + " mt.model_id = stat.metadata_object_id AND 
stat.metadata_object_type = 'MODEL'"
+        + ")";
+  }
+
+  /**
+   * Builds the statement that physically deletes rows that were soft-deleted
+   * (deleted_at greater than 0) before the given cut-off, capped by LIMIT.
+   *
+   * @param legacyTimeline exclusive upper bound for deleted_at
+   * @param limit bound to the LIMIT clause
+   * @return the SQL text
+   */
+  public String deleteStatisticsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return "DELETE FROM "
+        + STATISTIC_META_TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
+  }
+
+  /**
+   * Returns the {@code SET deleted_at = ...} fragment shared by every
+   * soft-delete statement above. The expression adds the epoch seconds scaled
+   * by 1000 to the microsecond part of the current timestamp divided by 1000,
+   * i.e. a millisecond-resolution timestamp. The fragment starts and ends with
+   * a space so callers can concatenate it directly.
+   *
+   * @return the SET clause
+   */
+  protected String softDeleteSQL() {
+    return " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000 ";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
index 027f4fb720..3b25a7f469 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
@@ -44,9 +44,7 @@ public class SecurableObjectPostgreSQLProvider extends 
SecurableObjectBaseSQLPro
         + " WHERE FALSE "
         + "<foreach collection='securableObjects' item='item' separator=' '>"
         + " OR (metadata_object_id = #{item.metadataObjectId} AND"
-        + " role_id = #{item.roleId} AND deleted_at = 0 AND"
-        + " privilege_names = #{item.privilegeNames} AND"
-        + " privilege_conditions = #{item.privilegeConditions})"
+        + " role_id = #{item.roleId} AND deleted_at = 0)"
         + "</foreach>"
         + "</script>";
   }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/StatisticPostgresSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/StatisticPostgresSQLProvider.java
new file mode 100644
index 0000000000..f0e0e7d215
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/StatisticPostgresSQLProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import static 
org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper.STATISTIC_META_TABLE_NAME;
+
+import java.util.List;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.StatisticBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
+
+/**
+ * PostgreSQL flavor of {@link StatisticBaseSQLProvider}. It overrides only the
+ * statements whose syntax differs from the base provider: the soft-delete
+ * timestamp expression and the batch upsert.
+ */
+public class StatisticPostgresSQLProvider extends StatisticBaseSQLProvider {
+  /**
+   * Returns the {@code SET deleted_at = ...} fragment using PostgreSQL date
+   * functions: the interval since 1970-01-01 is scaled by 1000 and its epoch
+   * is floored, giving whole milliseconds.
+   *
+   * @return the SET clause, padded with a space on both sides
+   */
+  @Override
+  protected String softDeleteSQL() {
+    return " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) ";
+  }
+
+  /**
+   * Builds the multi-row upsert with {@code INSERT ... ON CONFLICT ... DO
+   * UPDATE}. The conflict target is (statistic_name, metadata_object_id,
+   * deleted_at); on conflict the existing row takes the incoming value, audit
+   * info, versions and deleted_at.
+   *
+   * @param statisticPOs the rows iterated by the {@code <foreach>}
+   * @return the dynamic SQL script
+   */
+  @Override
+  public String batchInsertStatisticPOsOnDuplicateKeyUpdate(List<StatisticPO> statisticPOs) {
+    return "<script>"
+        + "INSERT INTO "
+        + STATISTIC_META_TABLE_NAME
+        + " (statistic_id, statistic_name, statistic_value, metalake_id, metadata_object_id,"
+        + " metadata_object_type, audit_info, current_version, last_version, deleted_at) VALUES "
+        + "<foreach collection='statisticPOs' item='item' separator=','>"
+        + "(#{item.statisticId}, "
+        + "#{item.statisticName}, "
+        + "#{item.statisticValue}, "
+        + "#{item.metalakeId}, "
+        + "#{item.metadataObjectId}, "
+        + "#{item.metadataObjectType}, "
+        + "#{item.auditInfo}, "
+        + "#{item.currentVersion}, "
+        + "#{item.lastVersion}, "
+        + "#{item.deletedAt})"
+        + "</foreach>"
+        + " ON CONFLICT (statistic_name, metadata_object_id, deleted_at)"
+        + " DO UPDATE SET "
+        + "  statistic_value = EXCLUDED.statistic_value,"
+        + "  audit_info = EXCLUDED.audit_info,"
+        + "  current_version = EXCLUDED.current_version,"
+        + "  last_version = EXCLUDED.last_version,"
+        // The pseudo-table holding the proposed row is EXCLUDED; "EXCLUDE" is
+        // not a known relation and makes the whole statement fail.
+        + "  deleted_at = EXCLUDED.deleted_at"
+        + "</script>";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
new file mode 100644
index 0000000000..51dd1e0066
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.StatisticEntity;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.storage.relational.utils.POConverters;
+
+@Getter
+public class StatisticPO {
+  private Long metalakeId;
+  private Long statisticId;
+
+  private String statisticName;
+
+  private String statisticValue;
+  private Long metadataObjectId;
+
+  private String metadataObjectType;
+
+  private String auditInfo;
+
+  private Long currentVersion;
+  private Long lastVersion;
+  private Long deletedAt;
+
+  private StatisticPO() {}
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static StatisticEntity fromStatisticPO(StatisticPO statisticPO) {
+    try {
+      return StatisticEntity.builder()
+          .withId(statisticPO.getStatisticId())
+          .withName(statisticPO.getStatisticName())
+          .withValue(
+              JsonUtils.anyFieldMapper()
+                  .readValue(statisticPO.getStatisticValue(), 
StatisticValue.class))
+          .withAuditInfo(
+              JsonUtils.anyFieldMapper().readValue(statisticPO.getAuditInfo(), 
AuditInfo.class))
+          .build();
+    } catch (JsonProcessingException je) {
+      throw new RuntimeException("Failed to deserialize json object: ", je);
+    }
+  }
+
+  public static List<StatisticPO> initializeStatisticPOs(
+      List<StatisticEntity> statisticEntities,
+      Long metalakeId,
+      Long objectId,
+      MetadataObject.Type objectType) {
+    return statisticEntities.stream()
+        .map(
+            statisticEntity -> {
+              try {
+                return builder()
+                    .withMetalakeId(metalakeId)
+                    .withMetadataObjectId(objectId)
+                    .withMetadataObjectType(objectType.name())
+                    .withStatisticId(statisticEntity.id())
+                    .withStatisticName(statisticEntity.name())
+                    .withStatisticValue(
+                        
JsonUtils.anyFieldMapper().writeValueAsString(statisticEntity.value()))
+                    .withDeletedAt(POConverters.DEFAULT_DELETED_AT)
+                    .withCurrentVersion(POConverters.INIT_VERSION)
+                    .withLastVersion(POConverters.INIT_VERSION)
+                    .withAuditInfo(
+                        
JsonUtils.anyFieldMapper().writeValueAsString(statisticEntity.auditInfo()))
+                    .build();
+              } catch (JsonProcessingException e) {
+                throw new RuntimeException("Failed to serialize json object:", 
e);
+              }
+            })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof StatisticPO)) {
+      return false;
+    }
+    StatisticPO that = (StatisticPO) o;
+    return statisticId.equals(that.statisticId)
+        && metadataObjectId.equals(that.metadataObjectId)
+        && metalakeId.equals(that.metalakeId)
+        && metadataObjectType.equals(that.metadataObjectType)
+        && statisticName.equals(that.statisticName)
+        && statisticValue.equals(that.statisticValue)
+        && auditInfo.equals(that.auditInfo)
+        && currentVersion.equals(that.currentVersion)
+        && lastVersion.equals(that.lastVersion)
+        && deletedAt.equals(that.deletedAt);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        metalakeId,
+        statisticId,
+        metadataObjectId,
+        metadataObjectType,
+        statisticName,
+        statisticValue,
+        auditInfo,
+        currentVersion,
+        lastVersion,
+        deletedAt);
+  }
+
+  public static class Builder {
+
+    private final StatisticPO statisticPO;
+
+    public Builder() {
+      this.statisticPO = new StatisticPO();
+    }
+
+    public Builder withMetalakeId(Long metalakeId) {
+      statisticPO.metalakeId = metalakeId;
+      return this;
+    }
+
+    public Builder withStatisticId(Long statisticId) {
+      statisticPO.statisticId = statisticId;
+      return this;
+    }
+
+    public Builder withMetadataObjectId(Long objectId) {
+      statisticPO.metadataObjectId = objectId;
+      return this;
+    }
+
+    public Builder withMetadataObjectType(String objectType) {
+      statisticPO.metadataObjectType = objectType;
+      return this;
+    }
+
+    public Builder withStatisticName(String statisticName) {
+      statisticPO.statisticName = statisticName;
+      return this;
+    }
+
+    public Builder withStatisticValue(String value) {
+      statisticPO.statisticValue = value;
+      return this;
+    }
+
+    public Builder withAuditInfo(String auditInfo) {
+      statisticPO.auditInfo = auditInfo;
+      return this;
+    }
+
+    public Builder withCurrentVersion(Long currentVersion) {
+      statisticPO.currentVersion = currentVersion;
+      return this;
+    }
+
+    public Builder withLastVersion(Long lastVersion) {
+      statisticPO.lastVersion = lastVersion;
+      return this;
+    }
+
+    public Builder withDeletedAt(Long deletedAt) {
+      statisticPO.deletedAt = deletedAt;
+      return this;
+    }
+
+    public StatisticPO build() {
+      Preconditions.checkArgument(statisticPO.metadataObjectId != null, 
"`objectId is required");
+      Preconditions.checkArgument(
+          statisticPO.metadataObjectType != null, "`objectType` is required");
+      Preconditions.checkArgument(statisticPO.statisticId != null, 
"`statisticId` is required");
+      Preconditions.checkArgument(statisticPO.statisticName != null, 
"`statisticName` is required");
+      Preconditions.checkArgument(statisticPO.statisticValue != null, "`value` 
is required");
+      Preconditions.checkArgument(statisticPO.auditInfo != null, "`auditInfo` 
is required");
+      Preconditions.checkArgument(statisticPO.metalakeId != null, 
"`metalakeId` is required");
+      Preconditions.checkArgument(statisticPO.deletedAt != null, "`deletedAt` 
is required");
+      Preconditions.checkArgument(statisticPO.lastVersion != null, 
"`lastVersion` is required");
+      Preconditions.checkArgument(
+          statisticPO.currentVersion != null, "`currentVersion` is required");
+      return statisticPO;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
index a297c47afb..e4d08f4130 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
@@ -44,6 +44,7 @@ import 
org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
@@ -274,7 +275,11 @@ public class CatalogMetaService {
           () ->
               SessionUtils.doWithoutCommit(
                   ModelMetaMapper.class,
-                  mapper -> 
mapper.softDeleteModelMetasByCatalogId(catalogId)));
+                  mapper -> mapper.softDeleteModelMetasByCatalogId(catalogId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  StatisticMetaMapper.class,
+                  mapper -> 
mapper.softDeleteStatisticsByCatalogId(catalogId)));
     } else {
       List<SchemaEntity> schemaEntities =
           SchemaMetaService.getInstance()
@@ -307,6 +312,10 @@ public class CatalogMetaService {
                   mapper ->
                       mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
                           catalogId, MetadataObject.Type.CATALOG.name())),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  StatisticMetaMapper.class,
+                  mapper -> mapper.softDeleteStatisticsByEntityId(catalogId)),
           () ->
               SessionUtils.doWithoutCommit(
                   PolicyMetadataObjectRelMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
index 669acbbe87..246414609b 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
@@ -35,6 +35,7 @@ import 
org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.po.FilesetMaxVersionPO;
 import org.apache.gravitino.storage.relational.po.FilesetPO;
@@ -262,6 +263,10 @@ public class FilesetMetaService {
                 mapper ->
                     mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
                         filesetId, MetadataObject.Type.FILESET.name())),
+        () ->
+            SessionUtils.doWithoutCommit(
+                StatisticMetaMapper.class,
+                mapper -> mapper.softDeleteStatisticsByEntityId(filesetId)),
         () ->
             SessionUtils.doWithoutCommit(
                 PolicyMetadataObjectRelMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
index 6c2b6b7c57..74fb01b1c6 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -48,6 +48,7 @@ import 
org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper;
 import org.apache.gravitino.storage.relational.mapper.RoleMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TagMetaMapper;
@@ -271,6 +272,10 @@ public class MetalakeMetaService {
                 SessionUtils.doWithoutCommit(
                     ModelMetaMapper.class,
                     mapper -> 
mapper.softDeleteModelMetasByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    StatisticMetaMapper.class,
+                    mapper -> 
mapper.softDeleteStatisticsByMetalakeId(metalakeId)),
             () ->
                 SessionUtils.doWithoutCommit(
                     JobTemplateMetaMapper.class,
@@ -328,6 +333,10 @@ public class MetalakeMetaService {
                 SessionUtils.doWithoutCommit(
                     OwnerMetaMapper.class,
                     mapper -> 
mapper.softDeleteOwnerRelByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    StatisticMetaMapper.class,
+                    mapper -> 
mapper.softDeleteStatisticsByMetalakeId(metalakeId)),
             () ->
                 SessionUtils.doWithoutCommit(
                     JobTemplateMetaMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
index 5b98be0504..2a5db664c7 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
@@ -40,6 +40,7 @@ import 
org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.po.ModelPO;
 import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
@@ -158,6 +159,10 @@ public class ModelMetaService {
                 mapper ->
                     mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
                         modelId, MetadataObject.Type.MODEL.name())),
+        () ->
+            SessionUtils.doWithoutCommit(
+                StatisticMetaMapper.class,
+                mapper -> mapper.softDeleteStatisticsByEntityId(modelId)),
         () ->
             SessionUtils.doWithoutCommit(
                 PolicyMetadataObjectRelMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index e3f954239d..a793af4e5c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -44,6 +44,7 @@ import 
org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
@@ -257,7 +258,11 @@ public class SchemaMetaService {
             () ->
                 SessionUtils.doWithoutCommit(
                     ModelMetaMapper.class,
-                    mapper -> 
mapper.softDeleteModelMetasBySchemaId(schemaId)));
+                    mapper -> mapper.softDeleteModelMetasBySchemaId(schemaId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    StatisticMetaMapper.class,
+                    mapper -> 
mapper.softDeleteStatisticsBySchemaId(schemaId)));
       } else {
         List<TableEntity> tableEntities =
             TableMetaService.getInstance()
@@ -316,6 +321,10 @@ public class SchemaMetaService {
                     mapper ->
                         mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
                             schemaId, MetadataObject.Type.SCHEMA.name())),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    StatisticMetaMapper.class,
+                    mapper -> mapper.softDeleteStatisticsByEntityId(schemaId)),
             () ->
                 SessionUtils.doWithoutCommit(
                     PolicyMetadataObjectRelMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
new file mode 100644
index 0000000000..8b92cc5a25
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.StatisticEntity;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/**
+ * Service that reads and writes statistic metadata through {@link StatisticMetaMapper}. A single
+ * shared instance is exposed via {@link #getInstance()}.
+ */
+public class StatisticMetaService {
+
+  private static final StatisticMetaService INSTANCE = new StatisticMetaService();
+
+  private StatisticMetaService() {}
+
+  /**
+   * Returns the shared instance of this service.
+   *
+   * @return the singleton {@link StatisticMetaService}
+   */
+  public static StatisticMetaService getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Lists the statistics stored for one metadata object.
+   *
+   * <p>The metalake id is resolved from the metalake part of {@code identifier}, the metadata
+   * object id from the identifier and {@code type}; the mapper is then queried with both ids.
+   *
+   * @param identifier the identifier of the entity whose statistics are listed
+   * @param type the entity type of {@code identifier}
+   * @return the statistics returned by the mapper, converted to {@link StatisticEntity}
+   */
+  public List<StatisticEntity> listStatisticsByEntity(
+      NameIdentifier identifier, Entity.EntityType type) {
+    String metalakeName = NameIdentifierUtil.getMetalake(identifier);
+    long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+    MetadataObject metadataObject = NameIdentifierUtil.toMetadataObject(identifier, type);
+    long objectId =
+        MetadataObjectService.getMetadataObjectId(
+            metalakeId, metadataObject.fullName(), metadataObject.type());
+
+    List<StatisticPO> rows =
+        SessionUtils.getWithoutCommit(
+            StatisticMetaMapper.class,
+            mapper -> mapper.listStatisticPOsByEntityId(metalakeId, objectId));
+    return rows.stream()
+        .map(StatisticPO::fromStatisticPO)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Stores the given statistics for one metadata object in a single committed mapper call. Does
+   * nothing when {@code statisticEntities} is {@code null} or empty.
+   *
+   * @param statisticEntities the statistics to store
+   * @param metalake the name of the metalake used to resolve the metalake id; it is not checked
+   *     here against the metalake part of {@code entity}
+   * @param entity the identifier of the entity the statistics belong to
+   * @param type the entity type of {@code entity}
+   */
+  public void batchInsertStatisticPOsOnDuplicateKeyUpdate(
+      List<StatisticEntity> statisticEntities,
+      String metalake,
+      NameIdentifier entity,
+      Entity.EntityType type) {
+    boolean nothingToStore = statisticEntities == null || statisticEntities.isEmpty();
+    if (nothingToStore) {
+      return;
+    }
+
+    Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+    MetadataObject metadataObject = NameIdentifierUtil.toMetadataObject(entity, type);
+    Long objectId =
+        MetadataObjectService.getMetadataObjectId(
+            metalakeId, metadataObject.fullName(), metadataObject.type());
+
+    List<StatisticPO> statisticPOs =
+        StatisticPO.initializeStatisticPOs(
+            statisticEntities, metalakeId, objectId, metadataObject.type());
+    SessionUtils.doWithCommit(
+        StatisticMetaMapper.class,
+        mapper -> mapper.batchInsertStatisticPOsOnDuplicateKeyUpdate(statisticPOs));
+  }
+
+  /**
+   * Deletes the named statistics of one metadata object in a single committed mapper call.
+   *
+   * @param identifier the identifier of the entity whose statistics are deleted
+   * @param type the entity type of {@code identifier}
+   * @param statisticNames the names of the statistics to delete
+   * @return {@code 0} when {@code statisticNames} is {@code null} or empty, otherwise the value
+   *     returned by the mapper (presumably the number of affected rows; confirm against the
+   *     mapper)
+   */
+  public int batchDeleteStatisticPOs(
+      NameIdentifier identifier, Entity.EntityType type, List<String> statisticNames) {
+    boolean nothingToDelete = statisticNames == null || statisticNames.isEmpty();
+    if (nothingToDelete) {
+      return 0;
+    }
+
+    String metalakeName = NameIdentifierUtil.getMetalake(identifier);
+    Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+    MetadataObject metadataObject = NameIdentifierUtil.toMetadataObject(identifier, type);
+    Long objectId =
+        MetadataObjectService.getMetadataObjectId(
+            metalakeId, metadataObject.fullName(), metadataObject.type());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        StatisticMetaMapper.class,
+        mapper -> mapper.batchDeleteStatisticPOs(objectId, statisticNames));
+  }
+
+  /**
+   * Forwards a legacy-data cleanup request to the mapper and commits it.
+   *
+   * @param legacyTimeline the timeline passed to the mapper (presumably rows deleted before this
+   *     point are removed; confirm against the mapper SQL)
+   * @param limit the limit passed to the mapper
+   * @return the value returned by the mapper
+   */
+  public int deleteStatisticsByLegacyTimeline(long legacyTimeline, int limit) {
+    return SessionUtils.doWithCommitAndFetchResult(
+        StatisticMetaMapper.class,
+        mapper -> mapper.deleteStatisticsByLegacyTimeline(legacyTimeline, limit));
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index d4a93c1a9b..4d2a979342 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -35,6 +35,7 @@ import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.po.ColumnPO;
@@ -240,6 +241,10 @@ public class TableMetaService {
             SessionUtils.doWithoutCommit(
                 TagMetadataObjectRelMapper.class,
                 mapper -> 
mapper.softDeleteTagMetadataObjectRelsByTableId(tableId));
+
+            SessionUtils.doWithoutCommit(
+                StatisticMetaMapper.class,
+                mapper -> mapper.softDeleteStatisticsByEntityId(tableId));
             SessionUtils.doWithoutCommit(
                 PolicyMetadataObjectRelMapper.class,
                 mapper -> 
mapper.softDeletePolicyMetadataObjectRelsByTableId(tableId));
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
index 8f3e7eedd5..bd7f46763a 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
@@ -33,6 +33,7 @@ import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
 import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
 import org.apache.gravitino.storage.relational.po.TopicPO;
@@ -205,6 +206,10 @@ public class TopicMetaService {
                 mapper ->
                     mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
                         topicId, MetadataObject.Type.TOPIC.name())),
+        () ->
+            SessionUtils.doWithoutCommit(
+                StatisticMetaMapper.class,
+                mapper -> mapper.softDeleteStatisticsByEntityId(topicId)),
         () ->
             SessionUtils.doWithoutCommit(
                 PolicyMetadataObjectRelMapper.class,
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index 10d7ec4f13..c1fd511807 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -547,4 +547,17 @@ public class NameIdentifierUtil {
   public static NameIdentifier ofGroup(String metalake, String groupName) {
     return NameIdentifier.of(NamespaceUtil.ofGroup(metalake), groupName);
   }
+
+  /**
+   * Builds the {@link NameIdentifier} of a statistic that belongs to another entity. The string
+   * form of {@code entityIdent} is parsed into the namespace of the statistic, so a statistic of a
+   * table, for example, lives in a namespace derived from that table's identifier.
+   *
+   * @param entityIdent The identifier of the entity that owns the statistic.
+   * @param name The name of the statistic.
+   * @return The {@link NameIdentifier} of the statistic.
+   */
+  public static NameIdentifier ofStatistic(NameIdentifier entityIdent, String name) {
+    String ownerPath = entityIdent.toString();
+    Namespace statisticNamespace = Namespace.fromString(ownerPath);
+    return NameIdentifier.of(statisticNamespace, name);
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index 56337a18aa..4b00d377a1 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -47,6 +47,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Instant;
@@ -1387,6 +1388,70 @@ public class TestJDBCBackend {
     }
   }
 
+  /**
+   * Counts every row of {@code statistic_meta} that belongs to the given metalake, whatever its
+   * {@code deleted_at} value.
+   *
+   * @param metalakeId the id of the metalake whose statistic rows are counted
+   * @return the {@code count(*)} value reported by the database
+   * @throws RuntimeException if the query fails or returns no row
+   */
+  protected Integer countAllStats(Long metalakeId) {
+    String sql =
+        String.format("SELECT count(*) FROM statistic_meta WHERE metalake_id = %d", metalakeId);
+    try (SqlSession session =
+            SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection conn = session.getConnection();
+        Statement stmt = conn.createStatement();
+        ResultSet rows = stmt.executeQuery(sql)) {
+      if (!rows.next()) {
+        throw new RuntimeException("Doesn't contain data");
+      }
+      return rows.getInt(1);
+    } catch (SQLException se) {
+      throw new RuntimeException("SQL execution failed", se);
+    }
+  }
+
+  /**
+   * Counts the rows of {@code statistic_meta} that belong to the given metalake and whose {@code
+   * deleted_at} column is 0.
+   *
+   * @param metalakeId the id of the metalake whose statistic rows are counted
+   * @return the {@code count(*)} value reported by the database
+   * @throws RuntimeException if the query fails or returns no row
+   */
+  protected Integer countActiveStats(Long metalakeId) {
+    String sql =
+        String.format(
+            "SELECT count(*) FROM statistic_meta WHERE metalake_id = %d AND deleted_at = 0",
+            metalakeId);
+    try (SqlSession session =
+            SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection conn = session.getConnection();
+        Statement stmt = conn.createStatement();
+        ResultSet rows = stmt.executeQuery(sql)) {
+      if (!rows.next()) {
+        throw new RuntimeException("Doesn't contain data");
+      }
+      return rows.getInt(1);
+    } catch (SQLException se) {
+      throw new RuntimeException("SQL execution failed", se);
+    }
+  }
+
+  /**
+   * Debugging aid: prints every column of every {@code statistic_meta} row of the given metalake
+   * whose {@code deleted_at} is 0 to standard output. It verifies nothing.
+   *
+   * @param metalakeId the id of the metalake whose statistic rows are dumped
+   * @return always 1
+   * @throws RuntimeException if the query fails
+   */
+  protected Integer testStats(Long metalakeId) {
+    String sql =
+        String.format(
+            "SELECT * FROM statistic_meta WHERE metalake_id = %d AND deleted_at = 0", metalakeId);
+    try (SqlSession session =
+            SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection conn = session.getConnection();
+        Statement stmt = conn.createStatement();
+        ResultSet rows = stmt.executeQuery(sql)) {
+      ResultSetMetaData columns = rows.getMetaData();
+      int lastColumn = columns.getColumnCount();
+      while (rows.next()) {
+        // JDBC column positions are 1-based.
+        for (int position = 1; position <= lastColumn; position++) {
+          String columnName = columns.getColumnName(position);
+          Object value = rows.getObject(position);
+          System.out.printf("Column: %s, Value: %s%n", columnName, value);
+        }
+      }
+    } catch (SQLException se) {
+      throw new RuntimeException("SQL execution failed", se);
+    }
+    return 1;
+  }
+
   protected Integer countAllTagRel(Long tagId) {
     try (SqlSession sqlSession =
             
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
new file mode 100644
index 0000000000..a45705377d
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.collect.Lists;
+import java.time.Instant;
+import java.util.List;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.StatisticEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TopicEntity;
+import org.apache.gravitino.stats.StatisticValues;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests {@link StatisticMetaService}: the insert/update/list/delete life cycle of a statistic and
+ * what happens to stored statistics when the metadata object they belong to is deleted.
+ */
+public class TestStatisticMetaService extends TestJDBCBackend {
+  private static final String METALAKE_NAME = "metalake";
+
+  StatisticMetaService statisticMetaService = StatisticMetaService.getInstance();
+
+  @Test
+  public void testStatisticsLifeCycle() throws Exception {
+    AuditInfo auditInfo = newAuditInfo();
+    insertMetalake(auditInfo);
+    insertCatalog(auditInfo);
+    insertSchema(auditInfo);
+    TableEntity table = insertTable(auditInfo);
+
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        singleStatistic(auditInfo, 100L),
+        METALAKE_NAME,
+        table.nameIdentifier(),
+        Entity.EntityType.TABLE);
+    assertOnlyStatistic(table, 100L);
+
+    // Update the duplicated key: the same statistic name must be overwritten, not duplicated.
+    StatisticEntity updated = createStatisticEntity(auditInfo, 200L);
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        Lists.newArrayList(updated),
+        METALAKE_NAME,
+        table.nameIdentifier(),
+        Entity.EntityType.TABLE);
+    assertOnlyStatistic(table, 200L);
+
+    List<String> names = Lists.newArrayList(updated.name());
+    statisticMetaService.batchDeleteStatisticPOs(table.nameIdentifier(), table.type(), names);
+    List<StatisticEntity> remaining =
+        statisticMetaService.listStatisticsByEntity(table.nameIdentifier(), table.type());
+    Assertions.assertEquals(0, remaining.size());
+  }
+
+  @Test
+  public void testDeleteMetadataObject() throws Exception {
+    AuditInfo auditInfo = newAuditInfo();
+    BaseMetalake metalake = insertMetalake(auditInfo);
+
+    // Scenario 1: delete the objects one by one; each deletion deactivates exactly one statistic
+    // while the row itself stays in the table.
+    ObjectTree tree = insertObjectTreeWithStatistics(auditInfo);
+    assertStatCounts(metalake.id(), 4, 4);
+
+    ModelMetaService.getInstance().deleteModel(tree.model.nameIdentifier());
+    assertStatCounts(metalake.id(), 3, 4);
+
+    TableMetaService.getInstance().deleteTable(tree.table.nameIdentifier());
+    assertStatCounts(metalake.id(), 2, 4);
+
+    TopicMetaService.getInstance().deleteTopic(tree.topic.nameIdentifier());
+    assertStatCounts(metalake.id(), 1, 4);
+
+    FilesetMetaService.getInstance().deleteFileset(tree.fileset.nameIdentifier());
+    assertStatCounts(metalake.id(), 0, 4);
+
+    // The now-empty schema and catalog carry no statistics, so the counts stay put.
+    SchemaMetaService.getInstance().deleteSchema(tree.schema.nameIdentifier(), false);
+    assertStatCounts(metalake.id(), 0, 4);
+
+    CatalogMetaService.getInstance().deleteCatalog(tree.catalog.nameIdentifier(), false);
+    assertStatCounts(metalake.id(), 0, 4);
+
+    // Scenario 2: delete the catalog with cascade mode.
+    tree = insertObjectTreeWithStatistics(auditInfo);
+    assertStatCounts(metalake.id(), 4, 8);
+
+    CatalogMetaService.getInstance().deleteCatalog(tree.catalog.nameIdentifier(), true);
+    assertStatCounts(metalake.id(), 0, 8);
+
+    // Scenario 3: delete the schema with cascade mode.
+    tree = insertObjectTreeWithStatistics(auditInfo);
+    assertStatCounts(metalake.id(), 4, 12);
+
+    SchemaMetaService.getInstance().deleteSchema(tree.schema.nameIdentifier(), true);
+    assertStatCounts(metalake.id(), 0, 12);
+  }
+
+  /** The catalog, schema and four leaf objects that one deletion scenario works on. */
+  private static final class ObjectTree {
+    private final CatalogEntity catalog;
+    private final SchemaEntity schema;
+    private final FilesetEntity fileset;
+    private final TableEntity table;
+    private final TopicEntity topic;
+    private final ModelEntity model;
+
+    private ObjectTree(
+        CatalogEntity catalog,
+        SchemaEntity schema,
+        FilesetEntity fileset,
+        TableEntity table,
+        TopicEntity topic,
+        ModelEntity model) {
+      this.catalog = catalog;
+      this.schema = schema;
+      this.fileset = fileset;
+      this.table = table;
+      this.topic = topic;
+      this.model = model;
+    }
+  }
+
+  /**
+   * Inserts a fresh catalog, schema, fileset, table, topic and model under the test metalake and
+   * stores one statistic for each of the table, topic, fileset and model.
+   */
+  private ObjectTree insertObjectTreeWithStatistics(AuditInfo auditInfo) throws Exception {
+    CatalogEntity catalog = insertCatalog(auditInfo);
+    SchemaEntity schema = insertSchema(auditInfo);
+
+    FilesetEntity fileset =
+        createFilesetEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "fileset",
+            auditInfo);
+    backend.insert(fileset, false);
+
+    TableEntity table = insertTable(auditInfo);
+
+    TopicEntity topic =
+        createTopicEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "topic",
+            auditInfo);
+    backend.insert(topic, false);
+
+    ModelEntity model =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "model",
+            "comment",
+            1,
+            null,
+            auditInfo);
+    backend.insert(model, false);
+
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        singleStatistic(auditInfo, 100L),
+        METALAKE_NAME,
+        table.nameIdentifier(),
+        Entity.EntityType.TABLE);
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        singleStatistic(auditInfo, 100L),
+        METALAKE_NAME,
+        topic.nameIdentifier(),
+        Entity.EntityType.TOPIC);
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        singleStatistic(auditInfo, 100L),
+        METALAKE_NAME,
+        fileset.nameIdentifier(),
+        Entity.EntityType.FILESET);
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        singleStatistic(auditInfo, 100L),
+        METALAKE_NAME,
+        model.nameIdentifier(),
+        Entity.EntityType.MODEL);
+
+    return new ObjectTree(catalog, schema, fileset, table, topic, model);
+  }
+
+  /** Inserts the metalake every test object lives in. */
+  private BaseMetalake insertMetalake(AuditInfo auditInfo) throws Exception {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME, auditInfo);
+    backend.insert(metalake, false);
+    return metalake;
+  }
+
+  /** Inserts a catalog named "catalog" with a fresh id under the test metalake. */
+  private CatalogEntity insertCatalog(AuditInfo auditInfo) throws Exception {
+    CatalogEntity catalog =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake"),
+            "catalog",
+            auditInfo);
+    backend.insert(catalog, false);
+    return catalog;
+  }
+
+  /** Inserts a schema named "schema" with a fresh id under the test catalog. */
+  private SchemaEntity insertSchema(AuditInfo auditInfo) throws Exception {
+    SchemaEntity schema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog"),
+            "schema",
+            auditInfo);
+    backend.insert(schema, false);
+    return schema;
+  }
+
+  /** Inserts a table named "table" with a fresh id under the test schema. */
+  private TableEntity insertTable(AuditInfo auditInfo) throws Exception {
+    TableEntity table =
+        createTableEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "table",
+            auditInfo);
+    backend.insert(table, false);
+    return table;
+  }
+
+  /** Asserts that the table has exactly one statistic, named "test", holding the given value. */
+  private void assertOnlyStatistic(TableEntity table, long expectedValue) {
+    List<StatisticEntity> listed =
+        statisticMetaService.listStatisticsByEntity(
+            table.nameIdentifier(), Entity.EntityType.TABLE);
+    Assertions.assertEquals(1, listed.size());
+    Assertions.assertEquals("test", listed.get(0).name());
+    Assertions.assertEquals(expectedValue, listed.get(0).value().value());
+  }
+
+  /** Asserts the number of active and of all statistic rows stored for the metalake. */
+  private void assertStatCounts(Long metalakeId, int expectedActive, int expectedTotal) {
+    Assertions.assertEquals(expectedActive, countActiveStats(metalakeId));
+    Assertions.assertEquals(expectedTotal, countAllStats(metalakeId));
+  }
+
+  private static AuditInfo newAuditInfo() {
+    return AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+  }
+
+  /** Wraps one freshly created "test" statistic in a mutable list. */
+  private static List<StatisticEntity> singleStatistic(AuditInfo auditInfo, long value) {
+    return Lists.newArrayList(createStatisticEntity(auditInfo, value));
+  }
+
+  private static StatisticEntity createStatisticEntity(AuditInfo auditInfo, long value) {
+    return StatisticEntity.builder()
+        .withId(RandomIdGenerator.INSTANCE.nextId())
+        .withName("test")
+        .withValue(StatisticValues.longValue(value))
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
index 2397ca99e5..908096236e 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
@@ -57,6 +57,7 @@ import org.apache.gravitino.meta.ModelVersionEntity;
 import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
+import org.apache.gravitino.meta.StatisticEntity;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.meta.TopicEntity;
@@ -66,6 +67,7 @@ import org.apache.gravitino.rel.expressions.Expression;
 import org.apache.gravitino.rel.expressions.literals.Literals;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.StatisticValues;
 import org.apache.gravitino.storage.relational.po.CatalogPO;
 import org.apache.gravitino.storage.relational.po.ColumnPO;
 import org.apache.gravitino.storage.relational.po.FilesetPO;
@@ -78,6 +80,7 @@ import org.apache.gravitino.storage.relational.po.OwnerRelPO;
 import org.apache.gravitino.storage.relational.po.PolicyPO;
 import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
 import org.apache.gravitino.storage.relational.po.TablePO;
 import org.apache.gravitino.storage.relational.po.TagMetadataObjectRelPO;
 import org.apache.gravitino.storage.relational.po.TagPO;
@@ -1211,6 +1214,56 @@ public class TestPOConverters {
     assertEquals(expectedModelVersionWithNull, convertedModelVersionWithNull);
   }
 
+  /** Checks the conversion between {@link StatisticEntity} and {@link StatisticPO}. */
+  @Test
+  public void testStatisticPO() throws JsonProcessingException {
+    // Entity -> PO
+    AuditInfo entityAudit =
+        AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
+    StatisticEntity source =
+        StatisticEntity.builder()
+            .withId(1L)
+            .withName("test_statistic")
+            .withNamespace(
+                NameIdentifierUtil.ofStatistic(
+                        NameIdentifierUtil.ofTable("test", "test", "test", "test"), "test")
+                    .namespace())
+            .withValue(StatisticValues.stringValue("test"))
+            .withAuditInfo(entityAudit)
+            .build();
+    List<StatisticEntity> statisticEntities = Lists.newArrayList(source);
+
+    StatisticPO converted =
+        StatisticPO.initializeStatisticPOs(statisticEntities, 1L, 1L, MetadataObject.Type.CATALOG)
+            .get(0);
+
+    assertEquals(1, converted.getCurrentVersion());
+    assertEquals(1, converted.getLastVersion());
+    assertEquals(0, converted.getDeletedAt());
+    assertEquals("\"test\"", converted.getStatisticValue());
+    assertEquals("test_statistic", converted.getStatisticName());
+
+    // PO -> entity
+    String auditJson =
+        JsonUtils.anyFieldMapper()
+            .writeValueAsString(
+                AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build());
+    StatisticPO statisticPO =
+        StatisticPO.builder()
+            .withStatisticId(1L)
+            .withLastVersion(1L)
+            .withCurrentVersion(1L)
+            .withStatisticName("test")
+            .withStatisticValue("\"test\"")
+            .withMetadataObjectId(1L)
+            .withMetadataObjectType("CATALOG")
+            .withDeletedAt(0L)
+            .withMetalakeId(1L)
+            .withAuditInfo(auditJson)
+            .build();
+
+    StatisticEntity restored = StatisticPO.fromStatisticPO(statisticPO);
+    Assertions.assertEquals(1L, restored.id());
+    Assertions.assertEquals("test", restored.name());
+    Assertions.assertEquals("test", restored.value().value());
+  }
+
   private static BaseMetalake createMetalake(Long id, String name, String 
comment) {
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
diff --git a/scripts/h2/schema-1.0.0-h2.sql b/scripts/h2/schema-1.0.0-h2.sql
index a245174735..d159e5de8d 100644
--- a/scripts/h2/schema-1.0.0-h2.sql
+++ b/scripts/h2/schema-1.0.0-h2.sql
@@ -384,6 +384,24 @@ CREATE TABLE IF NOT EXISTS `policy_relation_meta` (
     KEY `idx_prmid` (`metadata_object_id`)
 ) ENGINE=InnoDB;
 
+CREATE TABLE IF NOT EXISTS `statistic_meta` ( -- one row per statistic recorded for a metadata object
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
+    `statistic_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'statistic id',
+    `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value',
+    `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object id',
+    `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'statistic audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic last version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'statistic deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_si_mi_mo_del` (`statistic_name`, `metadata_object_id`, `deleted_at`), -- a statistic name is unique per metadata object among rows with the same deleted_at
+    KEY `idx_stid` (`statistic_id`),
+    KEY `idx_moid` (`metadata_object_id`)
+) ENGINE=InnoDB;
+
 CREATE TABLE IF NOT EXISTS `job_template_meta` (
     `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
     `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
diff --git a/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql 
b/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
index 7ceca25db4..071cde1146 100644
--- a/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
+++ b/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
@@ -69,6 +69,24 @@ ALTER TABLE `model_version_info` ADD CONSTRAINT 
`uk_mid_ver_uri_del` UNIQUE (`mo
 -- remove the default value for model_version_uri_name
 ALTER TABLE `model_version_info` ALTER COLUMN `model_version_uri_name` DROP 
DEFAULT;
 
+CREATE TABLE IF NOT EXISTS `statistic_meta` ( -- one row per statistic recorded for a metadata object
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
+    `statistic_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'statistic id',
+    `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value',
+    `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object id',
+    `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'statistic audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic last version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'statistic deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_si_mi_mo_del` (`statistic_name`, `metadata_object_id`, `deleted_at`), -- a statistic name is unique per metadata object among rows with the same deleted_at
+    KEY `idx_stid` (`statistic_id`),
+    KEY `idx_moid` (`metadata_object_id`)
+) ENGINE=InnoDB;
+
 CREATE TABLE IF NOT EXISTS `job_template_meta` (
     `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
     `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
diff --git a/scripts/mysql/schema-1.0.0-mysql.sql 
b/scripts/mysql/schema-1.0.0-mysql.sql
index 07c14ad304..06fa8c33b7 100644
--- a/scripts/mysql/schema-1.0.0-mysql.sql
+++ b/scripts/mysql/schema-1.0.0-mysql.sql
@@ -375,6 +375,24 @@ CREATE TABLE IF NOT EXISTS `policy_relation_meta` (
     KEY `idx_mid` (`metadata_object_id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'policy 
metadata object relation';
 
+CREATE TABLE IF NOT EXISTS `statistic_meta` ( -- one row per statistic recorded for a metadata object
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
+    `statistic_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'statistic id',
+    `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value',
+    `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object id',
+    `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'statistic audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic last version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'statistic deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_si_mi_mo_del` (`statistic_name`, `metadata_object_id`, `deleted_at`), -- a statistic name is unique per metadata object among rows with the same deleted_at
+    KEY `idx_stid` (`statistic_id`),
+    KEY `idx_moid` (`metadata_object_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'statistic metadata';
+
 CREATE TABLE IF NOT EXISTS `job_template_meta` (
     `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
     `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
diff --git a/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql 
b/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
index c0890b5aa0..c44c94a41d 100644
--- a/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
+++ b/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
@@ -69,6 +69,24 @@ ALTER TABLE `model_version_info` ADD CONSTRAINT 
`uk_mid_ver_uri_del` UNIQUE KEY
 -- remove the default value for model_version_uri_name
 ALTER TABLE `model_version_info` ALTER COLUMN `model_version_uri_name` DROP 
DEFAULT;
 
+CREATE TABLE IF NOT EXISTS `statistic_meta` ( -- one row per statistic recorded for a metadata object
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
+    `statistic_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'statistic id',
+    `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value',
+    `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object id',
+    `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'statistic audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic last version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'statistic deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_si_mi_mo_del` (`statistic_name`, `metadata_object_id`, `deleted_at`), -- a statistic name is unique per metadata object among rows with the same deleted_at
+    KEY `idx_stid` (`statistic_id`),
+    KEY `idx_moid` (`metadata_object_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'statistic metadata';
+
 CREATE TABLE IF NOT EXISTS `job_template_meta` (
     `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
     `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
diff --git a/scripts/postgresql/schema-1.0.0-postgresql.sql 
b/scripts/postgresql/schema-1.0.0-postgresql.sql
index d97ace9844..a815248673 100644
--- a/scripts/postgresql/schema-1.0.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.0.0-postgresql.sql
@@ -669,6 +669,36 @@ COMMENT ON COLUMN policy_relation_meta.current_version IS 
'policy relation curre
 COMMENT ON COLUMN policy_relation_meta.last_version IS 'policy relation last 
version';
 COMMENT ON COLUMN policy_relation_meta.deleted_at IS 'policy relation deleted 
at';
 
+CREATE TABLE IF NOT EXISTS statistic_meta ( -- one row per statistic recorded for a metadata object
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY, -- generated when an insert omits it, explicit values are still accepted
+    statistic_id BIGINT NOT NULL,
+    statistic_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    statistic_value TEXT NOT NULL,
+    metadata_object_id BIGINT NOT NULL,
+    metadata_object_type VARCHAR(64) NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (statistic_id),
+    UNIQUE (statistic_name, metadata_object_id, deleted_at) -- a statistic name is unique per metadata object among rows with the same deleted_at
+);
+
+CREATE INDEX IF NOT EXISTS idx_stid ON statistic_meta (statistic_id);
+CREATE INDEX IF NOT EXISTS idx_moid ON statistic_meta (metadata_object_id);
+COMMENT ON TABLE statistic_meta IS 'statistic metadata';
+COMMENT ON COLUMN statistic_meta.id IS 'auto increment id';
+COMMENT ON COLUMN statistic_meta.statistic_id IS 'statistic id';
+COMMENT ON COLUMN statistic_meta.statistic_name IS 'statistic name';
+COMMENT ON COLUMN statistic_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN statistic_meta.statistic_value IS 'statistic value';
+COMMENT ON COLUMN statistic_meta.metadata_object_id IS 'metadata object id';
+COMMENT ON COLUMN statistic_meta.metadata_object_type IS 'metadata object type';
+COMMENT ON COLUMN statistic_meta.audit_info IS 'statistic audit info';
+COMMENT ON COLUMN statistic_meta.current_version IS 'statistic current version';
+COMMENT ON COLUMN statistic_meta.last_version IS 'statistic last version';
+COMMENT ON COLUMN statistic_meta.deleted_at IS 'statistic deleted at';
 
 CREATE TABLE IF NOT EXISTS job_template_meta (
     job_template_id BIGINT NOT NULL,
diff --git a/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql 
b/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
index a95ad69517..22c07c15a0 100644
--- a/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
@@ -105,6 +105,36 @@ ALTER TABLE model_version_info ADD CONSTRAINT uk_mid_ver_uri_del UNIQUE (model_i
 -- remove the default value for model_version_uri_name
 ALTER TABLE model_version_info ALTER COLUMN model_version_uri_name DROP DEFAULT;
 
+CREATE TABLE IF NOT EXISTS statistic_meta ( -- one row per statistic recorded for a metadata object
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY, -- generated when an insert omits it, explicit values are still accepted
+    statistic_id BIGINT NOT NULL,
+    statistic_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    statistic_value TEXT NOT NULL,
+    metadata_object_id BIGINT NOT NULL,
+    metadata_object_type VARCHAR(64) NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0, -- referenced by the UNIQUE constraint below, so it must be declared here
+    PRIMARY KEY (statistic_id),
+    UNIQUE (statistic_name, metadata_object_id, deleted_at) -- a statistic name is unique per metadata object among rows with the same deleted_at
+);
+
+CREATE INDEX IF NOT EXISTS idx_stid ON statistic_meta (statistic_id);
+CREATE INDEX IF NOT EXISTS idx_moid ON statistic_meta (metadata_object_id);
+COMMENT ON TABLE statistic_meta IS 'statistic metadata';
+COMMENT ON COLUMN statistic_meta.id IS 'auto increment id';
+COMMENT ON COLUMN statistic_meta.statistic_id IS 'statistic id';
+COMMENT ON COLUMN statistic_meta.statistic_name IS 'statistic name';
+COMMENT ON COLUMN statistic_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN statistic_meta.statistic_value IS 'statistic value';
+COMMENT ON COLUMN statistic_meta.metadata_object_id IS 'metadata object id';
+COMMENT ON COLUMN statistic_meta.metadata_object_type IS 'metadata object type';
+COMMENT ON COLUMN statistic_meta.audit_info IS 'statistic audit info';
+COMMENT ON COLUMN statistic_meta.current_version IS 'statistic current version';
+COMMENT ON COLUMN statistic_meta.last_version IS 'statistic last version';
+COMMENT ON COLUMN statistic_meta.deleted_at IS 'statistic deleted at';
 
 CREATE TABLE IF NOT EXISTS job_template_meta (
     job_template_id BIGINT NOT NULL,

Reply via email to