This is an automated email from the ASF dual-hosted git repository.

wankai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 71c469c9ce Add `LatestLabeledFunction` for meter. (#13624)
71c469c9ce is described below

commit 71c469c9ce42a7290ea10e597952c78f169022c7
Author: Wan Kai <[email protected]>
AuthorDate: Wed Dec 17 10:34:43 2025 +0800

    Add `LatestLabeledFunction` for meter. (#13624)
---
 docs/en/changes/changes.md                         |   1 +
 .../function/latest/LatestLabeledFunction.java     | 189 +++++++++++++++++++++
 .../function/latest/LatestLabeledFunctionTest.java | 154 +++++++++++++++++
 3 files changed, 344 insertions(+)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index feba6cc0f5..9eac35f638 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -11,6 +11,7 @@
 * Init `log-mal-rules` at module provider start stage to avoid re-init for every LAL.
 * Fail fast if SampleFamily is empty after MAL filter expression.
 * Fix range matrix and scalar binary operation in PromQL.
+* Add `LatestLabeledFunction` for meter.
 
 #### UI
 * Fix the missing icon in new native trace view.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunction.java
new file mode 100644
index 0000000000..039d4d90a5
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunction.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function.latest;
+
+import java.util.Objects;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import 
org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.meter.Meter;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import 
org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import 
org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import 
org.apache.skywalking.oap.server.core.analysis.metrics.LabeledValueHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageID;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+/**
+ * Labeled meter function, registered under the name {@code latestLabeled}, that holds only the
+ * most recent {@link DataTable} it was handed: both {@link #accept(MeterEntity, DataTable)} and
+ * {@link #combine(Metrics)} replace the held table rather than merging into it.
+ */
+@MeterFunction(functionName = "latestLabeled")
+@ToString
+public abstract class LatestLabeledFunction extends Meter implements AcceptableValue<DataTable>, LabeledValueHolder {
+
+    /**
+     * Storage column name of the labeled value table.
+     */
+    public static final String VALUE = "datatable_value";
+
+    /**
+     * ID of the entity this metric belongs to, taken from {@link MeterEntity#id()} on accept.
+     * It is also one half of the storage ID and the key used by {@link #remoteHashCode()}.
+     */
+    @Getter
+    @Setter
+    @ElasticSearch.EnableDocValues
+    @Column(name = ENTITY_ID, length = 512)
+    @BanyanDB.SeriesID(index = 0)
+    private String entityId;
+
+    /**
+     * Service ID is required for sort query.
+     */
+    @Getter
+    @Setter
+    @Column(name = InstanceTraffic.SERVICE_ID)
+    private String serviceId;
+
+    /**
+     * The labeled values currently held by this metric.
+     * NOTE(review): the constructor argument looks like an initial capacity; confirm against DataTable.
+     */
+    @Setter
+    @Getter
+    @Column(name = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
+    @BanyanDB.MeasureField
+    private DataTable value = new DataTable(30);
+
+    /**
+     * Records the entity and service IDs of {@code entity} and replaces the held table with
+     * {@code value}. The given table is stored by reference, not copied.
+     *
+     * @param entity the entity the sample belongs to
+     * @param value  the labeled values that become the current value
+     */
+    @Override
+    public void accept(final MeterEntity entity, final DataTable value) {
+        final String acceptedEntityId = entity.id();
+        final String acceptedServiceId = entity.serviceId();
+        setEntityId(acceptedEntityId);
+        setServiceId(acceptedServiceId);
+        this.value = value;
+    }
+
+    /**
+     * Takes over the table of {@code metrics}, dropping whatever this instance held before.
+     *
+     * @param metrics the other metric; must be a {@link LatestLabeledFunction}
+     * @return always {@code true}
+     */
+    @Override
+    public final boolean combine(Metrics metrics) {
+        this.value = ((LatestLabeledFunction) metrics).getValue();
+        return true;
+    }
+
+    /**
+     * Intentionally empty: the held table is already the final value.
+     */
+    @Override
+    public final void calculate() {
+
+    }
+
+    /**
+     * Creates the hour-dimension counterpart of this metric.
+     *
+     * @return a new instance carrying the same IDs, the hour time bucket and a copy of the table
+     */
+    @Override
+    public Metrics toHour() {
+        return downsampleInto((LatestLabeledFunction) createNew(), toTimeBucketInHour());
+    }
+
+    /**
+     * Creates the day-dimension counterpart of this metric.
+     *
+     * @return a new instance carrying the same IDs, the day time bucket and a copy of the table
+     */
+    @Override
+    public Metrics toDay() {
+        return downsampleInto((LatestLabeledFunction) createNew(), toTimeBucketInDay());
+    }
+
+    /**
+     * Fills {@code target} with this metric's IDs, the given time bucket and the current values.
+     *
+     * @param target     the freshly created instance to populate
+     * @param timeBucket the time bucket of the coarser dimension
+     * @return {@code target}
+     */
+    private Metrics downsampleInto(final LatestLabeledFunction target, final long timeBucket) {
+        target.setEntityId(getEntityId());
+        target.setTimeBucket(timeBucket);
+        target.setServiceId(getServiceId());
+        // The target owns its own table, so the values are copied in rather than shared.
+        target.getValue().copyFrom(getValue());
+        return target;
+    }
+
+    /**
+     * Builds the storage ID from the time bucket followed by the entity ID.
+     */
+    @Override
+    protected StorageID id0() {
+        return new StorageID().append(TIME_BUCKET, getTimeBucket()).append(ENTITY_ID, getEntityId());
+    }
+
+    /**
+     * Restores this metric from remote data written by {@link #serialize()}.
+     *
+     * @param remoteData the payload: object string 0 is the table, long 0 the time bucket,
+     *                   strings 0 and 1 the entity and service IDs
+     */
+    @Override
+    public void deserialize(final RemoteData remoteData) {
+        final String tableData = remoteData.getDataObjectStrings(0);
+        setValue(new DataTable(tableData));
+        setTimeBucket(remoteData.getDataLongs(0));
+
+        // Strings are read back in the order serialize() added them.
+        setEntityId(remoteData.getDataStrings(0));
+        setServiceId(remoteData.getDataStrings(1));
+    }
+
+    /**
+     * Writes the table, the time bucket, the entity ID and the service ID into a remote data builder.
+     *
+     * @return the populated builder
+     */
+    @Override
+    public RemoteData.Builder serialize() {
+        return RemoteData.newBuilder()
+                         .addDataObjectStrings(value.toStorageData())
+                         .addDataLongs(getTimeBucket())
+                         .addDataStrings(entityId)
+                         .addDataStrings(serviceId);
+    }
+
+    /**
+     * @return the hash code of the entity ID
+     */
+    @Override
+    public int remoteHashCode() {
+        return entityId.hashCode();
+    }
+
+    /**
+     * @return the storage builder type for this function
+     */
+    @Override
+    public Class<? extends LatestLabeledStorageBuilder> builder() {
+        return LatestLabeledStorageBuilder.class;
+    }
+
+    /**
+     * Converts {@link LatestLabeledFunction} instances to and from their storage representation.
+     */
+    public static class LatestLabeledStorageBuilder implements StorageBuilder<LatestLabeledFunction> {
+        /**
+         * Rebuilds a metric from stored columns. The returned instance cannot create new
+         * instances of itself: its {@code createNew()} throws {@link UnexpectedException}.
+         */
+        @Override
+        public LatestLabeledFunction storage2Entity(final Convert2Entity converter) {
+            final LatestLabeledFunction restored = new LatestLabeledFunction() {
+                @Override
+                public AcceptableValue<DataTable> createNew() {
+                    throw new UnexpectedException("createNew should not be called");
+                }
+            };
+            final String storedTable = (String) converter.get(VALUE);
+            restored.setValue(new DataTable(storedTable));
+            final Number storedTimeBucket = (Number) converter.get(TIME_BUCKET);
+            restored.setTimeBucket(storedTimeBucket.longValue());
+            restored.setServiceId((String) converter.get(InstanceTraffic.SERVICE_ID));
+            restored.setEntityId((String) converter.get(ENTITY_ID));
+            return restored;
+        }
+
+        /**
+         * Hands the table, the time bucket, the service ID and the entity ID to the converter.
+         */
+        @Override
+        public void entity2Storage(final LatestLabeledFunction storageData, final Convert2Storage converter) {
+            converter.accept(VALUE, storageData.getValue());
+            converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+            converter.accept(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
+            converter.accept(ENTITY_ID, storageData.getEntityId());
+        }
+    }
+
+    /**
+     * Two functions are equal when their entity IDs and time buckets match; the held table
+     * and the service ID are not compared.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof LatestLabeledFunction) {
+            final LatestLabeledFunction that = (LatestLabeledFunction) o;
+            return getTimeBucket() == that.getTimeBucket()
+                && Objects.equals(entityId, that.entityId);
+        }
+        return false;
+    }
+
+    /**
+     * Hashes the same pair of fields that {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(entityId, getTimeBucket());
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunctionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunctionTest.java
new file mode 100644
index 0000000000..7492868930
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunctionTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function.latest;
+
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import 
org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
+import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
+import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Unit tests for {@link LatestLabeledFunction}: accepting values, combining, hour/day conversion,
+ * remote (de)serialization and the storage builder round trip.
+ */
+@ExtendWith(MockitoExtension.class)
+public class LatestLabeledFunctionTest {
+
+    private static final DataTable HTTP_CODE_COUNT_1 = new DataTable("200,2|301,2|404,3|502,4");
+
+    private static final DataTable HTTP_CODE_COUNT_2 = new DataTable("200,1|301,4|404,5|502,1|505,1");
+
+    private static final DataTable HTTP_CODE_COUNT_3 = new DataTable("200,2|301,4|404,5|502,4|505,1");
+
+    private LatestLabeledFunction function;
+
+    @BeforeAll
+    public static void setup() {
+        MeterEntity.setNamingControl(
+            new NamingControl(512, 512, 512, new EndpointNameGrouping()));
+    }
+
+    @BeforeEach
+    public void before() {
+        function = new LatestLabeledFunctionInst();
+        function.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        MeterEntity.setNamingControl(null);
+    }
+
+    /**
+     * Builds the service entity every test feeds into the function. It is created on demand
+     * rather than held in a static field so that it is only built after {@link #setup()} has
+     * installed the naming control.
+     */
+    private static MeterEntity serviceEntity() {
+        return MeterEntity.newService("service-test", Layer.GENERAL);
+    }
+
+    @Test
+    public void testAccept() {
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_1);
+        assertThat(function.getValue()).isEqualTo(HTTP_CODE_COUNT_1);
+
+        // Each later accept replaces the previous table, so only the last one remains.
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_2);
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_3);
+        assertThat(function.getValue()).isEqualTo(HTTP_CODE_COUNT_3);
+    }
+
+    @Test
+    public void testCombine() {
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_1);
+
+        final LatestLabeledFunction incoming = new LatestLabeledFunctionInst();
+        incoming.accept(serviceEntity(), HTTP_CODE_COUNT_2);
+
+        // combine() takes over the other metric's table and always reports success.
+        assertThat(function.combine(incoming)).isTrue();
+        assertThat(function.getValue()).isEqualTo(HTTP_CODE_COUNT_2);
+    }
+
+    @Test
+    public void testCalculate() {
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_1);
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_2);
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_3);
+        function.calculate();
+
+        assertThat(function.getValue()).isEqualTo(HTTP_CODE_COUNT_3);
+    }
+
+    @Test
+    public void testToHour() {
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_1);
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_2);
+        function.calculate();
+
+        final LatestLabeledFunction hourFunction = (LatestLabeledFunction) function.toHour();
+        hourFunction.calculate();
+
+        assertThat(hourFunction.getValue()).isEqualTo(HTTP_CODE_COUNT_2);
+    }
+
+    @Test
+    public void testToDay() {
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_1);
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_2);
+        function.calculate();
+
+        final LatestLabeledFunction dayFunction = (LatestLabeledFunction) function.toDay();
+        dayFunction.calculate();
+
+        assertThat(dayFunction.getValue()).isEqualTo(HTTP_CODE_COUNT_2);
+    }
+
+    @Test
+    public void testSerialize() {
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_1);
+
+        LatestLabeledFunction function2 = new LatestLabeledFunctionInst();
+        function2.deserialize(function.serialize().build());
+
+        assertThat(function2.getEntityId()).isEqualTo(function.getEntityId());
+        assertThat(function2.getTimeBucket()).isEqualTo(function.getTimeBucket());
+        assertThat(function2.getServiceId()).isEqualTo(function.getServiceId());
+        assertThat(function2.getValue()).isEqualTo(function.getValue());
+    }
+
+    @Test
+    public void testBuilder() throws ReflectiveOperationException {
+        function.accept(serviceEntity(), HTTP_CODE_COUNT_1);
+        function.calculate();
+
+        // Class#newInstance() is deprecated since Java 9; instantiate through the no-arg constructor.
+        StorageBuilder<LatestLabeledFunction> storageBuilder =
+            function.builder().getDeclaredConstructor().newInstance();
+
+        final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
+        storageBuilder.entity2Storage(function, toStorage);
+        final Map<String, Object> map = toStorage.obtain();
+        // entity2Storage stores the DataTable object, while storage2Entity reads the VALUE entry
+        // as a String, so swap the object for its storage string before converting back.
+        map.put(LatestLabeledFunction.VALUE, ((DataTable) map.get(LatestLabeledFunction.VALUE)).toStorageData());
+
+        LatestLabeledFunction function2 = storageBuilder.storage2Entity(new HashMapConverter.ToEntity(map));
+
+        assertThat(function2.getValue()).isEqualTo(function.getValue());
+    }
+
+    /**
+     * Minimal concrete subclass so the abstract function can be instantiated in tests.
+     */
+    private static class LatestLabeledFunctionInst extends LatestLabeledFunction {
+        @Override
+        public AcceptableValue<DataTable> createNew() {
+            return new LatestLabeledFunctionInst();
+        }
+    }
+}

Reply via email to