This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch feature/als-mx
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 0019f6caa8ed00fce2aca4dbba58b9d5151d3cfe
Author: kezhenxu94 <kezhenx...@163.com>
AuthorDate: Mon Nov 2 18:53:39 2020 +0800

    Refactor Kubernetes analyzer
---
 .github/workflows/e2e.istio.yaml                   |   2 +
 .../envoy/AccessLogServiceGRPCHandler.java         |  14 +-
 .../server/receiver/envoy/als/ALSHTTPAnalysis.java |   4 +-
 ...SHTTPAnalysis.java => AbstractALSAnalyzer.java} |  32 +-
 .../envoy/als/LogEntry2MetricsAdapter.java         | 132 ++++++++
 .../als/k8s/K8sALSServiceMeshHTTPAnalysis.java     | 334 ---------------------
 .../als/k8s/K8sALSServiceMeshHTTPAnalyzer.java     | 186 ++++++++++++
 ...g.oap.server.receiver.envoy.als.ALSHTTPAnalysis |   2 +-
 .../envoy/als/k8s/K8sHTTPAnalysisTest.java         |  41 +--
 9 files changed, 364 insertions(+), 383 deletions(-)

diff --git a/.github/workflows/e2e.istio.yaml b/.github/workflows/e2e.istio.yaml
index 34d590e..43c051c 100644
--- a/.github/workflows/e2e.istio.yaml
+++ b/.github/workflows/e2e.istio.yaml
@@ -19,10 +19,12 @@ name: Istio
 on:
   pull_request:
     paths:
+      - '**'
       - '!**.md'
   push:
     branches:
       - master
+      - feature/als-mx
 
 env:
   SKIP_TEST: true
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
index b715ee3..99c22ed 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
@@ -25,9 +25,8 @@ import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.ServiceLoader;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.source.Source;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
 import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
@@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory;
 public class AccessLogServiceGRPCHandler extends 
AccessLogServiceGrpc.AccessLogServiceImplBase {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
     private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;
-    private final SourceReceiver sourceReceiver;
+
     private final CounterMetrics counter;
     private final HistogramMetrics histogram;
     private final CounterMetrics sourceDispatcherCounter;
@@ -61,8 +60,6 @@ public class AccessLogServiceGRPCHandler extends 
AccessLogServiceGrpc.AccessLogS
 
         LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);
 
-        sourceReceiver = 
manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
-
         MetricsCreator metricCreator = 
manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
         counter = metricCreator.createCounter("envoy_als_in_count", "The count 
of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
         histogram = 
metricCreator.createHistogramMetric("envoy_als_in_latency", "The process 
latency of service ALS metric receiver", MetricsTag.EMPTY_KEY, 
MetricsTag.EMPTY_VALUE);
@@ -103,7 +100,7 @@ public class AccessLogServiceGRPCHandler extends 
AccessLogServiceGrpc.AccessLogS
                         case HTTP_LOGS:
                             StreamAccessLogsMessage.HTTPAccessLogEntries logs 
= message.getHttpLogs();
 
-                            List<Source> sourceResult = new ArrayList<>();
+                            List<ServiceMeshMetric.Builder> sourceResult = new 
ArrayList<>();
                             for (ALSHTTPAnalysis analysis : 
envoyHTTPAnalysisList) {
                                 logs.getLogEntryList().forEach(log -> {
                                     
sourceResult.addAll(analysis.analysis(identifier, log, role));
@@ -111,7 +108,8 @@ public class AccessLogServiceGRPCHandler extends 
AccessLogServiceGrpc.AccessLogS
                             }
 
                             sourceDispatcherCounter.inc(sourceResult.size());
-                            sourceResult.forEach(sourceReceiver::receive);
+                            
sourceResult.forEach(TelemetryDataDispatcher::process);
+                            break;
                     }
                 } finally {
                     timer.finish();
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
index 4297f3d..426604e 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.receiver.envoy.als;
 import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
 import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
 import java.util.List;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
 import 
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
 
 /**
@@ -32,7 +32,7 @@ public interface ALSHTTPAnalysis {
 
     void init(EnvoyMetricReceiverConfig config);
 
-    List<Source> analysis(StreamAccessLogsMessage.Identifier identifier, 
HTTPAccessLogEntry entry, Role role);
+    List<ServiceMeshMetric.Builder> 
analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry 
entry, Role role);
 
     Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
 }
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
similarity index 58%
copy from 
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
copy to 
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
index 4297f3d..6f492db 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
@@ -18,21 +18,27 @@
 
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
-import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.api.v2.core.Node;
 import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
-import java.util.List;
-import org.apache.skywalking.oap.server.core.source.Source;
-import 
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
 
-/**
- * Analysis source metrics from ALS
- */
-public interface ALSHTTPAnalysis {
-    String name();
-
-    void init(EnvoyMetricReceiverConfig config);
+public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis {
 
-    List<Source> analysis(StreamAccessLogsMessage.Identifier identifier, 
HTTPAccessLogEntry entry, Role role);
+    @Override
+    public Role identify(final StreamAccessLogsMessage.Identifier 
alsIdentifier, final Role defaultRole) {
+        if (alsIdentifier == null) {
+            return defaultRole;
+        }
+        final Node node = alsIdentifier.getNode();
+        if (node == null) {
+            return defaultRole;
+        }
+        final String id = node.getId();
+        if (id.startsWith("router~")) {
+            return Role.PROXY;
+        } else if (id.startsWith("sidecar~")) {
+            return Role.SIDECAR;
+        }
+        return defaultRole;
+    }
 
-    Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
 }
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
new file mode 100644
index 0000000..f170f0f
--- /dev/null
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import com.google.protobuf.Duration;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.UInt32Value;
+import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties;
+import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties;
+import java.time.Instant;
+import java.util.Optional;
+import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Optional.ofNullable;
+
+/**
+ * Adapt {@link HTTPAccessLogEntry} objects to {@link ServiceMeshMetric} 
builders.
+ */
+public final class LogEntry2MetricsAdapter {
+
+    public static final String NON_TLS = "NONE";
+
+    public static final String M_TLS = "mTLS";
+
+    public static final String TLS = "TLS";
+
+    /**
+     * Adapt the {@code entry} partially, into a downstream metrics {@link 
ServiceMeshMetric.Builder}.
+     *
+     * @param entry the log entry to adapt.
+     * @return the {@link ServiceMeshMetric.Builder} adapted from the given 
entry.
+     */
+    public static ServiceMeshMetric.Builder adaptToDownstreamMetrics(final 
HTTPAccessLogEntry entry) {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final long startTime = formatAsLong(properties.getStartTime());
+        final long duration = 
formatAsLong(properties.getTimeToLastDownstreamTxByte());
+        return adaptCommonPart(entry)
+            .setStartTime(startTime)
+            .setEndTime(startTime + duration)
+            .setLatency((int) duration);
+    }
+
+    /**
+     * Adapt the {@code entry} partially, into a upstream metrics {@link 
ServiceMeshMetric.Builder}.
+     *
+     * @param entry the log entry to adapt.
+     * @return the {@link ServiceMeshMetric.Builder} adapted from the given 
entry.
+     */
+    public static ServiceMeshMetric.Builder adaptUpstreamMetrics(final 
HTTPAccessLogEntry entry) {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final long startTime = 
formatAsLong(properties.getTimeToFirstUpstreamTxByte());
+        final long duration = 
formatAsLong(properties.getTimeToLastUpstreamRxByte());
+        return adaptCommonPart(entry)
+            .setStartTime(startTime)
+            .setEndTime(startTime + duration)
+            .setLatency((int) duration);
+    }
+
+    private static ServiceMeshMetric.Builder adaptCommonPart(final 
HTTPAccessLogEntry entry) {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final String endpoint = 
ofNullable(entry.getRequest()).map(HTTPRequestProperties::getPath).orElse("/");
+        final int responseCode = 
ofNullable(entry.getResponse()).map(HTTPResponseProperties::getResponseCode).map(UInt32Value::getValue).orElse(200);
+        final boolean status = responseCode >= 200 && responseCode < 400;
+        final Protocol protocol = requestProtocol(entry.getRequest());
+        final String tlsMode = parseTLS(properties.getTlsProperties());
+
+        return ServiceMeshMetric.newBuilder()
+                                .setEndpoint(endpoint)
+                                .setResponseCode(Math.toIntExact(responseCode))
+                                .setStatus(status)
+                                .setProtocol(protocol)
+                                .setTlsMode(tlsMode);
+    }
+
+    private static long formatAsLong(final Timestamp timestamp) {
+        return Instant.ofEpochSecond(timestamp.getSeconds(), 
timestamp.getNanos()).toEpochMilli();
+    }
+
+    private static long formatAsLong(final Duration duration) {
+        return Instant.ofEpochSecond(duration.getSeconds(), 
duration.getNanos()).toEpochMilli();
+    }
+
+    private static Protocol requestProtocol(final HTTPRequestProperties 
request) {
+        if (request == null) {
+            return Protocol.HTTP;
+        }
+        final String scheme = request.getScheme();
+        if (scheme.startsWith("http")) {
+            return Protocol.HTTP;
+        }
+        return Protocol.gRPC;
+    }
+
+    private static String parseTLS(final TLSProperties properties) {
+        if (properties == null) {
+            return NON_TLS;
+        }
+        if 
(isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties())
+                                  
.orElse(TLSProperties.CertificateProperties.newBuilder().build())
+                                  .getSubject())) {
+            return NON_TLS;
+        }
+        if 
(isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties())
+                                  
.orElse(TLSProperties.CertificateProperties.newBuilder().build())
+                                  .getSubject())) {
+            return TLS;
+        }
+        return M_TLS;
+    }
+
+}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
deleted file mode 100644
index f06d941..0000000
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.envoy.als.k8s;
-
-import com.google.common.base.Strings;
-import com.google.protobuf.Duration;
-import com.google.protobuf.Timestamp;
-import io.envoyproxy.envoy.api.v2.core.Address;
-import io.envoyproxy.envoy.api.v2.core.Node;
-import io.envoyproxy.envoy.api.v2.core.SocketAddress;
-import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
-import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties;
-import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties;
-import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties;
-import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
-import org.apache.skywalking.apm.network.common.v3.DetectPoint;
-import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
-import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
-import org.apache.skywalking.oap.server.core.source.Source;
-import 
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
-import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
-import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
-
-/**
- * Analysis log based on ingress and mesh scenarios.
- */
-@Slf4j
-public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
-    private static final String NON_TLS = "NONE";
-
-    private static final String M_TLS = "mTLS";
-
-    private static final String TLS = "TLS";
-
-    protected K8SServiceRegistry serviceRegistry;
-
-    @Override
-    public String name() {
-        return "k8s-mesh";
-    }
-
-    @Override
-    @SneakyThrows
-    public void init(EnvoyMetricReceiverConfig config) {
-        serviceRegistry = new K8SServiceRegistry(config);
-        serviceRegistry.start();
-    }
-
-    @Override
-    public List<Source> analysis(StreamAccessLogsMessage.Identifier 
identifier, HTTPAccessLogEntry entry, Role role) {
-        if (serviceRegistry.isEmpty()) {
-            return Collections.emptyList();
-        }
-        switch (role) {
-            case PROXY:
-                analysisProxy(identifier, entry);
-                break;
-            case SIDECAR:
-                return analysisSideCar(identifier, entry);
-        }
-
-        return Collections.emptyList();
-    }
-
-    protected List<Source> analysisSideCar(StreamAccessLogsMessage.Identifier 
identifier, HTTPAccessLogEntry entry) {
-        List<Source> sources = new ArrayList<>();
-        AccessLogCommon properties = entry.getCommonProperties();
-        if (properties != null) {
-            String cluster = properties.getUpstreamCluster();
-            if (cluster != null) {
-                long startTime = formatAsLong(properties.getStartTime());
-                long duration = 
formatAsLong(properties.getTimeToLastDownstreamTxByte());
-
-                HTTPRequestProperties request = entry.getRequest();
-                String endpoint = "/";
-                Protocol protocol = Protocol.HTTP;
-                if (request != null) {
-                    endpoint = request.getPath();
-                    String schema = request.getScheme();
-                    if ("http".equals(schema) || "https".equals(schema)) {
-                        protocol = Protocol.HTTP;
-                    } else {
-                        protocol = Protocol.gRPC;
-                    }
-                }
-                HTTPResponseProperties response = entry.getResponse();
-                int responseCode = 200;
-                if (response != null) {
-                    responseCode = response.getResponseCode().getValue();
-                }
-                boolean status = responseCode >= 200 && responseCode < 400;
-
-                Address downstreamRemoteAddress = 
properties.getDownstreamRemoteAddress();
-                ServiceMetaInfo downstreamService = 
find(downstreamRemoteAddress.getSocketAddress().getAddress());
-                Address downstreamLocalAddress = 
properties.getDownstreamLocalAddress();
-                ServiceMetaInfo localService = 
find(downstreamLocalAddress.getSocketAddress().getAddress());
-                String tlsMode = parseTLS(properties.getTlsProperties());
-
-                ServiceMeshMetric.Builder metric = null;
-                if (cluster.startsWith("inbound|")) {
-                    // Server side
-                    if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
-                        // Ingress -> sidecar(server side)
-                        // Mesh telemetry without source, the relation would 
be generated.
-                        metric = ServiceMeshMetric.newBuilder()
-                                                  .setStartTime(startTime)
-                                                  .setEndTime(startTime + 
duration)
-                                                  
.setDestServiceName(localService.getServiceName())
-                                                  
.setDestServiceInstance(localService.getServiceInstanceName())
-                                                  .setEndpoint(endpoint)
-                                                  .setLatency((int) duration)
-                                                  
.setResponseCode(Math.toIntExact(responseCode))
-                                                  .setStatus(status)
-                                                  .setProtocol(protocol)
-                                                  .setTlsMode(tlsMode)
-                                                  
.setDetectPoint(DetectPoint.server);
-
-                        log.debug("Transformed ingress->sidecar inbound mesh 
metric {}", metric);
-                    } else {
-                        // sidecar -> sidecar(server side)
-                        metric = ServiceMeshMetric.newBuilder()
-                                                  .setStartTime(startTime)
-                                                  .setEndTime(startTime + 
duration)
-                                                  
.setSourceServiceName(downstreamService.getServiceName())
-                                                  
.setSourceServiceInstance(downstreamService.getServiceInstanceName())
-                                                  
.setDestServiceName(localService.getServiceName())
-                                                  
.setDestServiceInstance(localService.getServiceInstanceName())
-                                                  .setEndpoint(endpoint)
-                                                  .setLatency((int) duration)
-                                                  
.setResponseCode(Math.toIntExact(responseCode))
-                                                  .setStatus(status)
-                                                  .setProtocol(protocol)
-                                                  .setTlsMode(tlsMode)
-                                                  
.setDetectPoint(DetectPoint.server);
-
-                        log.debug("Transformed sidecar->sidecar(server side) 
inbound mesh metric {}", metric);
-                    }
-                } else if (cluster.startsWith("outbound|")) {
-                    // sidecar(client side) -> sidecar
-                    Address upstreamRemoteAddress = 
properties.getUpstreamRemoteAddress();
-                    ServiceMetaInfo destService = 
find(upstreamRemoteAddress.getSocketAddress().getAddress());
-
-                    metric = ServiceMeshMetric.newBuilder()
-                                              .setStartTime(startTime)
-                                              .setEndTime(startTime + duration)
-                                              
.setSourceServiceName(downstreamService.getServiceName())
-                                              
.setSourceServiceInstance(downstreamService.getServiceInstanceName())
-                                              
.setDestServiceName(destService.getServiceName())
-                                              
.setDestServiceInstance(destService.getServiceInstanceName())
-                                              .setEndpoint(endpoint)
-                                              .setLatency((int) duration)
-                                              
.setResponseCode(Math.toIntExact(responseCode))
-                                              .setStatus(status)
-                                              .setProtocol(protocol)
-                                              .setTlsMode(tlsMode)
-                                              
.setDetectPoint(DetectPoint.client);
-
-                    log.debug("Transformed sidecar->sidecar(server side) 
inbound mesh metric {}", metric);
-                }
-
-                Optional.ofNullable(metric).ifPresent(this::forward);
-            }
-        }
-        return sources;
-    }
-
-    private String parseTLS(TLSProperties properties) {
-        if (properties == null) {
-            return NON_TLS;
-        }
-        if 
(Strings.isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties())
-                                          
.orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject()))
 {
-            return NON_TLS;
-        }
-        if 
(Strings.isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties())
-                                          
.orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject()))
 {
-            return TLS;
-        }
-        return M_TLS;
-    }
-
-    protected void analysisProxy(StreamAccessLogsMessage.Identifier 
identifier, HTTPAccessLogEntry entry) {
-        AccessLogCommon properties = entry.getCommonProperties();
-        if (properties != null) {
-            Address downstreamLocalAddress = 
properties.getDownstreamLocalAddress();
-            Address downstreamRemoteAddress = 
properties.getDownstreamRemoteAddress();
-            Address upstreamRemoteAddress = 
properties.getUpstreamRemoteAddress();
-            if (downstreamLocalAddress != null && downstreamRemoteAddress != 
null && upstreamRemoteAddress != null) {
-                SocketAddress downstreamRemoteAddressSocketAddress = 
downstreamRemoteAddress.getSocketAddress();
-                ServiceMetaInfo outside = 
find(downstreamRemoteAddressSocketAddress.getAddress());
-
-                SocketAddress downstreamLocalAddressSocketAddress = 
downstreamLocalAddress.getSocketAddress();
-                ServiceMetaInfo ingress = 
find(downstreamLocalAddressSocketAddress.getAddress());
-
-                long startTime = formatAsLong(properties.getStartTime());
-                long duration = 
formatAsLong(properties.getTimeToLastDownstreamTxByte());
-
-                HTTPRequestProperties request = entry.getRequest();
-                String endpoint = "/";
-                Protocol protocol = Protocol.HTTP;
-                if (request != null) {
-                    endpoint = request.getPath();
-                    String schema = request.getScheme();
-                    if ("http".equals(schema) || "https".equals(schema)) {
-                        protocol = Protocol.HTTP;
-                    } else {
-                        protocol = Protocol.gRPC;
-                    }
-                }
-                HTTPResponseProperties response = entry.getResponse();
-                int responseCode = 200;
-                if (response != null) {
-                    responseCode = response.getResponseCode().getValue();
-                }
-                boolean status = responseCode >= 200 && responseCode < 400;
-                String tlsMode = parseTLS(properties.getTlsProperties());
-
-                ServiceMeshMetric.Builder metric = 
ServiceMeshMetric.newBuilder()
-                                                                    
.setStartTime(startTime)
-                                                                    
.setEndTime(startTime + duration)
-                                                                    
.setSourceServiceName(outside.getServiceName())
-                                                                    
.setSourceServiceInstance(
-                                                                        
outside.getServiceInstanceName())
-                                                                    
.setDestServiceName(ingress.getServiceName())
-                                                                    
.setDestServiceInstance(
-                                                                        
ingress.getServiceInstanceName())
-                                                                    
.setEndpoint(endpoint)
-                                                                    
.setLatency((int) duration)
-                                                                    
.setResponseCode(Math.toIntExact(responseCode))
-                                                                    
.setStatus(status)
-                                                                    
.setProtocol(protocol)
-                                                                    
.setTlsMode(tlsMode)
-                                                                    
.setDetectPoint(DetectPoint.server);
-
-                log.debug("Transformed ingress inbound mesh metric {}", 
metric);
-                forward(metric);
-
-                SocketAddress upstreamRemoteAddressSocketAddress = 
upstreamRemoteAddress.getSocketAddress();
-                ServiceMetaInfo targetService = 
find(upstreamRemoteAddressSocketAddress.getAddress());
-
-                long outboundStartTime = startTime + 
formatAsLong(properties.getTimeToFirstUpstreamTxByte());
-                long outboundEndTime = startTime + 
formatAsLong(properties.getTimeToLastUpstreamRxByte());
-
-                ServiceMeshMetric.Builder outboundMetric = 
ServiceMeshMetric.newBuilder()
-                                                                            
.setStartTime(outboundStartTime)
-                                                                            
.setEndTime(outboundEndTime)
-                                                                            
.setSourceServiceName(
-                                                                               
 ingress.getServiceName())
-                                                                            
.setSourceServiceInstance(
-                                                                               
 ingress.getServiceInstanceName())
-                                                                            
.setDestServiceName(
-                                                                               
 targetService.getServiceName())
-                                                                            
.setDestServiceInstance(
-                                                                               
 targetService.getServiceInstanceName())
-                                                                            
.setEndpoint(endpoint)
-                                                                            
.setLatency(
-                                                                               
 (int) (outboundEndTime - outboundStartTime))
-                                                                            
.setResponseCode(
-                                                                               
 Math.toIntExact(responseCode))
-                                                                            
.setStatus(status)
-                                                                            
.setProtocol(protocol)
-                                                                            // 
Can't parse it from tls properties, leave
-                                                                            // 
it to Server side.
-                                                                            
.setTlsMode(NON_TLS)
-                                                                            
.setDetectPoint(DetectPoint.client);
-
-                log.debug("Transformed ingress outbound mesh metric {}", 
outboundMetric);
-                forward(outboundMetric);
-            }
-        }
-    }
-
-    @Override
-    public Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, 
Role prev) {
-        if (alsIdentifier != null) {
-            Node node = alsIdentifier.getNode();
-            if (node != null) {
-                String id = node.getId();
-                if (id.startsWith("router~")) {
-                    return Role.PROXY;
-                } else if (id.startsWith("sidecar~")) {
-                    return Role.SIDECAR;
-                }
-            }
-        }
-
-        return prev;
-    }
-
-    /**
-     * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to 
represent not found.
-     */
-    protected ServiceMetaInfo find(String ip) {
-        return serviceRegistry.findService(ip);
-    }
-
-    protected void forward(ServiceMeshMetric.Builder metric) {
-        TelemetryDataDispatcher.process(metric);
-    }
-
-    private long formatAsLong(Timestamp timestamp) {
-        return Instant.ofEpochSecond(timestamp.getSeconds(), 
timestamp.getNanos()).toEpochMilli();
-    }
-
-    private long formatAsLong(Duration duration) {
-        return Instant.ofEpochSecond(duration.getSeconds(), 
duration.getNanos()).toEpochMilli();
-    }
-}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalyzer.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalyzer.java
new file mode 100644
index 0000000..5d933d3
--- /dev/null
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalyzer.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.k8s;
+
+import io.envoyproxy.envoy.api.v2.core.Address;
+import io.envoyproxy.envoy.api.v2.core.SocketAddress;
+import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.common.v3.DetectPoint;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import 
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AbstractALSAnalyzer;
+import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+
+import static 
org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+import static 
org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.adaptToDownstreamMetrics;
+import static 
org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.adaptUpstreamMetrics;
+
+/**
+ * Analysis log based on ingress and mesh scenarios.
+ */
+@Slf4j
+public class K8sALSServiceMeshHTTPAnalyzer extends AbstractALSAnalyzer {
+    protected K8SServiceRegistry serviceRegistry;
+
+    @Override
+    public String name() {
+        return "k8s-mesh";
+    }
+
+    @Override
+    @SneakyThrows
+    public void init(EnvoyMetricReceiverConfig config) {
+        serviceRegistry = new K8SServiceRegistry(config);
+        serviceRegistry.start();
+    }
+
+    @Override
+    public List<ServiceMeshMetric.Builder> 
analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry 
entry, Role role) {
+        if (serviceRegistry.isEmpty()) {
+            return Collections.emptyList();
+        }
+        switch (role) {
+            case PROXY:
+                return analyzeProxy(entry);
+            case SIDECAR:
+                return analyzeSideCar(entry);
+        }
+
+        return Collections.emptyList();
+    }
+
+    protected List<ServiceMeshMetric.Builder> analyzeSideCar(final 
HTTPAccessLogEntry entry) {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        if (properties == null) {
+            return Collections.emptyList();
+        }
+        final String cluster = properties.getUpstreamCluster();
+        if (cluster == null) {
+            return Collections.emptyList();
+        }
+
+        final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
+
+        final Address downstreamRemoteAddress = 
properties.getDownstreamRemoteAddress();
+        final ServiceMetaInfo downstreamService = 
find(downstreamRemoteAddress.getSocketAddress().getAddress());
+        final Address downstreamLocalAddress = 
properties.getDownstreamLocalAddress();
+        final ServiceMetaInfo localService = 
find(downstreamLocalAddress.getSocketAddress().getAddress());
+
+        ServiceMeshMetric.Builder metric = null;
+        if (cluster.startsWith("inbound|")) {
+            // Server side
+            metric = adaptToDownstreamMetrics(entry)
+                .setDestServiceName(localService.getServiceName())
+                .setDestServiceInstance(localService.getServiceInstanceName())
+                .setDetectPoint(DetectPoint.server);
+            if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
+                // Ingress -> sidecar(server side)
+                // Mesh telemetry without source, the relation would be 
generated.
+
+                log.debug("Transformed ingress->sidecar inbound mesh metric 
{}", metric);
+            } else {
+                // sidecar -> sidecar(server side)
+                metric.setSourceServiceName(downstreamService.getServiceName())
+                      
.setSourceServiceInstance(downstreamService.getServiceInstanceName());
+
+                log.debug("Transformed sidecar->sidecar(server side) inbound 
mesh metric {}", metric);
+            }
+        } else if (cluster.startsWith("outbound|")) {
+            // sidecar(client side) -> sidecar
+            Address upstreamRemoteAddress = 
properties.getUpstreamRemoteAddress();
+            ServiceMetaInfo destService = 
find(upstreamRemoteAddress.getSocketAddress().getAddress());
+
+            metric = adaptToDownstreamMetrics(entry)
+                .setSourceServiceName(downstreamService.getServiceName())
+                
.setSourceServiceInstance(downstreamService.getServiceInstanceName())
+                .setDestServiceName(destService.getServiceName())
+                .setDestServiceInstance(destService.getServiceInstanceName())
+                .setDetectPoint(DetectPoint.client);
+
+            log.debug("Transformed sidecar->sidecar(server side) inbound mesh 
metric {}", metric);
+        }
+
+        Optional.ofNullable(metric).ifPresent(sources::add);
+
+        return sources;
+    }
+
+    protected List<ServiceMeshMetric.Builder> analyzeProxy(final 
HTTPAccessLogEntry entry) {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        if (properties == null) {
+            return Collections.emptyList();
+        }
+        final Address downstreamLocalAddress = 
properties.getDownstreamLocalAddress();
+        final Address downstreamRemoteAddress = 
properties.getDownstreamRemoteAddress();
+        final Address upstreamRemoteAddress = 
properties.getUpstreamRemoteAddress();
+        if (downstreamLocalAddress == null || downstreamRemoteAddress == null 
|| upstreamRemoteAddress == null) {
+            return Collections.emptyList();
+        }
+
+        List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
+        SocketAddress downstreamRemoteAddressSocketAddress = 
downstreamRemoteAddress.getSocketAddress();
+        ServiceMetaInfo outside = 
find(downstreamRemoteAddressSocketAddress.getAddress());
+
+        SocketAddress downstreamLocalAddressSocketAddress = 
downstreamLocalAddress.getSocketAddress();
+        ServiceMetaInfo ingress = 
find(downstreamLocalAddressSocketAddress.getAddress());
+
+        ServiceMeshMetric.Builder metric = adaptToDownstreamMetrics(entry)
+            .setSourceServiceName(outside.getServiceName())
+            .setSourceServiceInstance(outside.getServiceInstanceName())
+            .setDestServiceName(ingress.getServiceName())
+            .setDestServiceInstance(ingress.getServiceInstanceName())
+            .setDetectPoint(DetectPoint.server);
+
+        log.debug("Transformed ingress inbound mesh metric {}", metric);
+        result.add(metric);
+
+        SocketAddress upstreamRemoteAddressSocketAddress = 
upstreamRemoteAddress.getSocketAddress();
+        ServiceMetaInfo targetService = 
find(upstreamRemoteAddressSocketAddress.getAddress());
+
+        ServiceMeshMetric.Builder outboundMetric = adaptUpstreamMetrics(entry)
+            .setSourceServiceName(ingress.getServiceName())
+            .setSourceServiceInstance(ingress.getServiceInstanceName())
+            .setDestServiceName(targetService.getServiceName())
+            .setDestServiceInstance(targetService.getServiceInstanceName())
+            // Can't parse it from tls properties, leave it to Server side.
+            .setTlsMode(NON_TLS)
+            .setDetectPoint(DetectPoint.client);
+
+        log.debug("Transformed ingress outbound mesh metric {}", 
outboundMetric);
+        result.add(outboundMetric);
+
+        return result;
+    }
+
+    /**
+     * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to 
represent not found.
+     */
+    protected ServiceMetaInfo find(String ip) {
+        return serviceRegistry.findService(ip);
+    }
+}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
index 215d7a0..f330757 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
@@ -17,4 +17,4 @@
 #
 
 
-org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8sALSServiceMeshHTTPAnalysis
+org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8sALSServiceMeshHTTPAnalyzer
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java
index 7ad64a5..871dac9 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java
@@ -23,7 +23,6 @@ import 
io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
 import java.util.List;
 import org.apache.skywalking.apm.network.common.v3.DetectPoint;
 import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
@@ -41,11 +40,11 @@ import static org.mockito.Mockito.when;
 
 public class K8sHTTPAnalysisTest {
 
-    private MockK8sAnalysis analysis;
+    private MockK8SAnalyzer analysis;
 
     @Before
     public void setUp() {
-        analysis = new MockK8sAnalysis();
+        analysis = new MockK8SAnalyzer();
         analysis.init(null);
     }
 
@@ -77,16 +76,16 @@ public class K8sHTTPAnalysisTest {
             StreamAccessLogsMessage.Builder requestBuilder = 
StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
 
-            analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
+            List<ServiceMeshMetric.Builder> result = 
this.analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
 
-            Assert.assertEquals(2, analysis.metrics.size());
+            Assert.assertEquals(2, result.size());
 
-            ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+            ServiceMeshMetric.Builder incoming = result.get(0);
             Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName());
             Assert.assertEquals("ingress", incoming.getDestServiceName());
             Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
 
-            ServiceMeshMetric.Builder outgoing = analysis.metrics.get(1);
+            ServiceMeshMetric.Builder outgoing = result.get(1);
             Assert.assertEquals("ingress", outgoing.getSourceServiceName());
             Assert.assertEquals("productpage", outgoing.getDestServiceName());
             Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint());
@@ -99,12 +98,11 @@ public class K8sHTTPAnalysisTest {
             StreamAccessLogsMessage.Builder requestBuilder = 
StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
 
-            analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs()
-                                                                            
.getLogEntry(0), Role.SIDECAR);
+            List<ServiceMeshMetric.Builder> result = 
this.analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
 
-            Assert.assertEquals(1, analysis.metrics.size());
+            Assert.assertEquals(1, result.size());
 
-            ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+            ServiceMeshMetric.Builder incoming = result.get(0);
             Assert.assertEquals("", incoming.getSourceServiceName());
             Assert.assertEquals("productpage", incoming.getDestServiceName());
             Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
@@ -117,12 +115,11 @@ public class K8sHTTPAnalysisTest {
             StreamAccessLogsMessage.Builder requestBuilder = 
StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
 
-            analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs()
-                                                                            
.getLogEntry(0), Role.SIDECAR);
+            List<ServiceMeshMetric.Builder> result = 
this.analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
 
-            Assert.assertEquals(1, analysis.metrics.size());
+            Assert.assertEquals(1, result.size());
 
-            ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+            ServiceMeshMetric.Builder incoming = result.get(0);
             Assert.assertEquals("productpage", 
incoming.getSourceServiceName());
             Assert.assertEquals("review", incoming.getDestServiceName());
             Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
@@ -135,20 +132,18 @@ public class K8sHTTPAnalysisTest {
             StreamAccessLogsMessage.Builder requestBuilder = 
StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
 
-            analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs()
-                                                                            
.getLogEntry(0), Role.SIDECAR);
+            List<ServiceMeshMetric.Builder> result = 
this.analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
 
-            Assert.assertEquals(1, analysis.metrics.size());
+            Assert.assertEquals(1, result.size());
 
-            ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+            ServiceMeshMetric.Builder incoming = result.get(0);
             Assert.assertEquals("productpage", 
incoming.getSourceServiceName());
             Assert.assertEquals("detail", incoming.getDestServiceName());
             Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint());
         }
     }
 
-    public static class MockK8sAnalysis extends K8sALSServiceMeshHTTPAnalysis {
-        private List<ServiceMeshMetric.Builder> metrics = new ArrayList<>();
+    public static class MockK8SAnalyzer extends K8sALSServiceMeshHTTPAnalyzer {
 
         @Override
         public void init(EnvoyMetricReceiverConfig config) {
@@ -160,10 +155,6 @@ public class K8sHTTPAnalysisTest {
             when(serviceRegistry.findService("10.44.2.55")).thenReturn(new 
ServiceMetaInfo("review", "detail-Inst"));
         }
 
-        @Override
-        protected void forward(ServiceMeshMetric.Builder metric) {
-            metrics.add(metric);
-        }
     }
 
     private static InputStream getResourceAsStream(final String resource) {

Reply via email to