This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new ea2a1fab3e [#7269] feat(storage): Add storage for statistics (#7690)
ea2a1fab3e is described below
commit ea2a1fab3ee3b6f331765c06ba26e77fdc19e747
Author: roryqi <[email protected]>
AuthorDate: Tue Aug 12 10:44:46 2025 +0800
[#7269] feat(storage): Add storage for statistics (#7690)
### What changes were proposed in this pull request?
Add storage for statistics
### Why are the changes needed?
Fix: #7269
### Does this PR introduce _any_ user-facing change?
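Add storage for statistics: statistics are persisted in the relational backend's `statistic_meta` table, and the schema and 0.9.0-to-1.0.0 upgrade scripts for H2, MySQL, and PostgreSQL are updated accordingly.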
### How was this patch tested?
Added unit tests (UT): TestJsonUtils, TestJDBCBackend, TestStatisticMetaService, and TestPOConverters.
---
.../java/org/apache/gravitino/json/JsonUtils.java | 96 ++++-
.../org/apache/gravitino/json/TestJsonUtils.java | 62 ++++
.../src/main/java/org/apache/gravitino/Entity.java | 1 +
.../org/apache/gravitino/meta/StatisticEntity.java | 127 +++++++
.../gravitino/storage/relational/JDBCBackend.java | 6 +
.../relational/mapper/StatisticMetaMapper.java | 72 ++++
.../mapper/StatisticSQLProviderFactory.java | 94 +++++
.../provider/base/StatisticBaseSQLProvider.java | 185 ++++++++++
.../SecurableObjectPostgreSQLProvider.java | 4 +-
.../postgresql/StatisticPostgresSQLProvider.java | 62 ++++
.../storage/relational/po/StatisticPO.java | 214 +++++++++++
.../relational/service/CatalogMetaService.java | 11 +-
.../relational/service/FilesetMetaService.java | 5 +
.../relational/service/MetalakeMetaService.java | 9 +
.../relational/service/ModelMetaService.java | 5 +
.../relational/service/SchemaMetaService.java | 11 +-
.../relational/service/StatisticMetaService.java | 103 ++++++
.../relational/service/TableMetaService.java | 5 +
.../relational/service/TopicMetaService.java | 5 +
.../apache/gravitino/utils/NameIdentifierUtil.java | 13 +
.../storage/relational/TestJDBCBackend.java | 65 ++++
.../service/TestStatisticMetaService.java | 402 +++++++++++++++++++++
.../storage/relational/utils/TestPOConverters.java | 53 +++
scripts/h2/schema-1.0.0-h2.sql | 18 +
scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql | 18 +
scripts/mysql/schema-1.0.0-mysql.sql | 18 +
scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql | 18 +
scripts/postgresql/schema-1.0.0-postgresql.sql | 30 ++
.../upgrade-0.9.0-to-1.0.0-postgresql.sql | 26 ++
29 files changed, 1732 insertions(+), 6 deletions(-)
diff --git a/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
b/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
index 68b66220ad..5fd2ab3f8f 100644
--- a/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
+++ b/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
@@ -35,6 +35,7 @@ import com.fasterxml.jackson.databind.cfg.EnumFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.annotations.VisibleForTesting;
@@ -44,6 +45,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
@@ -85,6 +87,8 @@ import
org.apache.gravitino.rel.expressions.sorts.SortDirection;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.types.Type;
import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
/** Utility class for working with JSON data. */
public class JsonUtils {
@@ -301,7 +305,9 @@ public class JsonUtils {
.addDeserializer(Type.class, new TypeDeserializer())
.addSerializer(Type.class, new TypeSerializer())
.addDeserializer(Expression.class, new
ColumnDefaultValueDeserializer())
- .addSerializer(Expression.class, new
ColumnDefaultValueSerializer()));
+ .addSerializer(Expression.class, new
ColumnDefaultValueSerializer())
+ .addDeserializer(StatisticValue.class, new
StatisticValueDeserializer())
+ .addSerializer(StatisticValue.class, new
StatisticValueSerializer()));
}
/**
@@ -1343,6 +1349,94 @@ public class JsonUtils {
}
}
+ /** Custom JSON deserializer for StatisticValue objects. */
+ public static class StatisticValueDeserializer extends
JsonDeserializer<StatisticValue> {
+ @Override
+ public StatisticValue<?> deserialize(JsonParser p, DeserializationContext
ctxt)
+ throws IOException {
+ JsonNode node = p.getCodec().readTree(p);
+ return getStatisticValue(node);
+ }
+ }
+
+ private static StatisticValue<?> getStatisticValue(JsonNode node) throws
IOException {
+ Preconditions.checkArgument(
+ node != null && !node.isNull(), "Cannot parse statistic value from
invalid JSON: %s", node);
+ if (node.isIntegralNumber()) {
+ return StatisticValues.longValue(node.asLong());
+ } else if (node.isFloatingPointNumber()) {
+ return StatisticValues.doubleValue(node.asDouble());
+ } else if (node.isTextual()) {
+ return StatisticValues.stringValue(node.asText());
+ } else if (node.isBoolean()) {
+ return StatisticValues.booleanValue(node.asBoolean());
+ } else if (node.isArray()) {
+ ArrayNode arrayNode = (ArrayNode) node;
+ List<StatisticValue<Object>> values =
Lists.newArrayListWithCapacity(arrayNode.size());
+ for (JsonNode arrayElement : arrayNode) {
+ StatisticValue<?> value = getStatisticValue(arrayElement);
+ if (value != null) {
+ values.add((StatisticValue<Object>) value);
+ }
+ }
+ return StatisticValues.listValue(values);
+ } else if (node.isObject()) {
+ ObjectNode objectNode = (ObjectNode) node;
+ Map<String, StatisticValue<?>> map = Maps.newHashMap();
+ objectNode
+ .fields()
+ .forEachRemaining(
+ entry -> {
+ try {
+ StatisticValue<?> value =
getStatisticValue(entry.getValue());
+ if (value != null) {
+ map.put(entry.getKey(), value);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ return StatisticValues.objectValue(map);
+ } else {
+ throw new UnsupportedEncodingException(
+ String.format("Don't support json node type %s",
node.getNodeType()));
+ }
+ }
+
+ /** Custom JSON serializer for StatisticValue objects. */
+ public static class StatisticValueSerializer extends
JsonSerializer<StatisticValue> {
+
+ @Override
+ public void serialize(StatisticValue value, JsonGenerator gen,
SerializerProvider serializers)
+ throws IOException {
+ if (value.dataType().name() == Type.Name.BOOLEAN) {
+ gen.writeBoolean((Boolean) value.value());
+ } else if (value.dataType().name() == Type.Name.STRING) {
+ gen.writeString((String) value.value());
+ } else if (value.dataType().name() == Type.Name.DOUBLE) {
+ gen.writeNumber((Double) value.value());
+ } else if (value.dataType().name() == Type.Name.LONG) {
+ gen.writeNumber((Long) value.value());
+ } else if (value.dataType().name() == Type.Name.LIST) {
+ gen.writeStartArray();
+ for (StatisticValue<?> element : (List<StatisticValue<?>>)
value.value()) {
+ serialize(element, gen, serializers);
+ }
+ gen.writeEndArray();
+ } else if (value.dataType().name() == Type.Name.STRUCT) {
+ gen.writeStartObject();
+ for (Map.Entry<String, StatisticValue<?>> entry :
+ ((Map<String, StatisticValue<?>>) value.value()).entrySet()) {
+ gen.writeFieldName(entry.getKey());
+ serialize(entry.getValue(), gen, serializers);
+ }
+ gen.writeEndObject();
+ } else {
+ throw new IOException("Unsupported statistic value type: " +
value.dataType());
+ }
+ }
+ }
+
/** Custom JSON serializer for PartitionDTO objects. */
public static class PartitionDTOSerializer extends
JsonSerializer<PartitionDTO> {
@Override
diff --git a/common/src/test/java/org/apache/gravitino/json/TestJsonUtils.java
b/common/src/test/java/org/apache/gravitino/json/TestJsonUtils.java
index ff07635e20..c99af454e3 100644
--- a/common/src/test/java/org/apache/gravitino/json/TestJsonUtils.java
+++ b/common/src/test/java/org/apache/gravitino/json/TestJsonUtils.java
@@ -26,7 +26,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.List;
+import java.util.Map;
import org.apache.gravitino.dto.rel.expressions.LiteralDTO;
import org.apache.gravitino.dto.rel.indexes.IndexDTO;
import org.apache.gravitino.dto.rel.partitions.IdentityPartitionDTO;
@@ -36,6 +39,8 @@ import
org.apache.gravitino.dto.rel.partitions.RangePartitionDTO;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.types.Type;
import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -50,6 +55,8 @@ public class TestJsonUtils {
SimpleModule module = new SimpleModule();
module.addSerializer(Type.class, new JsonUtils.TypeSerializer());
module.addDeserializer(Type.class, new JsonUtils.TypeDeserializer());
+ module.addSerializer(StatisticValue.class, new
JsonUtils.StatisticValueSerializer());
+ module.addDeserializer(StatisticValue.class, new
JsonUtils.StatisticValueDeserializer());
objectMapper.registerModule(module);
}
@@ -458,4 +465,59 @@ public class TestJsonUtils {
objectMapper.readValue(expected, IndexDTO.class),
objectMapper.readValue(jsonValue, IndexDTO.class));
}
+
+ @Test
+ void testStatisticValueSerde() throws JsonProcessingException {
+ String expectJson = "\"1\"";
+ String strValue =
objectMapper.writeValueAsString(StatisticValues.stringValue("1"));
+ Assertions.assertEquals(strValue, expectJson);
+ Assertions.assertEquals(
+ objectMapper.readValue(expectJson, StatisticValue.class),
+ objectMapper.readValue(strValue, StatisticValue.class));
+
+ expectJson = "1";
+ String longValue =
objectMapper.writeValueAsString(StatisticValues.longValue(1));
+ Assertions.assertEquals(longValue, expectJson);
+ Assertions.assertEquals(
+ objectMapper.readValue(expectJson, StatisticValue.class),
+ objectMapper.readValue(longValue, StatisticValue.class));
+
+ expectJson = "1.0";
+ String doubleValue =
objectMapper.writeValueAsString(StatisticValues.doubleValue(1.0));
+ Assertions.assertEquals(doubleValue, expectJson);
+ Assertions.assertEquals(
+ objectMapper.readValue(expectJson, StatisticValue.class),
+ objectMapper.readValue(doubleValue, StatisticValue.class));
+
+ expectJson = "true";
+ String booleanValue =
objectMapper.writeValueAsString(StatisticValues.booleanValue(true));
+ Assertions.assertEquals(booleanValue, "true");
+ Assertions.assertEquals(
+ objectMapper.readValue(expectJson, StatisticValue.class),
+ objectMapper.readValue(booleanValue, StatisticValue.class));
+
+ expectJson = "[\"1\",\"2\"]";
+ String listValue =
+ objectMapper.writeValueAsString(
+ StatisticValues.listValue(
+ Lists.newArrayList(
+ StatisticValues.stringValue("1"),
StatisticValues.stringValue("2"))));
+ Assertions.assertEquals(listValue, expectJson);
+ Assertions.assertEquals(
+ objectMapper.readValue(expectJson, StatisticValue.class),
+ objectMapper.readValue(listValue, StatisticValue.class));
+
+ expectJson = "{\"key1\":\"value1\",\"key2\":2}";
+ Map<String, StatisticValue<?>> values = Maps.newHashMap();
+ values.put("key1", StatisticValues.stringValue("value1"));
+ values.put("key2", StatisticValues.longValue(2L));
+
+ String objectValue =
objectMapper.writeValueAsString(StatisticValues.objectValue(values));
+ Assertions.assertEquals(
+ objectValue, expectJson, "Object value serialization should match
expected format");
+
+ Assertions.assertEquals(
+ objectMapper.readValue(expectJson, StatisticValue.class),
+ objectMapper.readValue(objectValue, StatisticValue.class));
+ }
}
diff --git a/core/src/main/java/org/apache/gravitino/Entity.java
b/core/src/main/java/org/apache/gravitino/Entity.java
index c8d973f8c5..d08ddd0011 100644
--- a/core/src/main/java/org/apache/gravitino/Entity.java
+++ b/core/src/main/java/org/apache/gravitino/Entity.java
@@ -77,6 +77,7 @@ public interface Entity extends Serializable {
MODEL,
MODEL_VERSION,
POLICY,
+ STATISTIC,
JOB_TEMPLATE,
JOB,
AUDIT;
diff --git a/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
b/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
new file mode 100644
index 0000000000..284b347a73
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.meta;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.Auditable;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Field;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.stats.StatisticValue;
+
+public class StatisticEntity implements Entity, HasIdentifier, Auditable {
+ public static final Field ID =
+ Field.required("id", Long.class, "The unique identifier of the statistic
entity.");
+ public static final Field NAME =
+ Field.required("name", String.class, "The name of the statistic
entity.");
+ public static final Field VALUE =
+ Field.required("value", StatisticValue.class, "The value of the
statistic entity.");
+ public static final Field AUDIT_INFO =
+ Field.required("audit_info", Audit.class, "The audit details of the
statistic entity.");
+
+ private Long id;
+ private String name;
+ private StatisticValue<?> value;
+ private AuditInfo auditInfo;
+ private Namespace namespace;
+
+ @Override
+ public Audit auditInfo() {
+ return auditInfo;
+ }
+
+ @Override
+ public Map<Field, Object> fields() {
+ Map<Field, Object> fields = Maps.newHashMap();
+ fields.put(ID, id);
+ fields.put(NAME, name);
+ fields.put(VALUE, value);
+ fields.put(AUDIT_INFO, auditInfo);
+ return fields;
+ }
+
+ @Override
+ public EntityType type() {
+ return EntityType.STATISTIC;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Long id() {
+ return id;
+ }
+
+ @Override
+ public Namespace namespace() {
+ return namespace;
+ }
+
+ public StatisticValue<?> value() {
+ return value;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private final StatisticEntity statisticEntity;
+
+ private Builder() {
+ statisticEntity = new StatisticEntity();
+ }
+
+ public Builder withId(Long id) {
+ statisticEntity.id = id;
+ return this;
+ }
+
+ public Builder withName(String name) {
+ statisticEntity.name = name;
+ return this;
+ }
+
+ public Builder withValue(StatisticValue<?> value) {
+ statisticEntity.value = value;
+ return this;
+ }
+
+ public Builder withAuditInfo(AuditInfo auditInfo) {
+ statisticEntity.auditInfo = auditInfo;
+ return this;
+ }
+
+ public Builder withNamespace(Namespace namespace) {
+ statisticEntity.namespace = namespace;
+ return this;
+ }
+
+ public StatisticEntity build() {
+ statisticEntity.validate();
+ return statisticEntity;
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 7c92e88a2b..bfe3ac72a9 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -67,6 +67,7 @@ import
org.apache.gravitino.storage.relational.service.OwnerMetaService;
import org.apache.gravitino.storage.relational.service.PolicyMetaService;
import org.apache.gravitino.storage.relational.service.RoleMetaService;
import org.apache.gravitino.storage.relational.service.SchemaMetaService;
+import org.apache.gravitino.storage.relational.service.StatisticMetaService;
import org.apache.gravitino.storage.relational.service.TableColumnMetaService;
import org.apache.gravitino.storage.relational.service.TableMetaService;
import org.apache.gravitino.storage.relational.service.TagMetaService;
@@ -374,6 +375,10 @@ public class JDBCBackend implements RelationalBackend {
return ModelVersionMetaService.getInstance()
.deleteModelVersionMetasByLegacyTimeline(
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
+ case STATISTIC:
+ return StatisticMetaService.getInstance()
+ .deleteStatisticsByLegacyTimeline(
+ legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case JOB_TEMPLATE:
return JobTemplateMetaService.getInstance()
.deleteJobTemplatesByLegacyTimeline(
@@ -408,6 +413,7 @@ public class JDBCBackend implements RelationalBackend {
case TAG:
case MODEL:
case MODEL_VERSION:
+ case STATISTIC:
case JOB_TEMPLATE:
case JOB:
// These entity types have not implemented multi-versions, so we can
skip.
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticMetaMapper.java
new file mode 100644
index 0000000000..0e422e5d6f
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticMetaMapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface StatisticMetaMapper {
+
+ String STATISTIC_META_TABLE_NAME = "statistic_meta";
+
+ @SelectProvider(type = StatisticSQLProviderFactory.class, method =
"listStatisticPOsByEntityId")
+ List<StatisticPO> listStatisticPOsByEntityId(
+ @Param("metalakeId") Long metalakeId, @Param("entityId") long entityId);
+
+ @InsertProvider(
+ type = StatisticSQLProviderFactory.class,
+ method = "batchInsertStatisticPOsOnDuplicateKeyUpdate")
+ void batchInsertStatisticPOsOnDuplicateKeyUpdate(
+ @Param("statisticPOs") List<StatisticPO> statisticPOs);
+
+ @UpdateProvider(type = StatisticSQLProviderFactory.class, method =
"batchDeleteStatisticPOs")
+ Integer batchDeleteStatisticPOs(
+ @Param("entityId") Long entityId, @Param("statisticNames") List<String>
statisticNames);
+
+ @UpdateProvider(
+ type = StatisticSQLProviderFactory.class,
+ method = "softDeleteStatisticsByEntityId")
+ Integer softDeleteStatisticsByEntityId(@Param("entityId") Long entityId);
+
+ @UpdateProvider(
+ type = StatisticSQLProviderFactory.class,
+ method = "softDeleteStatisticsByMetalakeId")
+ Integer softDeleteStatisticsByMetalakeId(@Param("metalakeId") Long
metalakeId);
+
+ @UpdateProvider(
+ type = StatisticSQLProviderFactory.class,
+ method = "softDeleteStatisticsByCatalogId")
+ Integer softDeleteStatisticsByCatalogId(@Param("catalogId") Long catalogId);
+
+ @UpdateProvider(
+ type = StatisticSQLProviderFactory.class,
+ method = "softDeleteStatisticsBySchemaId")
+ Integer softDeleteStatisticsBySchemaId(@Param("schemaId") Long schemaId);
+
+ @DeleteProvider(
+ type = StatisticSQLProviderFactory.class,
+ method = "deleteStatisticsByLegacyTimeline")
+ Integer deleteStatisticsByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticSQLProviderFactory.java
new file mode 100644
index 0000000000..393ecd8618
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/StatisticSQLProviderFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.StatisticBaseSQLProvider;
+import
org.apache.gravitino.storage.relational.mapper.provider.postgresql.StatisticPostgresSQLProvider;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class StatisticSQLProviderFactory {
+
+ static class StatisticMySQLProvider extends StatisticBaseSQLProvider {}
+
+ static class StatisticH2Provider extends StatisticBaseSQLProvider {}
+
+ private static final Map<JDBCBackend.JDBCBackendType,
StatisticBaseSQLProvider>
+ STATISTIC_SQL_PROVIDERS =
+ ImmutableMap.of(
+ JDBCBackend.JDBCBackendType.H2,
+ new StatisticH2Provider(),
+ JDBCBackend.JDBCBackendType.MYSQL,
+ new StatisticMySQLProvider(),
+ JDBCBackend.JDBCBackendType.POSTGRESQL,
+ new StatisticPostgresSQLProvider());
+
+ public static StatisticBaseSQLProvider getProvider() {
+ String databaseId =
+ SqlSessionFactoryHelper.getInstance()
+ .getSqlSessionFactory()
+ .getConfiguration()
+ .getDatabaseId();
+ JDBCBackend.JDBCBackendType jdbcBackendType =
+ JDBCBackend.JDBCBackendType.fromString(databaseId);
+ return STATISTIC_SQL_PROVIDERS.get(jdbcBackendType);
+ }
+
+ public static String batchInsertStatisticPOsOnDuplicateKeyUpdate(
+ @Param("statisticPOs") List<StatisticPO> statisticPOs) {
+ return
getProvider().batchInsertStatisticPOsOnDuplicateKeyUpdate(statisticPOs);
+ }
+
+ public static String batchDeleteStatisticPOs(
+ @Param("entityId") Long entityId, @Param("statisticNames") List<String>
statisticNames) {
+ return getProvider().batchDeleteStatisticPOs(entityId, statisticNames);
+ }
+
+ public static String softDeleteStatisticsByEntityId(@Param("entityId") Long
entityId) {
+ return getProvider().softDeleteStatisticsByEntityId(entityId);
+ }
+
+ public static String listStatisticPOsByEntityId(
+ @Param("metalakeId") Long metalakeId, @Param("entityId") Long entityId) {
+ return getProvider().listStatisticPOsByEntityId(metalakeId, entityId);
+ }
+
+ public static String softDeleteStatisticsByMetalakeId(@Param("metalakeId")
Long metalakeId) {
+ return getProvider().softDeleteStatisticsByMetalakeId(metalakeId);
+ }
+
+ public static String softDeleteStatisticsByCatalogId(@Param("catalogId")
Long catalogId) {
+ return getProvider().softDeleteStatisticsByCatalogId(catalogId);
+ }
+
+ public static String softDeleteStatisticsBySchemaId(@Param("schemaId") Long
schemaId) {
+ return getProvider().softDeleteStatisticsBySchemaId(schemaId);
+ }
+
+ public static String deleteStatisticsByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return getProvider().deleteStatisticsByLegacyTimeline(legacyTimeline,
limit);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/StatisticBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/StatisticBaseSQLProvider.java
new file mode 100644
index 0000000000..a3ffce5254
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/StatisticBaseSQLProvider.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static
org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper.STATISTIC_META_TABLE_NAME;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
+import org.apache.ibatis.annotations.Param;
+
+public class StatisticBaseSQLProvider {
+
+ public String batchInsertStatisticPOsOnDuplicateKeyUpdate(
+ @Param("statisticPOs") List<StatisticPO> statisticPOs) {
+ return "<script>"
+ + "INSERT INTO "
+ + STATISTIC_META_TABLE_NAME
+ + " (statistic_id, statistic_name, statistic_value, metalake_id,
metadata_object_id,"
+ + " metadata_object_type, audit_info, current_version, last_version,
deleted_at) VALUES "
+ + "<foreach collection='statisticPOs' item='item' separator=','>"
+ + "(#{item.statisticId}, "
+ + "#{item.statisticName}, "
+ + "#{item.statisticValue}, "
+ + "#{item.metalakeId}, "
+ + "#{item.metadataObjectId}, "
+ + "#{item.metadataObjectType}, "
+ + "#{item.auditInfo}, "
+ + "#{item.currentVersion}, "
+ + "#{item.lastVersion}, "
+ + "#{item.deletedAt})"
+ + "</foreach>"
+ + " ON DUPLICATE KEY UPDATE "
+ + " statistic_value = VALUES(statistic_value),"
+ + " audit_info = VALUES(audit_info),"
+ + " current_version = VALUES(current_version),"
+ + " last_version = VALUES(last_version),"
+ + " deleted_at = VALUES(deleted_at)"
+ + "</script>";
+ }
+
+ public String batchDeleteStatisticPOs(
+ @Param("entityId") Long entityId, @Param("statisticNames") List<String>
statisticNames) {
+ return "<script>"
+ + "UPDATE "
+ + STATISTIC_META_TABLE_NAME
+ + softDeleteSQL()
+ + " WHERE "
+ + " statistic_name IN ("
+ + "<foreach collection='statisticNames' item='item' separator=','>"
+ + " #{item}"
+ + "</foreach>"
+ + " ) AND deleted_at = 0 AND metadata_object_id = #{entityId}"
+ + "</script>";
+ }
+
+ public String softDeleteStatisticsByEntityId(@Param("entityId") Long
entityId) {
+ return "UPDATE "
+ + STATISTIC_META_TABLE_NAME
+ + softDeleteSQL()
+ + " WHERE metadata_object_id = #{entityId} AND deleted_at = 0";
+ }
+
+ public String listStatisticPOsByEntityId(
+ @Param("metalakeId") Long metalakeId, @Param("entityId") Long entityId) {
+ return "SELECT statistic_id as statisticId, statistic_name as
statisticName, metalake_id as metalakeId,"
+ + " statistic_value as statisticValue, metadata_object_id as
metadataObjectId,"
+ + "metadata_object_type as metadataObjectType, audit_info as
auditInfo,"
+ + "current_version as currentVersion, last_version as lastVersion,
deleted_at as deletedAt FROM "
+ + STATISTIC_META_TABLE_NAME
+ + " WHERE metadata_object_id = #{entityId} AND deleted_at = 0 AND
metalake_id = #{metalakeId}";
+ }
+
+ public String softDeleteStatisticsByMetalakeId(@Param("metalakeId") Long
metalakeId) {
+ return "UPDATE "
+ + STATISTIC_META_TABLE_NAME
+ + " stat "
+ + softDeleteSQL()
+ + " WHERE stat.metalake_id = #{metalakeId} AND stat.deleted_at = 0";
+ }
+
+ public String softDeleteStatisticsByCatalogId(@Param("catalogId") Long
catalogId) {
+ return "UPDATE "
+ + STATISTIC_META_TABLE_NAME
+ + " stat "
+ + softDeleteSQL()
+ + " WHERE stat.deleted_at = 0 AND EXISTS ("
+ + " SELECT ct.catalog_id FROM "
+ + CatalogMetaMapper.TABLE_NAME
+ + " ct WHERE ct.catalog_id = #{catalogId} AND "
+ + " ct.catalog_id = stat.metadata_object_id AND
stat.metadata_object_type = 'CATALOG'"
+ + " UNION "
+ + " SELECT st.catalog_id FROM "
+ + SchemaMetaMapper.TABLE_NAME
+ + " st WHERE st.catalog_id = #{catalogId} AND "
+ + " st.schema_id = stat.metadata_object_id AND
stat.metadata_object_type = 'SCHEMA'"
+ + " UNION "
+ + " SELECT tt.catalog_id FROM "
+ + TopicMetaMapper.TABLE_NAME
+ + " tt WHERE tt.catalog_id = #{catalogId} AND "
+ + " tt.topic_id = stat.metadata_object_id AND
stat.metadata_object_type = 'TOPIC'"
+ + " UNION "
+ + " SELECT tat.catalog_id FROM "
+ + TableMetaMapper.TABLE_NAME
+ + " tat WHERE tat.catalog_id = #{catalogId} AND "
+ + " tat.table_id = stat.metadata_object_id AND
stat.metadata_object_type = 'TABLE'"
+ + " UNION "
+ + " SELECT ft.catalog_id FROM "
+ + FilesetMetaMapper.META_TABLE_NAME
+ + " ft WHERE ft.catalog_id = #{catalogId} AND"
+ + " ft.fileset_id = stat.metadata_object_id AND
stat.metadata_object_type = 'FILESET'"
+ + " UNION "
+ + " SELECT mt.catalog_id FROM "
+ + ModelMetaMapper.TABLE_NAME
+ + " mt WHERE mt.catalog_id = #{catalogId} AND"
+ + " mt.model_id = stat.metadata_object_id AND
stat.metadata_object_type = 'MODEL'"
+ + ")";
+ }
+
+ public String softDeleteStatisticsBySchemaId(@Param("schemaId") Long
schemaId) {
+ return "UPDATE "
+ + STATISTIC_META_TABLE_NAME
+ + " stat"
+ + softDeleteSQL()
+ + " WHERE stat.deleted_at = 0 AND EXISTS ("
+ + " SELECT st.schema_id FROM "
+ + SchemaMetaMapper.TABLE_NAME
+ + " st WHERE st.schema_id = #{schemaId} "
+ + " AND st.schema_id = stat.metadata_object_id AND
stat.metadata_object_type = 'SCHEMA'"
+ + " UNION "
+ + " SELECT tt.schema_id FROM "
+ + TopicMetaMapper.TABLE_NAME
+ + " tt WHERE tt.schema_id = #{schemaId} AND "
+ + " tt.topic_id = stat.metadata_object_id AND
stat.metadata_object_type = 'TOPIC'"
+ + " UNION "
+ + " SELECT tat.schema_id FROM "
+ + TableMetaMapper.TABLE_NAME
+ + " tat WHERE tat.schema_id = #{schemaId} AND "
+ + " tat.table_id = stat.metadata_object_id AND
stat.metadata_object_type = 'TABLE'"
+ + " UNION "
+ + " SELECT ft.schema_id FROM "
+ + FilesetMetaMapper.META_TABLE_NAME
+ + " ft WHERE ft.schema_id = #{schemaId} AND "
+ + " ft.fileset_id = stat.metadata_object_id AND
stat.metadata_object_type = 'FILESET'"
+ + " UNION "
+ + " SELECT mt.schema_id FROM "
+ + ModelMetaMapper.TABLE_NAME
+ + " mt WHERE mt.schema_id = #{schemaId} AND "
+ + " mt.model_id = stat.metadata_object_id AND
stat.metadata_object_type = 'MODEL'"
+ + ")";
+ }
+
+ public String deleteStatisticsByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return "DELETE FROM "
+ + STATISTIC_META_TABLE_NAME
+ + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT
#{limit}";
+ }
+
+ protected String softDeleteSQL() {
+ return " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000 ";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
index 027f4fb720..3b25a7f469 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/SecurableObjectPostgreSQLProvider.java
@@ -44,9 +44,7 @@ public class SecurableObjectPostgreSQLProvider extends
SecurableObjectBaseSQLPro
+ " WHERE FALSE "
+ "<foreach collection='securableObjects' item='item' separator=' '>"
+ " OR (metadata_object_id = #{item.metadataObjectId} AND"
- + " role_id = #{item.roleId} AND deleted_at = 0 AND"
- + " privilege_names = #{item.privilegeNames} AND"
- + " privilege_conditions = #{item.privilegeConditions})"
+ + " role_id = #{item.roleId} AND deleted_at = 0)"
+ "</foreach>"
+ "</script>";
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/StatisticPostgresSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/StatisticPostgresSQLProvider.java
new file mode 100644
index 0000000000..f0e0e7d215
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/StatisticPostgresSQLProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import static
org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper.STATISTIC_META_TABLE_NAME;
+
+import java.util.List;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.StatisticBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
+
+/**
+ * PostgreSQL flavour of {@link StatisticBaseSQLProvider}. It replaces the soft-delete timestamp
+ * expression and the batch upsert statement with PostgreSQL syntax.
+ */
+public class StatisticPostgresSQLProvider extends StatisticBaseSQLProvider {
+  /**
+   * Returns the {@code SET} clause that stamps {@code deleted_at} using PostgreSQL date
+   * arithmetic (interval since 1970-01-01 scaled by 1000, i.e. epoch milliseconds).
+   *
+   * @return The {@code SET deleted_at = ...} fragment, padded with a leading and trailing space.
+   */
+  @Override
+  protected String softDeleteSQL() {
+    return " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) ";
+  }
+
+  /**
+   * Builds the MyBatis script that inserts all given statistic rows in one statement and, when a
+   * row collides on {@code (statistic_name, metadata_object_id, deleted_at)}, overwrites the
+   * stored value, audit info, versions and deletion marker with the incoming ones.
+   *
+   * @param statisticPOs The rows to upsert; iterated by the {@code foreach} element.
+   * @return The MyBatis {@code <script>} text of the upsert statement.
+   */
+  @Override
+  public String batchInsertStatisticPOsOnDuplicateKeyUpdate(List<StatisticPO> statisticPOs) {
+    return "<script>"
+        + "INSERT INTO "
+        + STATISTIC_META_TABLE_NAME
+        + " (statistic_id, statistic_name, statistic_value, metalake_id, metadata_object_id,"
+        + " metadata_object_type, audit_info, current_version, last_version, deleted_at) VALUES "
+        + "<foreach collection='statisticPOs' item='item' separator=','>"
+        + "(#{item.statisticId}, "
+        + "#{item.statisticName}, "
+        + "#{item.statisticValue}, "
+        + "#{item.metalakeId}, "
+        + "#{item.metadataObjectId}, "
+        + "#{item.metadataObjectType}, "
+        + "#{item.auditInfo}, "
+        + "#{item.currentVersion}, "
+        + "#{item.lastVersion}, "
+        + "#{item.deletedAt})"
+        + "</foreach>"
+        + " ON CONFLICT (statistic_name, metadata_object_id, deleted_at)"
+        + " DO UPDATE SET "
+        // EXCLUDED is PostgreSQL's name for the row that was proposed for insertion; every
+        // assignment must reference it with exactly this spelling.
+        + " statistic_value = EXCLUDED.statistic_value,"
+        + " audit_info = EXCLUDED.audit_info,"
+        + " current_version = EXCLUDED.current_version,"
+        + " last_version = EXCLUDED.last_version,"
+        + " deleted_at = EXCLUDED.deleted_at"
+        + "</script>";
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
b/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
new file mode 100644
index 0000000000..51dd1e0066
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.StatisticEntity;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.storage.relational.utils.POConverters;
+
+/**
+ * Persistent object (PO) carrying the column values of a single statistic record. Instances are
+ * assembled through {@link Builder}, whose {@link Builder#build()} verifies that every field has
+ * been set.
+ */
+@Getter
+public class StatisticPO {
+  private Long metalakeId;
+  private Long statisticId;
+
+  private String statisticName;
+
+  // JSON text of the statistic value (see initializeStatisticPOs / fromStatisticPO).
+  private String statisticValue;
+  private Long metadataObjectId;
+
+  // Name of the MetadataObject.Type constant of the object the statistic belongs to.
+  private String metadataObjectType;
+
+  // JSON text of the AuditInfo (see initializeStatisticPOs / fromStatisticPO).
+  private String auditInfo;
+
+  private Long currentVersion;
+  private Long lastVersion;
+  private Long deletedAt;
+
+  private StatisticPO() {}
+
+  /**
+   * Creates a new, empty {@link Builder}.
+   *
+   * @return A builder for {@link StatisticPO}.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Converts a PO into a {@link StatisticEntity} by copying id and name and deserializing the
+   * JSON value and audit info.
+   *
+   * @param statisticPO The PO to convert.
+   * @return The resulting entity.
+   * @throws RuntimeException If the stored value or audit info cannot be parsed as JSON.
+   */
+  public static StatisticEntity fromStatisticPO(StatisticPO statisticPO) {
+    try {
+      return StatisticEntity.builder()
+          .withId(statisticPO.getStatisticId())
+          .withName(statisticPO.getStatisticName())
+          .withValue(
+              JsonUtils.anyFieldMapper()
+                  .readValue(statisticPO.getStatisticValue(), StatisticValue.class))
+          .withAuditInfo(
+              JsonUtils.anyFieldMapper().readValue(statisticPO.getAuditInfo(), AuditInfo.class))
+          .build();
+    } catch (JsonProcessingException je) {
+      throw new RuntimeException("Failed to deserialize json object: ", je);
+    }
+  }
+
+  /**
+   * Creates one PO per entity for first-time persistence: the value and audit info are
+   * serialized to JSON, versions are set to {@code POConverters.INIT_VERSION} and the deletion
+   * marker to {@code POConverters.DEFAULT_DELETED_AT}.
+   *
+   * @param statisticEntities The entities to convert.
+   * @param metalakeId The id of the metalake stored on every PO.
+   * @param objectId The id of the metadata object stored on every PO.
+   * @param objectType The type of the metadata object; its {@code name()} is stored.
+   * @return The POs, in the order of the input list.
+   * @throws RuntimeException If a value or audit info cannot be serialized to JSON.
+   */
+  public static List<StatisticPO> initializeStatisticPOs(
+      List<StatisticEntity> statisticEntities,
+      Long metalakeId,
+      Long objectId,
+      MetadataObject.Type objectType) {
+    return statisticEntities.stream()
+        .map(
+            statisticEntity -> {
+              try {
+                return builder()
+                    .withMetalakeId(metalakeId)
+                    .withMetadataObjectId(objectId)
+                    .withMetadataObjectType(objectType.name())
+                    .withStatisticId(statisticEntity.id())
+                    .withStatisticName(statisticEntity.name())
+                    .withStatisticValue(
+                        JsonUtils.anyFieldMapper().writeValueAsString(statisticEntity.value()))
+                    .withDeletedAt(POConverters.DEFAULT_DELETED_AT)
+                    .withCurrentVersion(POConverters.INIT_VERSION)
+                    .withLastVersion(POConverters.INIT_VERSION)
+                    .withAuditInfo(
+                        JsonUtils.anyFieldMapper().writeValueAsString(statisticEntity.auditInfo()))
+                    .build();
+              } catch (JsonProcessingException e) {
+                throw new RuntimeException("Failed to serialize json object:", e);
+              }
+            })
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Compares all fields. {@link Objects#equals(Object, Object)} is used so that an instance with
+   * unset fields never throws a {@link NullPointerException}, in line with the null-tolerant
+   * {@link #hashCode()}.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof StatisticPO)) {
+      return false;
+    }
+    StatisticPO that = (StatisticPO) o;
+    return Objects.equals(statisticId, that.statisticId)
+        && Objects.equals(metadataObjectId, that.metadataObjectId)
+        && Objects.equals(metalakeId, that.metalakeId)
+        && Objects.equals(metadataObjectType, that.metadataObjectType)
+        && Objects.equals(statisticName, that.statisticName)
+        && Objects.equals(statisticValue, that.statisticValue)
+        && Objects.equals(auditInfo, that.auditInfo)
+        && Objects.equals(currentVersion, that.currentVersion)
+        && Objects.equals(lastVersion, that.lastVersion)
+        && Objects.equals(deletedAt, that.deletedAt);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        metalakeId,
+        statisticId,
+        metadataObjectId,
+        metadataObjectType,
+        statisticName,
+        statisticValue,
+        auditInfo,
+        currentVersion,
+        lastVersion,
+        deletedAt);
+  }
+
+  /** Fluent builder for {@link StatisticPO}; {@link #build()} rejects any unset field. */
+  public static class Builder {
+
+    private final StatisticPO statisticPO;
+
+    public Builder() {
+      this.statisticPO = new StatisticPO();
+    }
+
+    public Builder withMetalakeId(Long metalakeId) {
+      statisticPO.metalakeId = metalakeId;
+      return this;
+    }
+
+    public Builder withStatisticId(Long statisticId) {
+      statisticPO.statisticId = statisticId;
+      return this;
+    }
+
+    public Builder withMetadataObjectId(Long objectId) {
+      statisticPO.metadataObjectId = objectId;
+      return this;
+    }
+
+    public Builder withMetadataObjectType(String objectType) {
+      statisticPO.metadataObjectType = objectType;
+      return this;
+    }
+
+    public Builder withStatisticName(String statisticName) {
+      statisticPO.statisticName = statisticName;
+      return this;
+    }
+
+    public Builder withStatisticValue(String value) {
+      statisticPO.statisticValue = value;
+      return this;
+    }
+
+    public Builder withAuditInfo(String auditInfo) {
+      statisticPO.auditInfo = auditInfo;
+      return this;
+    }
+
+    public Builder withCurrentVersion(Long currentVersion) {
+      statisticPO.currentVersion = currentVersion;
+      return this;
+    }
+
+    public Builder withLastVersion(Long lastVersion) {
+      statisticPO.lastVersion = lastVersion;
+      return this;
+    }
+
+    public Builder withDeletedAt(Long deletedAt) {
+      statisticPO.deletedAt = deletedAt;
+      return this;
+    }
+
+    /**
+     * Validates that every field has been provided and returns the PO.
+     *
+     * @return The populated {@link StatisticPO}.
+     * @throws IllegalArgumentException If any field is still {@code null}.
+     */
+    public StatisticPO build() {
+      Preconditions.checkArgument(statisticPO.metadataObjectId != null, "`objectId` is required");
+      Preconditions.checkArgument(
+          statisticPO.metadataObjectType != null, "`objectType` is required");
+      Preconditions.checkArgument(statisticPO.statisticId != null, "`statisticId` is required");
+      Preconditions.checkArgument(statisticPO.statisticName != null, "`statisticName` is required");
+      Preconditions.checkArgument(statisticPO.statisticValue != null, "`value` is required");
+      Preconditions.checkArgument(statisticPO.auditInfo != null, "`auditInfo` is required");
+      Preconditions.checkArgument(statisticPO.metalakeId != null, "`metalakeId` is required");
+      Preconditions.checkArgument(statisticPO.deletedAt != null, "`deletedAt` is required");
+      Preconditions.checkArgument(statisticPO.lastVersion != null, "`lastVersion` is required");
+      Preconditions.checkArgument(
+          statisticPO.currentVersion != null, "`currentVersion` is required");
+      return statisticPO;
+    }
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
index a297c47afb..e4d08f4130 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
@@ -44,6 +44,7 @@ import
org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
@@ -274,7 +275,11 @@ public class CatalogMetaService {
() ->
SessionUtils.doWithoutCommit(
ModelMetaMapper.class,
- mapper ->
mapper.softDeleteModelMetasByCatalogId(catalogId)));
+ mapper -> mapper.softDeleteModelMetasByCatalogId(catalogId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ StatisticMetaMapper.class,
+ mapper ->
mapper.softDeleteStatisticsByCatalogId(catalogId)));
} else {
List<SchemaEntity> schemaEntities =
SchemaMetaService.getInstance()
@@ -307,6 +312,10 @@ public class CatalogMetaService {
mapper ->
mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
catalogId, MetadataObject.Type.CATALOG.name())),
+ () ->
+ SessionUtils.doWithoutCommit(
+ StatisticMetaMapper.class,
+ mapper -> mapper.softDeleteStatisticsByEntityId(catalogId)),
() ->
SessionUtils.doWithoutCommit(
PolicyMetadataObjectRelMapper.class,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
index 669acbbe87..246414609b 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FilesetMetaService.java
@@ -35,6 +35,7 @@ import
org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.po.FilesetMaxVersionPO;
import org.apache.gravitino.storage.relational.po.FilesetPO;
@@ -262,6 +263,10 @@ public class FilesetMetaService {
mapper ->
mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
filesetId, MetadataObject.Type.FILESET.name())),
+ () ->
+ SessionUtils.doWithoutCommit(
+ StatisticMetaMapper.class,
+ mapper -> mapper.softDeleteStatisticsByEntityId(filesetId)),
() ->
SessionUtils.doWithoutCommit(
PolicyMetadataObjectRelMapper.class,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
index 6c2b6b7c57..74fb01b1c6 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -48,6 +48,7 @@ import
org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper;
import org.apache.gravitino.storage.relational.mapper.RoleMetaMapper;
import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TagMetaMapper;
@@ -271,6 +272,10 @@ public class MetalakeMetaService {
SessionUtils.doWithoutCommit(
ModelMetaMapper.class,
mapper ->
mapper.softDeleteModelMetasByMetalakeId(metalakeId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ StatisticMetaMapper.class,
+ mapper ->
mapper.softDeleteStatisticsByMetalakeId(metalakeId)),
() ->
SessionUtils.doWithoutCommit(
JobTemplateMetaMapper.class,
@@ -328,6 +333,10 @@ public class MetalakeMetaService {
SessionUtils.doWithoutCommit(
OwnerMetaMapper.class,
mapper ->
mapper.softDeleteOwnerRelByMetalakeId(metalakeId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ StatisticMetaMapper.class,
+ mapper ->
mapper.softDeleteStatisticsByMetalakeId(metalakeId)),
() ->
SessionUtils.doWithoutCommit(
JobTemplateMetaMapper.class,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
index 5b98be0504..2a5db664c7 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
@@ -40,6 +40,7 @@ import
org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.po.ModelPO;
import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
@@ -158,6 +159,10 @@ public class ModelMetaService {
mapper ->
mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
modelId, MetadataObject.Type.MODEL.name())),
+ () ->
+ SessionUtils.doWithoutCommit(
+ StatisticMetaMapper.class,
+ mapper -> mapper.softDeleteStatisticsByEntityId(modelId)),
() ->
SessionUtils.doWithoutCommit(
PolicyMetadataObjectRelMapper.class,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index e3f954239d..a793af4e5c 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -44,6 +44,7 @@ import
org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
@@ -257,7 +258,11 @@ public class SchemaMetaService {
() ->
SessionUtils.doWithoutCommit(
ModelMetaMapper.class,
- mapper ->
mapper.softDeleteModelMetasBySchemaId(schemaId)));
+ mapper -> mapper.softDeleteModelMetasBySchemaId(schemaId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ StatisticMetaMapper.class,
+ mapper ->
mapper.softDeleteStatisticsBySchemaId(schemaId)));
} else {
List<TableEntity> tableEntities =
TableMetaService.getInstance()
@@ -316,6 +321,10 @@ public class SchemaMetaService {
mapper ->
mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
schemaId, MetadataObject.Type.SCHEMA.name())),
+ () ->
+ SessionUtils.doWithoutCommit(
+ StatisticMetaMapper.class,
+ mapper -> mapper.softDeleteStatisticsByEntityId(schemaId)),
() ->
SessionUtils.doWithoutCommit(
PolicyMetadataObjectRelMapper.class,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
new file mode 100644
index 0000000000..8b92cc5a25
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.StatisticEntity;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/**
+ * Singleton service exposing the relational-store operations for statistics: listing, batch
+ * upsert, batch delete by name, and purging of rows past a legacy timeline.
+ */
+public class StatisticMetaService {
+
+  private static final StatisticMetaService INSTANCE = new StatisticMetaService();
+
+  /**
+   * Returns the shared service instance.
+   *
+   * @return The singleton {@link StatisticMetaService}.
+   */
+  public static StatisticMetaService getInstance() {
+    return INSTANCE;
+  }
+
+  private StatisticMetaService() {}
+
+  /**
+   * Lists the statistics stored for the entity denoted by the identifier.
+   *
+   * @param identifier The identifier of the entity owning the statistics.
+   * @param type The entity type of {@code identifier}.
+   * @return The statistics converted to {@link StatisticEntity} instances.
+   */
+  public List<StatisticEntity> listStatisticsByEntity(
+      NameIdentifier identifier, Entity.EntityType type) {
+    long metalakeId = lookupMetalakeId(identifier);
+    MetadataObject owner = NameIdentifierUtil.toMetadataObject(identifier, type);
+    long entityId =
+        MetadataObjectService.getMetadataObjectId(metalakeId, owner.fullName(), owner.type());
+    List<StatisticPO> rows =
+        SessionUtils.getWithoutCommit(
+            StatisticMetaMapper.class,
+            mapper -> mapper.listStatisticPOsByEntityId(metalakeId, entityId));
+    return rows.stream().map(row -> StatisticPO.fromStatisticPO(row)).collect(Collectors.toList());
+  }
+
+  /**
+   * Inserts the given statistics for an entity in one committed mapper call, delegating to the
+   * mapper's insert-or-update statement. Does nothing for a {@code null} or empty list.
+   *
+   * @param statisticEntities The statistics to store.
+   * @param metalake The name of the metalake the entity lives in.
+   * @param entity The identifier of the entity owning the statistics.
+   * @param type The entity type of {@code entity}.
+   */
+  public void batchInsertStatisticPOsOnDuplicateKeyUpdate(
+      List<StatisticEntity> statisticEntities,
+      String metalake,
+      NameIdentifier entity,
+      Entity.EntityType type) {
+    boolean nothingToStore = statisticEntities == null || statisticEntities.isEmpty();
+    if (nothingToStore) {
+      return;
+    }
+    Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+    MetadataObject owner = NameIdentifierUtil.toMetadataObject(entity, type);
+    Long entityId =
+        MetadataObjectService.getMetadataObjectId(metalakeId, owner.fullName(), owner.type());
+
+    List<StatisticPO> rows =
+        StatisticPO.initializeStatisticPOs(statisticEntities, metalakeId, entityId, owner.type());
+    SessionUtils.doWithCommit(
+        StatisticMetaMapper.class,
+        mapper -> mapper.batchInsertStatisticPOsOnDuplicateKeyUpdate(rows));
+  }
+
+  /**
+   * Deletes the named statistics of an entity through the mapper and commits.
+   *
+   * @param identifier The identifier of the entity owning the statistics.
+   * @param type The entity type of {@code identifier}.
+   * @param statisticNames The names of the statistics to delete.
+   * @return The count reported by the mapper, or {@code 0} when {@code statisticNames} is
+   *     {@code null} or empty.
+   */
+  public int batchDeleteStatisticPOs(
+      NameIdentifier identifier, Entity.EntityType type, List<String> statisticNames) {
+    boolean nothingToDelete = statisticNames == null || statisticNames.isEmpty();
+    if (nothingToDelete) {
+      return 0;
+    }
+    Long metalakeId = lookupMetalakeId(identifier);
+    MetadataObject owner = NameIdentifierUtil.toMetadataObject(identifier, type);
+    Long entityId =
+        MetadataObjectService.getMetadataObjectId(metalakeId, owner.fullName(), owner.type());
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        StatisticMetaMapper.class,
+        mapper -> mapper.batchDeleteStatisticPOs(entityId, statisticNames));
+  }
+
+  /**
+   * Removes statistic rows older than the legacy timeline through the mapper and commits.
+   *
+   * @param legacyTimeline The timeline bound passed to the mapper.
+   * @param limit The limit passed to the mapper.
+   * @return The count reported by the mapper.
+   */
+  public int deleteStatisticsByLegacyTimeline(long legacyTimeline, int limit) {
+    return SessionUtils.doWithCommitAndFetchResult(
+        StatisticMetaMapper.class,
+        mapper -> mapper.deleteStatisticsByLegacyTimeline(legacyTimeline, limit));
+  }
+
+  /** Looks up the id of the metalake named in the given identifier. */
+  private Long lookupMetalakeId(NameIdentifier identifier) {
+    return MetalakeMetaService.getInstance()
+        .getMetalakeIdByName(NameIdentifierUtil.getMetalake(identifier));
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index d4a93c1a9b..4d2a979342 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -35,6 +35,7 @@ import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.po.ColumnPO;
@@ -240,6 +241,10 @@ public class TableMetaService {
SessionUtils.doWithoutCommit(
TagMetadataObjectRelMapper.class,
mapper ->
mapper.softDeleteTagMetadataObjectRelsByTableId(tableId));
+
+ SessionUtils.doWithoutCommit(
+ StatisticMetaMapper.class,
+ mapper -> mapper.softDeleteStatisticsByEntityId(tableId));
SessionUtils.doWithoutCommit(
PolicyMetadataObjectRelMapper.class,
mapper ->
mapper.softDeletePolicyMetadataObjectRelsByTableId(tableId));
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
index 8f3e7eedd5..bd7f46763a 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TopicMetaService.java
@@ -33,6 +33,7 @@ import org.apache.gravitino.meta.TopicEntity;
import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.PolicyMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
+import org.apache.gravitino.storage.relational.mapper.StatisticMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.TagMetadataObjectRelMapper;
import org.apache.gravitino.storage.relational.mapper.TopicMetaMapper;
import org.apache.gravitino.storage.relational.po.TopicPO;
@@ -205,6 +206,10 @@ public class TopicMetaService {
mapper ->
mapper.softDeleteTagMetadataObjectRelsByMetadataObject(
topicId, MetadataObject.Type.TOPIC.name())),
+ () ->
+ SessionUtils.doWithoutCommit(
+ StatisticMetaMapper.class,
+ mapper -> mapper.softDeleteStatisticsByEntityId(topicId)),
() ->
SessionUtils.doWithoutCommit(
PolicyMetadataObjectRelMapper.class,
diff --git
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index 10d7ec4f13..c1fd511807 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -547,4 +547,17 @@ public class NameIdentifierUtil {
public static NameIdentifier ofGroup(String metalake, String groupName) {
return NameIdentifier.of(NamespaceUtil.ofGroup(metalake), groupName);
}
+
+ /**
+ * Create a statistic {@link NameIdentifier} from the given identifier and
name. The statistic
+ * belongs to the given identifier. For example, if the identifier is a
table identifier, the
+ * statistic will be created for that table.
+ *
+ * @param entityIdent The identifier to use.
+ * @param name The name of the statistic
+ * @return The created statistic of {@link NameIdentifier}
+ */
+ public static NameIdentifier ofStatistic(NameIdentifier entityIdent, String
name) {
+ return NameIdentifier.of(Namespace.fromString(entityIdent.toString()),
name);
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index 56337a18aa..4b00d377a1 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -47,6 +47,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
@@ -1387,6 +1388,70 @@ public class TestJDBCBackend {
}
}
+  /**
+   * Counts every row of {@code statistic_meta} for the metalake, whatever its {@code deleted_at}
+   * value.
+   *
+   * @param metalakeId The metalake id to filter on.
+   * @return The row count.
+   * @throws RuntimeException If the query yields no row or fails with a {@link SQLException}.
+   */
+  protected Integer countAllStats(Long metalakeId) {
+    String countSql =
+        String.format("SELECT count(*) FROM statistic_meta WHERE metalake_id = %d", metalakeId);
+    try (SqlSession session =
+            SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection conn = session.getConnection();
+        Statement stmt = conn.createStatement();
+        ResultSet rows = stmt.executeQuery(countSql)) {
+      if (!rows.next()) {
+        throw new RuntimeException("Doesn't contain data");
+      }
+      return rows.getInt(1);
+    } catch (SQLException se) {
+      throw new RuntimeException("SQL execution failed", se);
+    }
+  }
+
+  /**
+   * Counts the rows of {@code statistic_meta} for the metalake whose {@code deleted_at} is 0.
+   *
+   * @param metalakeId The metalake id to filter on.
+   * @return The row count.
+   * @throws RuntimeException If the query yields no row or fails with a {@link SQLException}.
+   */
+  protected Integer countActiveStats(Long metalakeId) {
+    String countSql =
+        String.format(
+            "SELECT count(*) FROM statistic_meta WHERE metalake_id = %d AND deleted_at = 0",
+            metalakeId);
+    try (SqlSession session =
+            SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection conn = session.getConnection();
+        Statement stmt = conn.createStatement();
+        ResultSet rows = stmt.executeQuery(countSql)) {
+      if (!rows.next()) {
+        throw new RuntimeException("Doesn't contain data");
+      }
+      return rows.getInt(1);
+    } catch (SQLException se) {
+      throw new RuntimeException("SQL execution failed", se);
+    }
+  }
+
+  /**
+   * Debugging aid: prints every column name and value of each {@code statistic_meta} row of the
+   * metalake whose {@code deleted_at} is 0 to standard output. Performs no assertions.
+   *
+   * @param metalakeId The metalake id to filter on.
+   * @return Always {@code 1}.
+   * @throws RuntimeException If the query fails with a {@link SQLException}.
+   */
+  protected Integer testStats(Long metalakeId) {
+    String dumpSql =
+        String.format(
+            "SELECT * FROM statistic_meta WHERE metalake_id = %d AND deleted_at = 0", metalakeId);
+    try (SqlSession session =
+            SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection conn = session.getConnection();
+        Statement stmt = conn.createStatement();
+        ResultSet rows = stmt.executeQuery(dumpSql)) {
+      ResultSetMetaData layout = rows.getMetaData();
+      int columnCount = layout.getColumnCount();
+      while (rows.next()) {
+        // JDBC column positions are 1-based.
+        for (int column = 1; column <= columnCount; column++) {
+          String columnName = layout.getColumnName(column);
+          Object value = rows.getObject(column);
+          System.out.printf("Column: %s, Value: %s%n", columnName, value);
+        }
+      }
+    } catch (SQLException se) {
+      throw new RuntimeException("SQL execution failed", se);
+    }
+    return 1;
+  }
+
protected Integer countAllTagRel(Long tagId) {
try (SqlSession sqlSession =
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
new file mode 100644
index 0000000000..a45705377d
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.collect.Lists;
+import java.time.Instant;
+import java.util.List;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.StatisticEntity;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TopicEntity;
+import org.apache.gravitino.stats.StatisticValues;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestStatisticMetaService extends TestJDBCBackend {
+ StatisticMetaService statisticMetaService =
StatisticMetaService.getInstance();
+
+  /**
+   * Walks a single table statistic through insert, upsert under the same name, and batch delete,
+   * checking what {@code listStatisticsByEntity} returns after each step.
+   */
+  @Test
+  public void testStatisticsLifeCycle() throws Exception {
+    final String metalakeName = "metalake";
+    final AuditInfo auditInfo =
+        AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+    // Build the metalake -> catalog -> schema -> table chain that owns the statistic.
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, auditInfo);
+    backend.insert(metalake, false);
+    CatalogEntity catalog =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(), Namespace.of("metalake"), "catalog", auditInfo);
+    backend.insert(catalog, false);
+    SchemaEntity schema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog"),
+            "schema",
+            auditInfo);
+    backend.insert(schema, false);
+    TableEntity table =
+        createTableEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog", "schema"),
+            "table",
+            auditInfo);
+    backend.insert(table, false);
+
+    // Step 1: the first write stores the statistic with value 100.
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        Lists.newArrayList(createStatisticEntity(auditInfo, 100L)),
+        metalakeName,
+        table.nameIdentifier(),
+        Entity.EntityType.TABLE);
+    List<StatisticEntity> stored =
+        statisticMetaService.listStatisticsByEntity(
+            table.nameIdentifier(), Entity.EntityType.TABLE);
+    Assertions.assertEquals(1, stored.size());
+    Assertions.assertEquals("test", stored.get(0).name());
+    Assertions.assertEquals(100L, stored.get(0).value().value());
+
+    // Step 2: writing the same statistic name again must replace the value, not add a row.
+    StatisticEntity replacement = createStatisticEntity(auditInfo, 200L);
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        Lists.newArrayList(replacement),
+        metalakeName,
+        table.nameIdentifier(),
+        Entity.EntityType.TABLE);
+    stored =
+        statisticMetaService.listStatisticsByEntity(
+            table.nameIdentifier(), Entity.EntityType.TABLE);
+    Assertions.assertEquals(1, stored.size());
+    Assertions.assertEquals("test", stored.get(0).name());
+    Assertions.assertEquals(200L, stored.get(0).value().value());
+
+    // Step 3: deleting by name leaves nothing to list for the table.
+    List<String> namesToDelete = Lists.newArrayList(replacement.name());
+    statisticMetaService.batchDeleteStatisticPOs(
+        table.nameIdentifier(), table.type(), namesToDelete);
+    stored = statisticMetaService.listStatisticsByEntity(table.nameIdentifier(), table.type());
+    Assertions.assertEquals(0, stored.size());
+  }
+
+  /**
+   * Verifies that statistics are retired together with the metadata object that owns them.
+   *
+   * <p>The scenario runs three rounds against one metalake. Every round creates a fresh catalog
+   * and schema holding a fileset, table, topic and model, and attaches one statistic to each of
+   * those four leaf objects. Round 1 deletes the objects one at a time, round 2 deletes the
+   * catalog with cascade, and round 3 deletes the schema with cascade. After each deletion the
+   * test expects {@code countActiveStats} to drop while {@code countAllStats} keeps the rows
+   * written so far (4, 8, then 12).
+   */
+  @Test
+  public void testDeleteMetadataObject() throws Exception {
+    String metalakeName = "metalake";
+    AuditInfo auditInfo =
+        AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, auditInfo);
+    backend.insert(metalake, false);
+
+    // Round 1: delete the leaf objects one by one; each deletion retires exactly one statistic.
+    CatalogTree tree = createCatalogTree(auditInfo);
+    insertOneStatisticPerLeaf(tree, metalakeName, auditInfo);
+    Assertions.assertEquals(4, countActiveStats(metalake.id()));
+    Assertions.assertEquals(4, countAllStats(metalake.id()));
+
+    ModelMetaService.getInstance().deleteModel(tree.model.nameIdentifier());
+    Assertions.assertEquals(3, countActiveStats(metalake.id()));
+    Assertions.assertEquals(4, countAllStats(metalake.id()));
+
+    TableMetaService.getInstance().deleteTable(tree.table.nameIdentifier());
+    Assertions.assertEquals(2, countActiveStats(metalake.id()));
+    Assertions.assertEquals(4, countAllStats(metalake.id()));
+
+    TopicMetaService.getInstance().deleteTopic(tree.topic.nameIdentifier());
+    Assertions.assertEquals(1, countActiveStats(metalake.id()));
+    Assertions.assertEquals(4, countAllStats(metalake.id()));
+
+    FilesetMetaService.getInstance().deleteFileset(tree.fileset.nameIdentifier());
+    Assertions.assertEquals(0, countActiveStats(metalake.id()));
+    Assertions.assertEquals(4, countAllStats(metalake.id()));
+
+    // The schema and catalog are empty by now, so deleting them must not change the counts.
+    SchemaMetaService.getInstance().deleteSchema(tree.schema.nameIdentifier(), false);
+    Assertions.assertEquals(0, countActiveStats(metalake.id()));
+    Assertions.assertEquals(4, countAllStats(metalake.id()));
+
+    CatalogMetaService.getInstance().deleteCatalog(tree.catalog.nameIdentifier(), false);
+    Assertions.assertEquals(0, countActiveStats(metalake.id()));
+    Assertions.assertEquals(4, countAllStats(metalake.id()));
+
+    // Round 2: deleting the catalog with cascade retires all four statistics at once.
+    tree = createCatalogTree(auditInfo);
+    insertOneStatisticPerLeaf(tree, metalakeName, auditInfo);
+    Assertions.assertEquals(4, countActiveStats(metalake.id()));
+    Assertions.assertEquals(8, countAllStats(metalake.id()));
+
+    CatalogMetaService.getInstance().deleteCatalog(tree.catalog.nameIdentifier(), true);
+    Assertions.assertEquals(0, countActiveStats(metalake.id()));
+    Assertions.assertEquals(8, countAllStats(metalake.id()));
+
+    // Round 3: deleting the schema with cascade does the same one level lower.
+    tree = createCatalogTree(auditInfo);
+    insertOneStatisticPerLeaf(tree, metalakeName, auditInfo);
+    Assertions.assertEquals(4, countActiveStats(metalake.id()));
+    Assertions.assertEquals(12, countAllStats(metalake.id()));
+
+    SchemaMetaService.getInstance().deleteSchema(tree.schema.nameIdentifier(), true);
+    Assertions.assertEquals(0, countActiveStats(metalake.id()));
+    Assertions.assertEquals(12, countAllStats(metalake.id()));
+  }
+
+  /** Entities created by {@code createCatalogTree} for one round of the deletion test. */
+  private static final class CatalogTree {
+    private final CatalogEntity catalog;
+    private final SchemaEntity schema;
+    private final FilesetEntity fileset;
+    private final TableEntity table;
+    private final TopicEntity topic;
+    private final ModelEntity model;
+
+    private CatalogTree(
+        CatalogEntity catalog,
+        SchemaEntity schema,
+        FilesetEntity fileset,
+        TableEntity table,
+        TopicEntity topic,
+        ModelEntity model) {
+      this.catalog = catalog;
+      this.schema = schema;
+      this.fileset = fileset;
+      this.table = table;
+      this.topic = topic;
+      this.model = model;
+    }
+  }
+
+  /**
+   * Creates and stores catalog "catalog", schema "schema" and one fileset, table, topic and model
+   * under metalake "metalake", each with a newly generated id. Entities are inserted in that
+   * order so that every parent exists before its children.
+   */
+  private CatalogTree createCatalogTree(AuditInfo auditInfo) throws Exception {
+    Namespace leafNamespace = Namespace.of("metalake", "catalog", "schema");
+
+    CatalogEntity catalog =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(), Namespace.of("metalake"), "catalog", auditInfo);
+    backend.insert(catalog, false);
+
+    SchemaEntity schema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of("metalake", "catalog"),
+            "schema",
+            auditInfo);
+    backend.insert(schema, false);
+
+    FilesetEntity fileset =
+        createFilesetEntity(
+            RandomIdGenerator.INSTANCE.nextId(), leafNamespace, "fileset", auditInfo);
+    backend.insert(fileset, false);
+
+    TableEntity table =
+        createTableEntity(RandomIdGenerator.INSTANCE.nextId(), leafNamespace, "table", auditInfo);
+    backend.insert(table, false);
+
+    TopicEntity topic =
+        createTopicEntity(RandomIdGenerator.INSTANCE.nextId(), leafNamespace, "topic", auditInfo);
+    backend.insert(topic, false);
+
+    ModelEntity model =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            leafNamespace,
+            "model",
+            "comment",
+            1,
+            null,
+            auditInfo);
+    backend.insert(model, false);
+
+    return new CatalogTree(catalog, schema, fileset, table, topic, model);
+  }
+
+  /**
+   * Attaches one new statistic (name "test", value 100) to the table, topic, fileset and model
+   * of the given tree, in that order.
+   */
+  private void insertOneStatisticPerLeaf(
+      CatalogTree tree, String metalakeName, AuditInfo auditInfo) throws Exception {
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        Lists.newArrayList(createStatisticEntity(auditInfo, 100L)),
+        metalakeName,
+        tree.table.nameIdentifier(),
+        Entity.EntityType.TABLE);
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        Lists.newArrayList(createStatisticEntity(auditInfo, 100L)),
+        metalakeName,
+        tree.topic.nameIdentifier(),
+        Entity.EntityType.TOPIC);
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        Lists.newArrayList(createStatisticEntity(auditInfo, 100L)),
+        metalakeName,
+        tree.fileset.nameIdentifier(),
+        Entity.EntityType.FILESET);
+    statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
+        Lists.newArrayList(createStatisticEntity(auditInfo, 100L)),
+        metalakeName,
+        tree.model.nameIdentifier(),
+        Entity.EntityType.MODEL);
+  }
+
+  /**
+   * Builds a statistic named "test" that carries the given long value, using an id drawn from
+   * {@code RandomIdGenerator} and the supplied audit info.
+   */
+  private static StatisticEntity createStatisticEntity(AuditInfo auditInfo, long value) {
+    long statisticId = RandomIdGenerator.INSTANCE.nextId();
+    return StatisticEntity.builder()
+        .withId(statisticId)
+        .withName("test")
+        .withValue(StatisticValues.longValue(value))
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
index 2397ca99e5..908096236e 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
@@ -57,6 +57,7 @@ import org.apache.gravitino.meta.ModelVersionEntity;
import org.apache.gravitino.meta.PolicyEntity;
import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.meta.SchemaVersion;
+import org.apache.gravitino.meta.StatisticEntity;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.meta.TagEntity;
import org.apache.gravitino.meta.TopicEntity;
@@ -66,6 +67,7 @@ import org.apache.gravitino.rel.expressions.Expression;
import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.types.Type;
import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.StatisticValues;
import org.apache.gravitino.storage.relational.po.CatalogPO;
import org.apache.gravitino.storage.relational.po.ColumnPO;
import org.apache.gravitino.storage.relational.po.FilesetPO;
@@ -78,6 +80,7 @@ import org.apache.gravitino.storage.relational.po.OwnerRelPO;
import org.apache.gravitino.storage.relational.po.PolicyPO;
import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
import org.apache.gravitino.storage.relational.po.SchemaPO;
+import org.apache.gravitino.storage.relational.po.StatisticPO;
import org.apache.gravitino.storage.relational.po.TablePO;
import org.apache.gravitino.storage.relational.po.TagMetadataObjectRelPO;
import org.apache.gravitino.storage.relational.po.TagPO;
@@ -1211,6 +1214,56 @@ public class TestPOConverters {
assertEquals(expectedModelVersionWithNull, convertedModelVersionWithNull);
}
+  /**
+   * Checks both conversion directions of {@code StatisticPO}: an entity turned into a PO by
+   * {@code initializeStatisticPOs}, and a hand-built PO turned back into an entity by
+   * {@code fromStatisticPO}.
+   */
+  @Test
+  public void testStatisticPO() throws JsonProcessingException {
+    AuditInfo auditInfo =
+        AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
+
+    // Entity -> PO: the string value is stored JSON-encoded, and versions start at 1.
+    StatisticEntity sourceEntity =
+        StatisticEntity.builder()
+            .withId(1L)
+            .withName("test_statistic")
+            .withNamespace(
+                NameIdentifierUtil.ofStatistic(
+                        NameIdentifierUtil.ofTable("test", "test", "test", "test"), "test")
+                    .namespace())
+            .withValue(StatisticValues.stringValue("test"))
+            .withAuditInfo(auditInfo)
+            .build();
+    List<StatisticEntity> statisticEntities = Lists.newArrayList(sourceEntity);
+
+    List<StatisticPO> statisticPOs =
+        StatisticPO.initializeStatisticPOs(statisticEntities, 1L, 1L, MetadataObject.Type.CATALOG);
+    StatisticPO convertedPO = statisticPOs.get(0);
+
+    assertEquals(1, convertedPO.getCurrentVersion());
+    assertEquals(1, convertedPO.getLastVersion());
+    assertEquals(0, convertedPO.getDeletedAt());
+    assertEquals("\"test\"", convertedPO.getStatisticValue());
+    assertEquals("test_statistic", convertedPO.getStatisticName());
+
+    // PO -> entity: id, name and the decoded value must survive the conversion.
+    String auditInfoJson = JsonUtils.anyFieldMapper().writeValueAsString(auditInfo);
+    StatisticPO statisticPO =
+        StatisticPO.builder()
+            .withStatisticId(1L)
+            .withLastVersion(1L)
+            .withCurrentVersion(1L)
+            .withStatisticName("test")
+            .withStatisticValue("\"test\"")
+            .withMetadataObjectId(1L)
+            .withMetadataObjectType("CATALOG")
+            .withDeletedAt(0L)
+            .withMetalakeId(1L)
+            .withAuditInfo(auditInfoJson)
+            .build();
+
+    StatisticEntity entity = StatisticPO.fromStatisticPO(statisticPO);
+    assertEquals(1L, entity.id());
+    assertEquals("test", entity.name());
+    assertEquals("test", entity.value().value());
+  }
+
private static BaseMetalake createMetalake(Long id, String name, String
comment) {
AuditInfo auditInfo =
AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
diff --git a/scripts/h2/schema-1.0.0-h2.sql b/scripts/h2/schema-1.0.0-h2.sql
index a245174735..d159e5de8d 100644
--- a/scripts/h2/schema-1.0.0-h2.sql
+++ b/scripts/h2/schema-1.0.0-h2.sql
@@ -384,6 +384,24 @@ CREATE TABLE IF NOT EXISTS `policy_relation_meta` (
KEY `idx_prmid` (`metadata_object_id`)
) ENGINE=InnoDB;
+-- Statistics attached to metadata objects: one row per statistic name and metadata object.
+-- `deleted_at` defaults to 0; because it is part of the unique key, uniqueness of
+-- (`statistic_name`, `metadata_object_id`) is only enforced among rows sharing a `deleted_at`.
+-- NOTE(review): the key name `uk_si_mi_mo_del` does not spell out the indexed columns
+-- (statistic_name, metadata_object_id, deleted_at) - confirm the name is intended.
+CREATE TABLE IF NOT EXISTS `statistic_meta` (
+ `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment
id',
+ `statistic_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'statistic id',
+ `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value',
+ `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object
id',
+ `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'statistic audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic
current version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'statistic
deleted at',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_si_mi_mo_del` (`statistic_name`, `metadata_object_id`,
`deleted_at`),
+ KEY `idx_stid` (`statistic_id`),
+ KEY `idx_moid` (`metadata_object_id`)
+) ENGINE=InnoDB;
+
CREATE TABLE IF NOT EXISTS `job_template_meta` (
`job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
`job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
diff --git a/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
b/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
index 7ceca25db4..071cde1146 100644
--- a/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
+++ b/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
@@ -69,6 +69,24 @@ ALTER TABLE `model_version_info` ADD CONSTRAINT
`uk_mid_ver_uri_del` UNIQUE (`mo
-- remove the default value for model_version_uri_name
ALTER TABLE `model_version_info` ALTER COLUMN `model_version_uri_name` DROP
DEFAULT;
+-- New in 1.0.0: statistics attached to metadata objects, one row per statistic name and
+-- metadata object. `deleted_at` defaults to 0; because it is part of the unique key, uniqueness
+-- of (`statistic_name`, `metadata_object_id`) is only enforced among rows sharing a `deleted_at`.
+-- NOTE(review): the key name `uk_si_mi_mo_del` does not spell out the indexed columns
+-- (statistic_name, metadata_object_id, deleted_at) - confirm the name is intended.
+CREATE TABLE IF NOT EXISTS `statistic_meta` (
+ `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment
id',
+ `statistic_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'statistic id',
+ `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value',
+ `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object
id',
+ `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'statistic audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic
current version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'statistic
deleted at',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_si_mi_mo_del` (`statistic_name`, `metadata_object_id`,
`deleted_at`),
+ KEY `idx_stid` (`statistic_id`),
+ KEY `idx_moid` (`metadata_object_id`)
+) ENGINE=InnoDB;
+
CREATE TABLE IF NOT EXISTS `job_template_meta` (
`job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
`job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
diff --git a/scripts/mysql/schema-1.0.0-mysql.sql
b/scripts/mysql/schema-1.0.0-mysql.sql
index 07c14ad304..06fa8c33b7 100644
--- a/scripts/mysql/schema-1.0.0-mysql.sql
+++ b/scripts/mysql/schema-1.0.0-mysql.sql
@@ -375,6 +375,24 @@ CREATE TABLE IF NOT EXISTS `policy_relation_meta` (
KEY `idx_mid` (`metadata_object_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'policy
metadata object relation';
+-- Statistics attached to metadata objects: one row per statistic name and metadata object.
+-- `deleted_at` defaults to 0; because it is part of the unique key, uniqueness of
+-- (`statistic_name`, `metadata_object_id`) is only enforced among rows sharing a `deleted_at`.
+-- NOTE(review): the key name `uk_si_mi_mo_del` does not spell out the indexed columns
+-- (statistic_name, metadata_object_id, deleted_at) - confirm the name is intended.
+CREATE TABLE IF NOT EXISTS `statistic_meta` (
+ `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment
id',
+ `statistic_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'statistic id',
+ `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value',
+ `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object
id',
+ `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'statistic audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic
current version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'statistic
deleted at',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_si_mi_mo_del` (`statistic_name`, `metadata_object_id`,
`deleted_at`),
+ KEY `idx_stid` (`statistic_id`),
+ KEY `idx_moid` (`metadata_object_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'statistic
metadata';
+
CREATE TABLE IF NOT EXISTS `job_template_meta` (
`job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
`job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
diff --git a/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
b/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
index c0890b5aa0..c44c94a41d 100644
--- a/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
+++ b/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
@@ -69,6 +69,24 @@ ALTER TABLE `model_version_info` ADD CONSTRAINT
`uk_mid_ver_uri_del` UNIQUE KEY
-- remove the default value for model_version_uri_name
ALTER TABLE `model_version_info` ALTER COLUMN `model_version_uri_name` DROP
DEFAULT;
+-- New in 1.0.0: statistics attached to metadata objects, one row per statistic name and
+-- metadata object. `deleted_at` defaults to 0; because it is part of the unique key, uniqueness
+-- of (`statistic_name`, `metadata_object_id`) is only enforced among rows sharing a `deleted_at`.
+-- NOTE(review): the key name `uk_si_mi_mo_del` does not spell out the indexed columns
+-- (statistic_name, metadata_object_id, deleted_at) - confirm the name is intended.
+CREATE TABLE IF NOT EXISTS `statistic_meta` (
+ `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment
id',
+ `statistic_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'statistic id',
+ `statistic_name` VARCHAR(128) NOT NULL COMMENT 'statistic name',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `statistic_value` MEDIUMTEXT NOT NULL COMMENT 'statistic value',
+ `metadata_object_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metadata object
id',
+ `metadata_object_type` VARCHAR(64) NOT NULL COMMENT 'metadata object type',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'statistic audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic
current version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'statistic last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'statistic
deleted at',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uk_si_mi_mo_del` (`statistic_name`, `metadata_object_id`,
`deleted_at`),
+ KEY `idx_stid` (`statistic_id`),
+ KEY `idx_moid` (`metadata_object_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'statistic
metadata';
+
CREATE TABLE IF NOT EXISTS `job_template_meta` (
`job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
`job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
diff --git a/scripts/postgresql/schema-1.0.0-postgresql.sql
b/scripts/postgresql/schema-1.0.0-postgresql.sql
index d97ace9844..a815248673 100644
--- a/scripts/postgresql/schema-1.0.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.0.0-postgresql.sql
@@ -669,6 +669,36 @@ COMMENT ON COLUMN policy_relation_meta.current_version IS
'policy relation curre
COMMENT ON COLUMN policy_relation_meta.last_version IS 'policy relation last
version';
COMMENT ON COLUMN policy_relation_meta.deleted_at IS 'policy relation deleted
at';
+-- Statistics attached to metadata objects: one row per statistic name and metadata object.
+-- deleted_at defaults to 0; because it is part of the UNIQUE constraint, uniqueness of
+-- (statistic_name, metadata_object_id) is only enforced among rows sharing a deleted_at.
+CREATE TABLE IF NOT EXISTS statistic_meta (
+ -- Surrogate row id. It is documented below as 'auto increment id' and is AUTO_INCREMENT in
+ -- the H2/MySQL scripts, so it gets an identity default here; with a bare NOT NULL column any
+ -- INSERT that omits id would be rejected. Explicitly supplied ids are still accepted.
+ id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+ statistic_id BIGINT NOT NULL,
+ statistic_name VARCHAR(128) NOT NULL,
+ metalake_id BIGINT NOT NULL,
+ statistic_value TEXT NOT NULL,
+ metadata_object_id BIGINT NOT NULL,
+ metadata_object_type VARCHAR(64) NOT NULL,
+ audit_info TEXT NOT NULL,
+ current_version INT NOT NULL DEFAULT 1,
+ last_version INT NOT NULL DEFAULT 1,
+ deleted_at BIGINT NOT NULL DEFAULT 0,
+ PRIMARY KEY (statistic_id),
+ UNIQUE (statistic_name, metadata_object_id, deleted_at)
+);
+
+-- NOTE(review): statistic_id is already the primary key, so idx_stid looks redundant - confirm.
+CREATE INDEX IF NOT EXISTS idx_stid ON statistic_meta (statistic_id);
+CREATE INDEX IF NOT EXISTS idx_moid ON statistic_meta (metadata_object_id);
+COMMENT ON TABLE statistic_meta IS 'statistic metadata';
+COMMENT ON COLUMN statistic_meta.id IS 'auto increment id';
+COMMENT ON COLUMN statistic_meta.statistic_id IS 'statistic id';
+COMMENT ON COLUMN statistic_meta.statistic_name IS 'statistic name';
+COMMENT ON COLUMN statistic_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN statistic_meta.statistic_value IS 'statistic value';
+COMMENT ON COLUMN statistic_meta.metadata_object_id IS 'metadata object id';
+COMMENT ON COLUMN statistic_meta.metadata_object_type IS 'metadata object
type';
+COMMENT ON COLUMN statistic_meta.audit_info IS 'statistic audit info';
+COMMENT ON COLUMN statistic_meta.current_version IS 'statistic current
version';
+COMMENT ON COLUMN statistic_meta.last_version IS 'statistic last version';
+COMMENT ON COLUMN statistic_meta.deleted_at IS 'statistic deleted at';
CREATE TABLE IF NOT EXISTS job_template_meta (
job_template_id BIGINT NOT NULL,
diff --git a/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
b/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
index a95ad69517..22c07c15a0 100644
--- a/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
@@ -105,6 +105,32 @@ ALTER TABLE model_version_info ADD CONSTRAINT
uk_mid_ver_uri_del UNIQUE (model_i
-- remove the default value for model_version_uri_name
ALTER TABLE model_version_info ALTER COLUMN model_version_uri_name DROP
DEFAULT;
+-- New in 1.0.0: statistics attached to metadata objects, one row per statistic name and
+-- metadata object. The column list must stay identical to schema-1.0.0-postgresql.sql so that
+-- upgraded and freshly installed databases end up with the same table.
+CREATE TABLE IF NOT EXISTS statistic_meta (
+ -- Surrogate row id. It is documented below as 'auto increment id' and is AUTO_INCREMENT in
+ -- the H2/MySQL scripts, so it gets an identity default here; with a bare NOT NULL column any
+ -- INSERT that omits id would be rejected. Explicitly supplied ids are still accepted.
+ id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+ statistic_id BIGINT NOT NULL,
+ statistic_name VARCHAR(128) NOT NULL,
+ metalake_id BIGINT NOT NULL,
+ statistic_value TEXT NOT NULL,
+ metadata_object_id BIGINT NOT NULL,
+ metadata_object_type VARCHAR(64) NOT NULL,
+ -- These four columns are referenced by the UNIQUE constraint and the COMMENT ON COLUMN
+ -- statements below; without them this script fails on a column that does not exist.
+ audit_info TEXT NOT NULL,
+ current_version INT NOT NULL DEFAULT 1,
+ last_version INT NOT NULL DEFAULT 1,
+ deleted_at BIGINT NOT NULL DEFAULT 0,
+ PRIMARY KEY (statistic_id),
+ UNIQUE (statistic_name, metadata_object_id, deleted_at)
+);
+
+-- NOTE(review): statistic_id is already the primary key, so idx_stid looks redundant - confirm.
+CREATE INDEX IF NOT EXISTS idx_stid ON statistic_meta (statistic_id);
+CREATE INDEX IF NOT EXISTS idx_moid ON statistic_meta (metadata_object_id);
+COMMENT ON TABLE statistic_meta IS 'statistic metadata';
+COMMENT ON COLUMN statistic_meta.id IS 'auto increment id';
+COMMENT ON COLUMN statistic_meta.statistic_id IS 'statistic id';
+COMMENT ON COLUMN statistic_meta.statistic_name IS 'statistic name';
+COMMENT ON COLUMN statistic_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN statistic_meta.statistic_value IS 'statistic value';
+COMMENT ON COLUMN statistic_meta.metadata_object_id IS 'metadata object id';
+COMMENT ON COLUMN statistic_meta.metadata_object_type IS 'metadata object
type';
+COMMENT ON COLUMN statistic_meta.audit_info IS 'statistic audit info';
+COMMENT ON COLUMN statistic_meta.current_version IS 'statistic current
version';
+COMMENT ON COLUMN statistic_meta.last_version IS 'statistic last version';
+COMMENT ON COLUMN statistic_meta.deleted_at IS 'statistic deleted at';
CREATE TABLE IF NOT EXISTS job_template_meta (
job_template_id BIGINT NOT NULL,