This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 86260002b2 Fix the malfunctioning alarm feature of MAL metrics (#13543)
86260002b2 is described below
commit 86260002b262a9531cebd496c546ebbf47cd434f
Author: weixiang1862 <[email protected]>
AuthorDate: Fri Oct 17 06:28:15 2025 +0800
Fix the malfunctioning alarm feature of MAL metrics (#13543)
---
docs/en/changes/changes.md | 1 +
.../analysis/worker/MetricsStreamProcessor.java | 2 +-
.../server/core/remote/RemoteServiceHandler.java | 2 +-
.../server/core/worker/IWorkerInstanceSetter.java | 5 +-
.../oap/server/core/worker/RemoteHandleWorker.java | 43 ++++++++++--
.../server/core/worker/WorkerInstancesService.java | 5 +-
...e.java => RemoteServiceHandlerMALTestCase.java} | 77 ++++++++--------------
...e.java => RemoteServiceHandlerOALTestCase.java} | 9 +--
.../remote/client/GRPCRemoteClientTestCase.java | 3 +-
.../core/mock/MockWorkerInstancesService.java | 4 +-
10 files changed, 88 insertions(+), 63 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 48a0984adf..5bdbef86cc 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -107,6 +107,7 @@
* OAP Self Observability: make Trace analysis metrics separate by label
`protocol`, add Zipkin span dropped metrics.
* BanyanDB: Move data write logic from BanyanDB Java Client to OAP and support
observe metrics for write operations.
* Self Observability: add write latency metrics for BanyanDB and ElasticSearch.
+* Fix the malfunctioning alarm feature of MAL metrics due to unknown metadata
in L2 aggregate worker.
#### UI
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
index ee1a3bdca6..fe70ebdd7f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsStreamProcessor.java
@@ -191,7 +191,7 @@ public class MetricsStreamProcessor implements
StreamProcessor<Metrics> {
IWorkerInstanceSetter workerInstanceSetter =
moduleDefineHolder.find(CoreModule.NAME)
.provider()
.getService(IWorkerInstanceSetter.class);
- workerInstanceSetter.put(remoteReceiverWorkerName,
minutePersistentWorker, metricsClass);
+ workerInstanceSetter.put(remoteReceiverWorkerName,
minutePersistentWorker, kind, metricsClass);
MetricsRemoteWorker remoteWorker = new
MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
MetricsAggregateWorker aggregateWorker;
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
index 7da9a72344..b7f80c8384 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.java
@@ -122,7 +122,7 @@ public class RemoteServiceHandler extends
RemoteServiceGrpc.RemoteServiceImplBas
AbstractWorker nextWorker = handleWorker.getWorker();
StreamData streamData;
try {
- streamData =
handleWorker.getStreamDataClass().newInstance();
+ streamData = handleWorker.newStreamDataInstance();
} catch (Throwable t) {
remoteInErrorCounter.inc();
LOGGER.error(t.getMessage(), t);
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
index c71698a090..88b84578be 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/IWorkerInstanceSetter.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.worker;
import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.library.module.Service;
@@ -29,7 +30,9 @@ public interface IWorkerInstanceSetter extends Service {
/**
* @param remoteReceiverWorkName worker name
* @param instance The worker instance processes the given streamDataClass.
+ * @param kind Metric kind (OAL, MAL).
* @param streamDataClass Type of metrics.
*/
- void put(String remoteReceiverWorkName, AbstractWorker instance, Class<?
extends StreamData> streamDataClass);
+ void put(String remoteReceiverWorkName, AbstractWorker instance,
+ MetricStreamKind kind, Class<? extends StreamData>
streamDataClass);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java
index e24799e55f..4a3bab4038 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/RemoteHandleWorker.java
@@ -18,13 +18,48 @@
package org.apache.skywalking.oap.server.core.worker;
-import lombok.AllArgsConstructor;
import lombok.Getter;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import
org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
-@AllArgsConstructor
@Getter
public class RemoteHandleWorker {
- private AbstractWorker worker;
- private Class<? extends StreamData> streamDataClass;
+ private final AbstractWorker worker;
+ private final MetricStreamKind kind;
+ private final Class<? extends StreamData> streamDataClass;
+
+ private AcceptableValue<?> meterClassPrototype;
+
+ public RemoteHandleWorker(AbstractWorker worker, MetricStreamKind kind,
+ Class<? extends StreamData> streamDataClass) {
+ this.worker = worker;
+ this.kind = kind;
+ this.streamDataClass = streamDataClass;
+
+ if (MetricStreamKind.MAL == kind) {
+ try {
+ meterClassPrototype = (AcceptableValue<?>)
streamDataClass.newInstance();
+ } catch (Exception e) {
+ throw new UnexpectedException("Can't create mal meter
prototype with stream class" + streamDataClass);
+ }
+ }
+ }
+
+ /**
+ * Create a new StreamData instance with metadata {@link WithMetadata} for
RemoteServiceHandler to deserialize the RemoteMessage.
+ * OAL metrics can initialize metadata through the constructor,
+ * while MAL metrics need to initialize metadata through {@link
AcceptableValue#createNew}.
+ */
+ public StreamData newStreamDataInstance() throws InstantiationException,
IllegalAccessException {
+ switch (kind) {
+ case OAL:
+ return streamDataClass.newInstance();
+ case MAL:
+ return (StreamData) meterClassPrototype.createNew();
+ }
+ throw new UnexpectedException("Unsupported metrics stream kind" +
kind);
+ }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
index 16476100ef..e0347aa0bc 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/worker/WorkerInstancesService.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.worker;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,11 +45,11 @@ public class WorkerInstancesService implements
IWorkerInstanceSetter, IWorkerIns
@Override
public void put(String remoteReceiverWorkName, AbstractWorker instance,
- Class<? extends StreamData> streamDataClass) {
+ MetricStreamKind kind, Class<? extends StreamData>
streamDataClass) {
if (instances.containsKey(remoteReceiverWorkName)) {
throw new UnexpectedException("Duplicate worker name:" +
remoteReceiverWorkName);
}
- instances.put(remoteReceiverWorkName, new RemoteHandleWorker(instance,
streamDataClass));
+ instances.put(remoteReceiverWorkName, new RemoteHandleWorker(instance,
kind, streamDataClass));
LOGGER.debug("Worker {} has been registered as {}",
instance.toString(), remoteReceiverWorkName);
}
}
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerMALTestCase.java
similarity index 79%
copy from
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
copy to
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerMALTestCase.java
index 245b5c22b4..5c2cdf1ded 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerMALTestCase.java
@@ -24,8 +24,13 @@ import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import
org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import
org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgFunction;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
@@ -48,15 +53,11 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class RemoteServiceHandlerTestCase {
+public class RemoteServiceHandlerMALTestCase {
private Server server;
private ManagedChannel channel;
@@ -67,9 +68,9 @@ public class RemoteServiceHandlerTestCase {
serviceRegistry = new MutableHandlerRegistry();
final String name = UUID.randomUUID().toString();
InProcessServerBuilder serverBuilder =
- InProcessServerBuilder
- .forName(name)
- .fallbackHandlerRegistry(serviceRegistry);
+ InProcessServerBuilder
+ .forName(name)
+ .fallbackHandlerRegistry(serviceRegistry);
server = serverBuilder.build();
server.start();
@@ -109,9 +110,8 @@ public class RemoteServiceHandlerTestCase {
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class,
workerInstancesService);
TestWorker worker = new TestWorker(moduleManager);
- workerInstancesService.put(testWorkerId, worker, TestRemoteData.class);
+ workerInstancesService.put(testWorkerId, worker, MetricStreamKind.MAL,
TestRemoteData.class);
- String serverName = InProcessServerBuilder.generateName();
MetricsCreator metricsCreator = mock(MetricsCreator.class);
when(metricsCreator.createCounter(any(), any(), any(),
any())).thenReturn(new CounterMetrics() {
@Override
@@ -139,6 +139,7 @@ public class RemoteServiceHandlerTestCase {
ModuleDefineTesting telemetryModuleDefine = new ModuleDefineTesting();
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class,
metricsCreator);
+ serviceRegistry.addService(new RemoteServiceHandler(moduleManager));
RemoteServiceGrpc.RemoteServiceStub remoteServiceStub =
RemoteServiceGrpc.newStub(channel);
@@ -159,49 +160,30 @@ public class RemoteServiceHandlerTestCase {
}
});
- RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
- remoteMessage.setNextWorkerName(testWorkerId);
+ TestRemoteData testRemoteData = new TestRemoteData();
+ testRemoteData.setCount(10);
+ testRemoteData.setSummation(100);
+ testRemoteData.setTimeBucket(202510161550L);
+ testRemoteData.setEntityId("test-entity-id");
+ testRemoteData.setServiceId("test-service-id");
- RemoteData.Builder remoteData = RemoteData.newBuilder();
- remoteData.addDataStrings("test1");
- remoteData.addDataStrings("test2");
+ RemoteData.Builder remoteData = testRemoteData.serialize();
- remoteData.addDataLongs(10);
- remoteData.addDataLongs(20);
+ RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
+ remoteMessage.setNextWorkerName(testWorkerId);
remoteMessage.setRemoteData(remoteData);
streamObserver.onNext(remoteMessage.build());
streamObserver.onCompleted();
}
- static class TestRemoteData extends StreamData {
-
- private String str1;
- private String str2;
- private long long1;
- private long long2;
-
- @Override
- public int remoteHashCode() {
- return 10;
- }
-
- @Override
- public void deserialize(RemoteData remoteData) {
- str1 = remoteData.getDataStrings(0);
- str2 = remoteData.getDataStrings(1);
- long1 = remoteData.getDataLongs(0);
- long2 = remoteData.getDataLongs(1);
-
- Assertions.assertEquals("test1", str1);
- Assertions.assertEquals("test2", str2);
- Assertions.assertEquals(10, long1);
- Assertions.assertEquals(20, long2);
- }
+ public static class TestRemoteData extends AvgFunction {
@Override
- public RemoteData.Builder serialize() {
- return null;
+ public AcceptableValue<Long> createNew() {
+ TestRemoteData testRemoteData = new TestRemoteData();
+ testRemoteData.initMeta("test-avg-meter", 1);
+ return testRemoteData;
}
}
@@ -215,10 +197,9 @@ public class RemoteServiceHandlerTestCase {
public void in(Object o) {
TestRemoteData data = (TestRemoteData) o;
- Assertions.assertEquals("test1", data.str1);
- Assertions.assertEquals("test2", data.str2);
- Assertions.assertEquals(10, data.long1);
- Assertions.assertEquals(20, data.long2);
+ Assertions.assertEquals(10, data.getValue());
+ Assertions.assertEquals("test-avg-meter",
data.getMeta().getMetricsName());
+ Assertions.assertEquals(1, data.getMeta().getScope());
}
}
}
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerOALTestCase.java
similarity index 95%
rename from
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
rename to
oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerOALTestCase.java
index 245b5c22b4..5283bc683c 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/RemoteServiceHandlerOALTestCase.java
@@ -25,6 +25,7 @@ import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
@@ -56,7 +57,7 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class RemoteServiceHandlerTestCase {
+public class RemoteServiceHandlerOALTestCase {
private Server server;
private ManagedChannel channel;
@@ -109,9 +110,8 @@ public class RemoteServiceHandlerTestCase {
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class,
workerInstancesService);
TestWorker worker = new TestWorker(moduleManager);
- workerInstancesService.put(testWorkerId, worker, TestRemoteData.class);
+ workerInstancesService.put(testWorkerId, worker, MetricStreamKind.OAL,
TestRemoteData.class);
- String serverName = InProcessServerBuilder.generateName();
MetricsCreator metricsCreator = mock(MetricsCreator.class);
when(metricsCreator.createCounter(any(), any(), any(),
any())).thenReturn(new CounterMetrics() {
@Override
@@ -139,6 +139,7 @@ public class RemoteServiceHandlerTestCase {
ModuleDefineTesting telemetryModuleDefine = new ModuleDefineTesting();
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class,
metricsCreator);
+ serviceRegistry.addService(new RemoteServiceHandler(moduleManager));
RemoteServiceGrpc.RemoteServiceStub remoteServiceStub =
RemoteServiceGrpc.newStub(channel);
@@ -174,7 +175,7 @@ public class RemoteServiceHandlerTestCase {
streamObserver.onCompleted();
}
- static class TestRemoteData extends StreamData {
+ public static class TestRemoteData extends StreamData {
private String str1;
private String str2;
diff --git
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
index 1765742185..0f81d5a232 100644
---
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
+++
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClientTestCase.java
@@ -24,6 +24,7 @@ import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.util.MutableHandlerRegistry;
import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
@@ -84,7 +85,7 @@ public class GRPCRemoteClientTestCase {
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class,
workerInstancesService);
TestWorker worker = new TestWorker(moduleManager);
- workerInstancesService.put(nextWorkerName, worker,
TestStreamData.class);
+ workerInstancesService.put(nextWorkerName, worker,
MetricStreamKind.OAL, TestStreamData.class);
}
@AfterEach
diff --git
a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockWorkerInstancesService.java
b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockWorkerInstancesService.java
index b9e3a93f45..54a734471e 100644
---
a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockWorkerInstancesService.java
+++
b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-server-mock/src/main/java/org/apache/skywalking/oap/server/tool/profile/core/mock/MockWorkerInstancesService.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.tool.profile.core.mock;
+import org.apache.skywalking.oap.server.core.analysis.worker.MetricStreamKind;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
@@ -35,6 +36,7 @@ public class MockWorkerInstancesService implements
IWorkerInstanceSetter, IWorke
}
@Override
- public void put(String remoteReceiverWorkName, AbstractWorker instance,
Class<? extends StreamData> streamDataClass) {
+ public void put(String remoteReceiverWorkName, AbstractWorker instance,
+ MetricStreamKind kind, Class<? extends StreamData>
streamDataClass) {
}
}