This is an automated email from the ASF dual-hosted git repository.
wankai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 71c469c9ce Add `LatestLabeledFunction` for meter. (#13624)
71c469c9ce is described below
commit 71c469c9ce42a7290ea10e597952c78f169022c7
Author: Wan Kai <[email protected]>
AuthorDate: Wed Dec 17 10:34:43 2025 +0800
Add `LatestLabeledFunction` for meter. (#13624)
---
docs/en/changes/changes.md | 1 +
.../function/latest/LatestLabeledFunction.java | 189 +++++++++++++++++++++
.../function/latest/LatestLabeledFunctionTest.java | 154 +++++++++++++++++
3 files changed, 344 insertions(+)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index feba6cc0f5..9eac35f638 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -11,6 +11,7 @@
* Init `log-mal-rules` at module provider start stage to avoid re-init for
every LAL.
* Fail fast if SampleFamily is empty after MAL filter expression.
* Fix range matrix and scalar binary operation in PromQL.
+* Add `LatestLabeledFunction` for meter.
#### UI
* Fix the missing icon in new native trace view.
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunction.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunction.java
new file mode 100644
index 0000000000..039d4d90a5
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunction.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function.latest;
+
+import java.util.Objects;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import
org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.meter.Meter;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import
org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import
org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import
org.apache.skywalking.oap.server.core.analysis.metrics.LabeledValueHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageID;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+@MeterFunction(functionName = "latestLabeled")
+@ToString // Meter function that holds the labeled DataTable most recently handed to accept() or combine().
+public abstract class LatestLabeledFunction extends Meter implements
AcceptableValue<DataTable>, LabeledValueHolder {
+
+ public static final String VALUE = "datatable_value"; // Column name under which the labeled value is stored.
+
+ @Setter
+ @Getter
+ @ElasticSearch.EnableDocValues
+ @Column(name = ENTITY_ID, length = 512)
+ @BanyanDB.SeriesID(index = 0)
+ private String entityId;
+
+ /**
+ * Service ID is required for sort query.
+ */
+ @Setter
+ @Getter
+ @Column(name = InstanceTraffic.SERVICE_ID)
+ private String serviceId;
+
+ @Getter
+ @Setter
+ @Column(name = VALUE, dataType = Column.ValueDataType.LABELED_VALUE,
storageOnly = true)
+ @BanyanDB.MeasureField
+ private DataTable value = new DataTable(30); // NOTE(review): 30 is presumably an initial capacity - confirm against DataTable.
+
+ @Override // Records the entity/service ids and replaces the held table with the given one (same reference, no merge).
+ public void accept(final MeterEntity entity, final DataTable value) {
+ setEntityId(entity.id());
+ setServiceId(entity.serviceId());
+ this.value = value;
+ }
+
+ @Override // Adopts the other metric's table as-is and always returns true; the argument is cast unchecked.
+ public final boolean combine(Metrics metrics) {
+ final LatestLabeledFunction latestLabeledFunction =
(LatestLabeledFunction) metrics;
+ this.value = latestLabeledFunction.getValue();
+ return true;
+ }
+
+ @Override // No-op: the body is empty, the stored table is left untouched.
+ public final void calculate() {
+
+ }
+
+ @Override // Builds an hour-bucket metric via createNew(), carrying over both ids and filling its table via copyFrom().
+ public Metrics toHour() {
+ LatestLabeledFunction metrics = (LatestLabeledFunction) createNew();
+ metrics.setEntityId(getEntityId());
+ metrics.setTimeBucket(toTimeBucketInHour());
+ metrics.setServiceId(getServiceId());
+ metrics.getValue().copyFrom(getValue());
+ return metrics;
+ }
+
+ @Override // Same as toHour(), but stamped with the day time bucket.
+ public Metrics toDay() {
+ LatestLabeledFunction metrics = (LatestLabeledFunction) createNew();
+ metrics.setEntityId(getEntityId());
+ metrics.setTimeBucket(toTimeBucketInDay());
+ metrics.setServiceId(getServiceId());
+ metrics.getValue().copyFrom(getValue());
+ return metrics;
+ }
+
+ @Override // Storage id is composed from the time bucket followed by the entity id.
+ protected StorageID id0() {
+ return new StorageID()
+ .append(TIME_BUCKET, getTimeBucket())
+ .append(ENTITY_ID, getEntityId());
+ }
+
+ @Override // Reads the positional layout written by serialize(): object-string 0, long 0, strings 0 and 1.
+ public void deserialize(final RemoteData remoteData) {
+ setValue(new DataTable(remoteData.getDataObjectStrings(0)));
+ setTimeBucket(remoteData.getDataLongs(0));
+
+ setEntityId(remoteData.getDataStrings(0));
+ setServiceId(remoteData.getDataStrings(1));
+ }
+
+ @Override // Insertion order here defines the indexes deserialize() reads; keep both methods in sync.
+ public RemoteData.Builder serialize() {
+ final RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+ remoteBuilder.addDataObjectStrings(value.toStorageData());
+ remoteBuilder.addDataLongs(getTimeBucket());
+
+ remoteBuilder.addDataStrings(entityId);
+ remoteBuilder.addDataStrings(serviceId);
+
+ return remoteBuilder;
+ }
+
+ @Override // Derived from entityId only; throws NullPointerException while entityId is still null.
+ public int remoteHashCode() {
+ return entityId.hashCode();
+ }
+
+ @Override // Exposes the nested builder class used for storage conversion.
+ public Class<? extends LatestLabeledStorageBuilder> builder() {
+ return LatestLabeledStorageBuilder.class;
+ }
+
+ public static class LatestLabeledStorageBuilder implements
StorageBuilder<LatestLabeledFunction> { // Converts between the metric and its converter-based storage form.
+ @Override // Rebuilds a metric from the converter; the returned instance throws UnexpectedException on createNew().
+ public LatestLabeledFunction storage2Entity(final Convert2Entity
converter) {
+ LatestLabeledFunction metrics = new LatestLabeledFunction() {
+ @Override
+ public AcceptableValue<DataTable> createNew() {
+ throw new UnexpectedException("createNew should not be
called");
+ }
+ };
+ metrics.setValue(new DataTable((String) converter.get(VALUE))); // VALUE is read back as a String.
+ metrics.setTimeBucket(((Number)
converter.get(TIME_BUCKET)).longValue());
+ metrics.setServiceId((String)
converter.get(InstanceTraffic.SERVICE_ID));
+ metrics.setEntityId((String) converter.get(ENTITY_ID));
+ return metrics;
+ }
+
+ @Override // Hands value, time bucket, service id and entity id to the converter.
+ public void entity2Storage(final LatestLabeledFunction storageData,
final Convert2Storage converter) {
+ converter.accept(VALUE, storageData.getValue()); // Passed as a DataTable object, not as its string form.
+ converter.accept(TIME_BUCKET, storageData.getTimeBucket());
+ converter.accept(InstanceTraffic.SERVICE_ID,
storageData.getServiceId());
+ converter.accept(ENTITY_ID, storageData.getEntityId());
+ }
+ }
+
+ @Override // Equality is (entityId, timeBucket); serviceId and value are not compared.
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof LatestLabeledFunction)) {
+ return false;
+ }
+ LatestLabeledFunction function = (LatestLabeledFunction) o;
+ return Objects.equals(entityId, function.entityId) &&
+ getTimeBucket() == function.getTimeBucket();
+ }
+
+ @Override // Hashes the same two fields that equals() compares.
+ public int hashCode() {
+ return Objects.hash(entityId, getTimeBucket());
+ }
+}
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunctionTest.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunctionTest.java
new file mode 100644
index 0000000000..7492868930
--- /dev/null
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/latest/LatestLabeledFunctionTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function.latest;
+
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import
org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.config.NamingControl;
+import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
+import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+@ExtendWith(MockitoExtension.class) // Tests accept, calculate, toHour/toDay, remote serialization and the storage builder.
+public class LatestLabeledFunctionTest {
+
+ private static final DataTable HTTP_CODE_COUNT_1 = new
DataTable("200,2|301,2|404,3|502,4"); // NOTE(review): presumably "label,value" pairs joined by '|' - confirm against DataTable.
+
+ private static final DataTable HTTP_CODE_COUNT_2 = new
DataTable("200,1|301,4|404,5|502,1|505,1");
+
+ private static final DataTable HTTP_CODE_COUNT_3 = new
DataTable("200,2|301,4|404,5|502,4|505,1");
+
+ private LatestLabeledFunction function;
+
+ @BeforeAll // Installs a NamingControl on MeterEntity; the tests below create entities via MeterEntity.newService().
+ public static void setup() {
+ MeterEntity.setNamingControl(
+ new NamingControl(512, 512, 512, new EndpointNameGrouping()));
+ }
+
+ @BeforeEach // Fresh function per test, stamped with the minute time bucket of the current time.
+ public void before() {
+ function = new LatestLabeledFunctionInst();
+
function.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+ }
+
+ @AfterAll // Resets the NamingControl installed by setup() to null.
+ public static void tearDown() {
+ MeterEntity.setNamingControl(null);
+ }
+
+ @Test // Each accept() replaces the held table: the last accepted value wins.
+ public void testAccept() {
+ function.accept(MeterEntity.newService("service-test", Layer.GENERAL),
HTTP_CODE_COUNT_1);
+ assertThat(function.getValue()).isEqualTo(HTTP_CODE_COUNT_1);
+
+ function.accept(MeterEntity.newService("service-test", Layer.GENERAL),
HTTP_CODE_COUNT_2);
+ function.accept(MeterEntity.newService("service-test", Layer.GENERAL),
HTTP_CODE_COUNT_3);
+ assertThat(function.getValue()).isEqualTo(HTTP_CODE_COUNT_3);
+ }
+
+ @Test // calculate() leaves the last accepted value unchanged.
+ public void testCalculate() {
+ function.accept(MeterEntity.newService("service-test", Layer.GENERAL),
HTTP_CODE_COUNT_1);
+ function.accept(MeterEntity.newService("service-test", Layer.GENERAL),
HTTP_CODE_COUNT_2);
+ function.accept(MeterEntity.newService("service-test", Layer.GENERAL),
HTTP_CODE_COUNT_3);
+ function.calculate();
+
+ assertThat(function.getValue()).isEqualTo(HTTP_CODE_COUNT_3);
+ }
+
+ @Test // The metric returned by toHour() carries the last accepted table.
+ public void testToHour() {
+ function.accept(MeterEntity.newService("service-test", Layer.GENERAL),
HTTP_CODE_COUNT_1);
+ function.accept(MeterEntity.newService("service-test", Layer.GENERAL),
HTTP_CODE_COUNT_2);
+ function.calculate();
+
+ final LatestLabeledFunction hourFunction = (LatestLabeledFunction)
function.toHour();
+ hourFunction.calculate();
+
+ assertThat(hourFunction.getValue()).isEqualTo(HTTP_CODE_COUNT_2);
+ }
+
+ @Test // The metric returned by toDay() carries the last accepted table.
+ public void testToDay() {
+ function.accept(
+ MeterEntity.newService("service-test", Layer.GENERAL),
+ HTTP_CODE_COUNT_1
+ );
+ function.accept(
+ MeterEntity.newService("service-test", Layer.GENERAL),
+ HTTP_CODE_COUNT_2
+ );
+ function.calculate();
+
+ final LatestLabeledFunction dayFunction = (LatestLabeledFunction)
function.toDay();
+ dayFunction.calculate();
+
+ assertThat(dayFunction.getValue()).isEqualTo(HTTP_CODE_COUNT_2);
+ }
+
+ @Test // serialize() followed by deserialize() preserves entity id, time bucket, service id and value.
+ public void testSerialize() {
+ function.accept(MeterEntity.newService("service-test", Layer.GENERAL),
HTTP_CODE_COUNT_1);
+
+ LatestLabeledFunction function2 = new LatestLabeledFunctionInst();
+ function2.deserialize(function.serialize().build());
+
+ assertThat(function2.getEntityId()).isEqualTo(function.getEntityId());
+
assertThat(function2.getTimeBucket()).isEqualTo(function.getTimeBucket());
+
assertThat(function2.getServiceId()).isEqualTo(function.getServiceId());
+ assertThat(function2.getValue()).isEqualTo(function.getValue());
+ }
+
+ @Test // entity2Storage() followed by storage2Entity() preserves the value.
+ public void testBuilder() throws IllegalAccessException,
InstantiationException {
+ function.accept(MeterEntity.newService("service-test", Layer.GENERAL),
HTTP_CODE_COUNT_1);
+ function.calculate();
+
+ StorageBuilder<LatestLabeledFunction> storageBuilder =
function.builder().newInstance();
+
+ final HashMapConverter.ToStorage toStorage = new
HashMapConverter.ToStorage();
+ storageBuilder.entity2Storage(function, toStorage);
+ final Map<String, Object> map = toStorage.obtain();
+ map.put(LatestLabeledFunction.VALUE, ((DataTable)
map.get(LatestLabeledFunction.VALUE)).toStorageData()); // storage2Entity() casts VALUE to String, so swap in the string form.
+
+ LatestLabeledFunction function2 = storageBuilder.storage2Entity(new
HashMapConverter.ToEntity(map));
+
+ assertThat(function2.getValue()).isEqualTo(function.getValue());
+ }
+
+ private static class LatestLabeledFunctionInst extends
LatestLabeledFunction { // Minimal concrete subclass: only createNew() is supplied.
+ @Override
+ public AcceptableValue<DataTable> createNew() {
+ return new LatestLabeledFunctionInst();
+ }
+ }
+}