This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 8da9bf9  ALS analyzer based on Envoy metadata exchange (#5800)
8da9bf9 is described below

commit 8da9bf94abb23e34a3c76168abacc8b59a7e5d70
Author: kezhenxu94 <kezhenx...@apache.org>
AuthorDate: Mon Nov 9 00:01:12 2020 +0800

    ALS analyzer based on Envoy metadata exchange (#5800)
---
 .github/workflows/docker-ci.yaml                   |   1 +
 .github/workflows/e2e.istio.yaml                   |   8 +-
 CHANGES.md                                         |   1 +
 LICENSE                                            |   1 +
 dist-material/release-docs/LICENSE                 |   2 +
 docker/oap/log4j2.xml                              |   2 +-
 docs/en/setup/envoy/als_setting.md                 |   9 +-
 oap-server/pom.xml                                 |   7 +
 .../main/resources/metadata-service-mapping.yaml}  |   7 +-
 .../envoy-metrics-receiver-plugin/pom.xml          |   5 +
 .../envoy/AccessLogServiceGRPCHandler.java         |  19 +-
 .../server/receiver/envoy/als/ALSHTTPAnalysis.java |   8 +-
 .../receiver/envoy/als/AbstractALSAnalyzer.java    |  64 +++++
 .../envoy/als/LogEntry2MetricsAdapter.java         | 163 +++++++++++
 .../server/receiver/envoy/als/ServiceMetaInfo.java |  30 +-
 .../als/k8s/K8sALSServiceMeshHTTPAnalysis.java     | 320 +++++----------------
 .../server/receiver/envoy/als/mx/FieldsHelper.java | 137 +++++++++
 .../envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java  | 125 ++++++++
 .../envoy/als/mx/ServiceMetaInfoAdapter.java       |  85 ++++++
 ...g.oap.server.receiver.envoy.als.ALSHTTPAnalysis |   1 +
 ...java => K8SALSServiceMeshHTTPAnalysisTest.java} |  52 ++--
 .../receiver/envoy/als/mx/FieldsHelperTest.java    |  95 ++++++
 .../src/test/resources/field-helper.msg            |  90 ++++++
 .../server-receiver-plugin/receiver-proto/pom.xml  | 120 +++++++-
 .../src/main/fbs/istio/node-info.fbs               |  51 ++++
 pom.xml                                            |   4 +
 .../skywalking/e2e/retryable/RetryableTest.java    |   2 +-
 .../org/apache/skywalking/e2e/mesh/IDManager.java  |  58 ----
 .../src/test/resources/expected/als/topo.yml       |   1 +
 .../known-oap-backend-dependencies-es7.txt         |   1 +
 .../known-oap-backend-dependencies.txt             |   1 +
 31 files changed, 1092 insertions(+), 378 deletions(-)

diff --git a/.github/workflows/docker-ci.yaml b/.github/workflows/docker-ci.yaml
index 49f1ffc..60834cd 100644
--- a/.github/workflows/docker-ci.yaml
+++ b/.github/workflows/docker-ci.yaml
@@ -31,6 +31,7 @@ jobs:
     runs-on: ubuntu-16.04
     timeout-minutes: 90
     strategy:
+      fail-fast: true
       matrix:
         es: [es6, es7]
     steps:
diff --git a/.github/workflows/e2e.istio.yaml b/.github/workflows/e2e.istio.yaml
index 0ff13c0..4d24cee 100644
--- a/.github/workflows/e2e.istio.yaml
+++ b/.github/workflows/e2e.istio.yaml
@@ -37,7 +37,11 @@ jobs:
   als:
     runs-on: ubuntu-16.04
     timeout-minutes: 60
-    name: Istio+Envoy Access Log Service
+    strategy:
+      fail-fast: true
+      matrix:
+        analyzer: [k8s-mesh, mx-mesh]
+    name: Istio+ALS(${{ matrix.analyzer }})
     steps:
       - uses: actions/checkout@v2
         with:
@@ -73,7 +77,7 @@ jobs:
                --set elasticsearch.replicas=1 \
                --set elasticsearch.minimumMasterNodes=1 \
                --set elasticsearch.imageTag=7.5.1 \
-               --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \
+               --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=${{ 
matrix.analyzer }} \
                --set oap.envoy.als.enabled=true \
                --set oap.replicas=1 \
                --set ui.image.repository=skywalking/ui \
diff --git a/CHANGES.md b/CHANGES.md
index 42eb2f8..46e4279 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -23,6 +23,7 @@ Release Notes.
 * Add the `@SuperDataset` annotation for BrowserErrorLog.
 * Add the thread pool to the Kafka fetcher to increase the performance.
 * Add `contain` and `not contain` OPS in OAL.
+* Add Envoy ALS analyzer based on metadata exchange.
 * Support keeping collecting the slowly segments in the sampling mechanism.
 * Support choose files to active the meter analyzer.
 * Improve Kubernetes service registry for ALS analysis.
diff --git a/LICENSE b/LICENSE
index b3f1f51..cb1efe5 100644
--- a/LICENSE
+++ b/LICENSE
@@ -221,6 +221,7 @@ The text of each license is the standard Apache 2.0 license.
    proto files from prometheus/client_model: 
https://github.com/prometheus/client_model Apache 2.0
    proto files from lyft/protoc-gen-validate: 
https://github.com/lyft/protoc-gen-validate Apache 2.0
    proto files from gogo/googleapis: https://github.com/gogo/googleapis Apache 
2.0
+   flatbuffers files from istio/proxy: https://github.com/istio/proxy Apache 
2.0
    mvnw files from https://github.com/takari/maven-wrapper Apache 2.0
    svg files from skywalking-ui/src/assets/icons: 
https://github.com/google/material-design-icons Apache 2.0
     
diff --git a/dist-material/release-docs/LICENSE 
b/dist-material/release-docs/LICENSE
index b5d3a52..c390f00 100755
--- a/dist-material/release-docs/LICENSE
+++ b/dist-material/release-docs/LICENSE
@@ -232,6 +232,7 @@ The text of each license is the standard Apache 2.0 license.
     Google: gson 2.8.6: https://github.com/google/gson , Apache 2.0
     Google: proto-google-common-protos 1.17.0: 
https://github.com/googleapis/googleapis , Apache 2.0
     Google: jsr305 3.0.2: 
http://central.maven.org/maven2/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.pom
 , Apache 2.0
+    Google: flatbuffers-java 1.12.0: https://github.com/google/flatbuffers/ , 
Apache 2.0
     Elasticsearch BV (Elasticsearch) 6.3.2: 
https://www.elastic.co/products/elasticsearch , Apache 2.0
     Elasticsearch BV (Elasticsearch) 7.5.0: 
https://www.elastic.co/products/elasticsearch , Apache 2.0
     aggs-matrix-stats-client 6.3.2, 7.5.0: 
https://github.com/elastic/elasticsearch/tree/master/modules/aggs-matrix-stats 
Apache 2.0
@@ -319,6 +320,7 @@ The text of each license is the standard Apache 2.0 license.
     proto files from prometheus/client_model: 
https://github.com/prometheus/client_model Apache 2.0
     proto files from lyft/protoc-gen-validate: 
https://github.com/lyft/protoc-gen-validate Apache 2.0
     proto files from gogo/googleapis: https://github.com/gogo/googleapis 
Apache 2.0
+    flatbuffers files from istio/proxy: https://github.com/istio/proxy Apache 
2.0
     json-flatter 0.6.0: https://github.com/wnameless/json-flattener  Apache 2.0
     Apache: commons-text 1.4: https://github.com/apache/commons-text Apache 2.0
     sundrio 0.9.2: https://github.com/sundrio/sundrio Apache 2.0
diff --git a/docker/oap/log4j2.xml b/docker/oap/log4j2.xml
index fadfaa1..28994ea 100644
--- a/docker/oap/log4j2.xml
+++ b/docker/oap/log4j2.xml
@@ -29,7 +29,7 @@
         <logger name="org.elasticsearch.common.network.IfConfig" level="INFO"/>
         <logger name="io.grpc.netty" level="INFO"/>
         <logger 
name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
-        <logger 
name="org.apache.skywalking.oap.server.receiver.envoy.als.K8SServiceRegistry" 
level="DEBUG"/>
+        <logger name="org.apache.skywalking.oap.server.receiver.envoy.als" 
level="DEBUG"/>
         <Root level="INFO">
             <AppenderRef ref="Console"/>
         </Root>
diff --git a/docs/en/setup/envoy/als_setting.md 
b/docs/en/setup/envoy/als_setting.md
index 5c02dbd..7a5bb3e 100644
--- a/docs/en/setup/envoy/als_setting.md
+++ b/docs/en/setup/envoy/als_setting.md
@@ -19,7 +19,14 @@ You need three steps to open ALS.
     Note: SkyWalking OAP service is at skywalking namespace, and the port of 
gRPC service is 11800
     
 2. (Default is ACTIVATED) Activate SkyWalking [envoy 
receiver](../backend/backend-receivers.md). 
-3. Active ALS k8s-mesh analysis, set system env variable 
`SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS`=`k8s-mesh`
+3. Activate an ALS analyzer. There are two available analyzers, `k8s-mesh` and 
`mx-mesh`.
+`k8s-mesh` uses the metadata from the Kubernetes cluster, hence with this analyzer 
OAP needs access roles to `Pod`, `Service`, and `Endpoints`;
+`mx-mesh` uses the Envoy metadata exchange mechanism to get the service name, 
etc.,
+this analyzer requires Istio to enable the metadata exchange filter (you can 
enable it by
+`--set telemetry.v2.enabled=true`, or if you're using Istio 1.7+ and 
installing it with profile `demo`/`preview`,
+it should be enabled then).
+Set the system env variable **SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS** to activate 
the analyzer,
+such as `SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh`.
 ```yaml
 envoy-metric:
   selector: ${SW_ENVOY_METRIC:default}
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 90185e9..319bc2c 100755
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -100,6 +100,7 @@
         <kafka-clients.version>2.4.1</kafka-clients.version>
         <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
         <commons-beanutils.version>1.9.4</commons-beanutils.version>
+        <flatbuffers-java.version>1.12.0</flatbuffers-java.version>
     </properties>
 
     <dependencies>
@@ -554,6 +555,12 @@
                     </exclusion>
                 </exclusions>
             </dependency>
+
+            <dependency>
+                <groupId>com.google.flatbuffers</groupId>
+                <artifactId>flatbuffers-java</artifactId>
+                <version>${flatbuffers-java.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
 b/oap-server/server-bootstrap/src/main/resources/metadata-service-mapping.yaml
similarity index 89%
copy from 
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
copy to 
oap-server/server-bootstrap/src/main/resources/metadata-service-mapping.yaml
index 215d7a0..3526f66 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
+++ 
b/oap-server/server-bootstrap/src/main/resources/metadata-service-mapping.yaml
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,8 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
-#
-
 
-org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8sALSServiceMeshHTTPAnalysis
+serviceName: ${LABELS.app}
+serviceInstanceName: ${NAME}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
index 5e2428f..5d46fd3 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml
@@ -61,5 +61,10 @@
             <version>${org.apache.tomcat.annotations-api.version}</version>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.google.flatbuffers</groupId>
+            <artifactId>flatbuffers-java</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
index b715ee3..4b8b79d 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
@@ -25,10 +25,10 @@ import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.ServiceLoader;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.source.Source;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
 import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
@@ -42,18 +42,18 @@ import org.slf4j.LoggerFactory;
 public class AccessLogServiceGRPCHandler extends 
AccessLogServiceGrpc.AccessLogServiceImplBase {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class);
     private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList;
-    private final SourceReceiver sourceReceiver;
+
     private final CounterMetrics counter;
     private final HistogramMetrics histogram;
     private final CounterMetrics sourceDispatcherCounter;
 
-    public AccessLogServiceGRPCHandler(ModuleManager manager, 
EnvoyMetricReceiverConfig config) {
+    public AccessLogServiceGRPCHandler(ModuleManager manager, 
EnvoyMetricReceiverConfig config) throws ModuleStartException {
         ServiceLoader<ALSHTTPAnalysis> alshttpAnalyses = 
ServiceLoader.load(ALSHTTPAnalysis.class);
         envoyHTTPAnalysisList = new ArrayList<>();
         for (String httpAnalysisName : config.getAlsHTTPAnalysis()) {
             for (ALSHTTPAnalysis httpAnalysis : alshttpAnalyses) {
                 if (httpAnalysisName.equals(httpAnalysis.name())) {
-                    httpAnalysis.init(config);
+                    httpAnalysis.init(manager, config);
                     envoyHTTPAnalysisList.add(httpAnalysis);
                 }
             }
@@ -61,8 +61,6 @@ public class AccessLogServiceGRPCHandler extends 
AccessLogServiceGrpc.AccessLogS
 
         LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList);
 
-        sourceReceiver = 
manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
-
         MetricsCreator metricCreator = 
manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
         counter = metricCreator.createCounter("envoy_als_in_count", "The count 
of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
         histogram = 
metricCreator.createHistogramMetric("envoy_als_in_latency", "The process 
latency of service ALS metric receiver", MetricsTag.EMPTY_KEY, 
MetricsTag.EMPTY_VALUE);
@@ -103,7 +101,7 @@ public class AccessLogServiceGRPCHandler extends 
AccessLogServiceGrpc.AccessLogS
                         case HTTP_LOGS:
                             StreamAccessLogsMessage.HTTPAccessLogEntries logs 
= message.getHttpLogs();
 
-                            List<Source> sourceResult = new ArrayList<>();
+                            List<ServiceMeshMetric.Builder> sourceResult = new 
ArrayList<>();
                             for (ALSHTTPAnalysis analysis : 
envoyHTTPAnalysisList) {
                                 logs.getLogEntryList().forEach(log -> {
                                     
sourceResult.addAll(analysis.analysis(identifier, log, role));
@@ -111,7 +109,8 @@ public class AccessLogServiceGRPCHandler extends 
AccessLogServiceGrpc.AccessLogS
                             }
 
                             sourceDispatcherCounter.inc(sourceResult.size());
-                            sourceResult.forEach(sourceReceiver::receive);
+                            
sourceResult.forEach(TelemetryDataDispatcher::process);
+                            break;
                     }
                 } finally {
                     timer.finish();
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
index 4297f3d..ae75c32 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java
@@ -21,7 +21,9 @@ package org.apache.skywalking.oap.server.receiver.envoy.als;
 import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
 import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
 import java.util.List;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import 
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
 
 /**
@@ -30,9 +32,9 @@ import 
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig
 public interface ALSHTTPAnalysis {
     String name();
 
-    void init(EnvoyMetricReceiverConfig config);
+    void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws 
ModuleStartException;
 
-    List<Source> analysis(StreamAccessLogsMessage.Identifier identifier, 
HTTPAccessLogEntry entry, Role role);
+    List<ServiceMeshMetric.Builder> 
analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry 
entry, Role role);
 
     Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev);
 }
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
new file mode 100644
index 0000000..fae8cd1
--- /dev/null
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import io.envoyproxy.envoy.api.v2.core.Node;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+
+@Slf4j
+public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis { // base for ALS analyzers: role detection by Envoy node id + adapter factory
+
+    @Override
+    public Role identify(final StreamAccessLogsMessage.Identifier 
alsIdentifier, final Role defaultRole) { // returns defaultRole unless the node id carries a known prefix
+        if (alsIdentifier == null) {
+            return defaultRole;
+        }
+        final Node node = alsIdentifier.getNode();
+        if (node == null) { // NOTE(review): protobuf getters usually return a default instance, not null - confirm this branch is reachable
+            return defaultRole;
+        }
+        final String id = node.getId();
+        if (id.startsWith("router~")) { // node ids prefixed with "router~" are reported as PROXY
+            return Role.PROXY;
+        } else if (id.startsWith("sidecar~")) { // node ids prefixed with "sidecar~" are reported as SIDECAR
+            return Role.SIDECAR;
+        }
+        return defaultRole;
+    }
+
+    /**
+     * Create an adapter to adapt the {@link HTTPAccessLogEntry log entry} 
into a {@link ServiceMeshMetric.Builder}.
+     * The method is {@code protected} and non-final, so a subclass can substitute its own adapter.
+     *
+     * @param entry         the access log entry that is to be adapted from.
+     * @param sourceService the source service.
+     * @param targetService the target/destination service.
+     * @return an adapter that adapts {@link HTTPAccessLogEntry log entry} 
into a {@link ServiceMeshMetric.Builder}.
+     */
+    protected LogEntry2MetricsAdapter newAdapter(
+        final HTTPAccessLogEntry entry,
+        final ServiceMetaInfo sourceService,
+        final ServiceMetaInfo targetService
+    ) {
+        return new LogEntry2MetricsAdapter(entry, sourceService, 
targetService);
+    }
+
+}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
new file mode 100644
index 0000000..de05c7a
--- /dev/null
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import com.google.protobuf.Duration;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.UInt32Value;
+import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties;
+import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties;
+import java.time.Instant;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.apm.network.common.v3.DetectPoint;
+import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Optional.ofNullable;
+
+/**
+ * Adapt {@link HTTPAccessLogEntry} objects to {@link ServiceMeshMetric} 
builders.
+ * <p>
+ * One adapter instance wraps a single log entry plus the source and target service metadata, and can
+ * produce a server-side (downstream) or a client-side (upstream) metrics builder from it.
+ */
+@RequiredArgsConstructor
+public class LogEntry2MetricsAdapter {
+
+    public static final String NON_TLS = "NONE"; // tlsMode when TLS properties or the local certificate subject are missing
+
+    public static final String M_TLS = "mTLS"; // tlsMode when both local and peer certificate subjects are present
+
+    public static final String TLS = "TLS"; // tlsMode when only the local certificate subject is present
+
+    /**
+     * The access log entry that is to be adapted into metrics builders.
+     */
+    private final HTTPAccessLogEntry entry;
+
+    private final ServiceMetaInfo sourceService; // may be null; the source name/instance are then left unset on the builder
+
+    private final ServiceMetaInfo targetService; // may be null; the dest name/instance are then left unset on the builder
+
+    /**
+     * Adapt the {@code entry} into a downstream metrics {@link 
ServiceMeshMetric.Builder}.
+     * <p>
+     * The start time is the entry's start time, the end time adds the time to the last downstream
+     * transmitted byte, both in milliseconds. The latency is that duration but never less than 1,
+     * and the detect point is {@code server}.
+     *
+     * @return the {@link ServiceMeshMetric.Builder} adapted from the given 
entry.
+     */
+    public ServiceMeshMetric.Builder adaptToDownstreamMetrics() {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final long startTime = formatAsLong(properties.getStartTime());
+        final long duration = 
formatAsLong(properties.getTimeToLastDownstreamTxByte());
+
+        return adaptCommonPart()
+            .setStartTime(startTime)
+            .setEndTime(startTime + duration)
+            .setLatency((int) Math.max(1L, duration)) // never report a latency below 1
+            .setDetectPoint(DetectPoint.server);
+    }
+
+    /**
+     * Adapt the {@code entry} into an upstream metrics {@link 
ServiceMeshMetric.Builder}.
+     * <p>
+     * The start time is the entry's start time plus the time to the first upstream transmitted byte,
+     * the end time is the entry's start time plus the time to the last upstream received byte, both in
+     * milliseconds. The latency is their difference but never less than 1, and the detect point is
+     * {@code client}.
+     *
+     * @return the {@link ServiceMeshMetric.Builder} adapted from the given 
entry.
+     */
+    public ServiceMeshMetric.Builder adaptToUpstreamMetrics() {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final long startTime = formatAsLong(properties.getStartTime());
+        final long outboundStartTime = startTime + 
formatAsLong(properties.getTimeToFirstUpstreamTxByte());
+        final long outboundEndTime = startTime + 
formatAsLong(properties.getTimeToLastUpstreamRxByte());
+
+        return adaptCommonPart()
+            .setStartTime(outboundStartTime)
+            .setEndTime(outboundEndTime)
+            .setLatency((int) Math.max(1L, outboundEndTime - 
outboundStartTime))
+            .setDetectPoint(DetectPoint.client);
+    }
+
+    protected ServiceMeshMetric.Builder adaptCommonPart() { // fills the fields shared by the downstream and upstream builders
+        final AccessLogCommon properties = entry.getCommonProperties();
+        final String endpoint = 
ofNullable(entry.getRequest()).map(HTTPRequestProperties::getPath).orElse("/"); // NOTE(review): "/" is used only if a getter yields null - confirm that can happen for protobuf messages
+        final int responseCode = 
ofNullable(entry.getResponse()).map(HTTPResponseProperties::getResponseCode).map(UInt32Value::getValue).orElse(200); // NOTE(review): 200 is used only if a getter yields null - confirm
+        final boolean status = responseCode >= 200 && responseCode < 400; // 2xx and 3xx are reported as success
+        final Protocol protocol = requestProtocol(entry.getRequest());
+        final String tlsMode = parseTLS(properties.getTlsProperties());
+
+        final ServiceMeshMetric.Builder builder =
+            ServiceMeshMetric.newBuilder()
+                             .setEndpoint(endpoint)
+                             .setResponseCode(Math.toIntExact(responseCode))
+                             .setStatus(status)
+                             .setProtocol(protocol)
+                             .setTlsMode(tlsMode);
+
+        Optional.ofNullable(sourceService) // each setter below is skipped when the service or the mapped value is null
+                .map(ServiceMetaInfo::getServiceName)
+                .ifPresent(builder::setSourceServiceName);
+        Optional.ofNullable(sourceService)
+                .map(ServiceMetaInfo::getServiceInstanceName)
+                .ifPresent(builder::setSourceServiceInstance);
+        Optional.ofNullable(targetService)
+                .map(ServiceMetaInfo::getServiceName)
+                .ifPresent(builder::setDestServiceName);
+        Optional.ofNullable(targetService)
+                .map(ServiceMetaInfo::getServiceInstanceName)
+                .ifPresent(builder::setDestServiceInstance);
+
+        return builder;
+    }
+
+    protected static long formatAsLong(final Timestamp timestamp) { // protobuf Timestamp -> epoch milliseconds
+        return Instant.ofEpochSecond(timestamp.getSeconds(), 
timestamp.getNanos()).toEpochMilli();
+    }
+
+    protected static long formatAsLong(final Duration duration) { // protobuf Duration -> milliseconds, computed as an offset from the epoch
+        return Instant.ofEpochSecond(duration.getSeconds(), 
duration.getNanos()).toEpochMilli();
+    }
+
+    protected static Protocol requestProtocol(final HTTPRequestProperties 
request) { // HTTP for a null request or a scheme starting with "http", gRPC for anything else
+        if (request == null) {
+            return Protocol.HTTP;
+        }
+        final String scheme = request.getScheme();
+        if (scheme.startsWith("http")) {
+            return Protocol.HTTP;
+        }
+        return Protocol.gRPC;
+    }
+
+    protected static String parseTLS(final TLSProperties properties) { // NONE / TLS / mTLS depending on which certificate subjects are non-empty
+        if (properties == null) {
+            return NON_TLS;
+        }
+        if 
(isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties())
+                                  
.orElse(TLSProperties.CertificateProperties.newBuilder().build())
+                                  .getSubject())) { // no local certificate subject: not TLS
+            return NON_TLS;
+        }
+        if 
(isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties())
+                                  
.orElse(TLSProperties.CertificateProperties.newBuilder().build())
+                                  .getSubject())) { // local subject only: one-way TLS
+            return TLS;
+        }
+        return M_TLS;
+    }
+
+}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
index 41ba0e5..75a9c3c 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
@@ -19,8 +19,9 @@
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
 import java.util.List;
-import java.util.Objects;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
@@ -28,13 +29,16 @@ import lombok.ToString;
 @Getter
 @Setter
 @ToString
+@NoArgsConstructor
+@EqualsAndHashCode(onlyExplicitlyIncluded = true)
 public class ServiceMetaInfo {
+    @EqualsAndHashCode.Include
     private String serviceName;
+
+    @EqualsAndHashCode.Include
     private String serviceInstanceName;
-    private List<KeyValue> tags;
 
-    public ServiceMetaInfo() {
-    }
+    private List<KeyValue> tags;
 
     public ServiceMetaInfo(String serviceName, String serviceInstanceName) {
         this.serviceName = serviceName;
@@ -43,26 +47,12 @@ public class ServiceMetaInfo {
 
     @Setter
     @Getter
-    @RequiredArgsConstructor
     @ToString
+    @RequiredArgsConstructor
     public static class KeyValue {
         private final String key;
-        private final String value;
-    }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-        ServiceMetaInfo info = (ServiceMetaInfo) o;
-        return Objects.equals(serviceName, info.serviceName) && 
Objects.equals(serviceInstanceName, info.serviceInstanceName);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(serviceName, serviceInstanceName);
+        private final String value;
     }
 
     public static final ServiceMetaInfo UNKNOWN = new 
ServiceMetaInfo("UNKNOWN", "UNKNOWN");
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
index f06d941..0e2e068 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
@@ -18,46 +18,30 @@
 
 package org.apache.skywalking.oap.server.receiver.envoy.als.k8s;
 
-import com.google.common.base.Strings;
-import com.google.protobuf.Duration;
-import com.google.protobuf.Timestamp;
 import io.envoyproxy.envoy.api.v2.core.Address;
-import io.envoyproxy.envoy.api.v2.core.Node;
 import io.envoyproxy.envoy.api.v2.core.SocketAddress;
 import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
 import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
-import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties;
-import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties;
-import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties;
 import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
-import org.apache.skywalking.apm.network.common.v3.DetectPoint;
-import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
 import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
-import org.apache.skywalking.oap.server.core.source.Source;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import 
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AbstractALSAnalyzer;
 import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
 import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
 
+import static 
org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+
 /**
  * Analysis log based on ingress and mesh scenarios.
  */
 @Slf4j
-public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
-    private static final String NON_TLS = "NONE";
-
-    private static final String M_TLS = "mTLS";
-
-    private static final String TLS = "TLS";
-
+public class K8sALSServiceMeshHTTPAnalysis extends AbstractALSAnalyzer {
     protected K8SServiceRegistry serviceRegistry;
 
     @Override
@@ -67,250 +51,113 @@ public class K8sALSServiceMeshHTTPAnalysis implements 
ALSHTTPAnalysis {
 
     @Override
     @SneakyThrows
-    public void init(EnvoyMetricReceiverConfig config) {
+    public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) {
         serviceRegistry = new K8SServiceRegistry(config);
         serviceRegistry.start();
     }
 
     @Override
-    public List<Source> analysis(StreamAccessLogsMessage.Identifier 
identifier, HTTPAccessLogEntry entry, Role role) {
+    public List<ServiceMeshMetric.Builder> 
analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry 
entry, Role role) {
         if (serviceRegistry.isEmpty()) {
             return Collections.emptyList();
         }
         switch (role) {
             case PROXY:
-                analysisProxy(identifier, entry);
-                break;
+                return analyzeProxy(entry);
             case SIDECAR:
-                return analysisSideCar(identifier, entry);
+                return analyzeSideCar(entry);
         }
 
         return Collections.emptyList();
     }
 
-    protected List<Source> analysisSideCar(StreamAccessLogsMessage.Identifier 
identifier, HTTPAccessLogEntry entry) {
-        List<Source> sources = new ArrayList<>();
-        AccessLogCommon properties = entry.getCommonProperties();
-        if (properties != null) {
-            String cluster = properties.getUpstreamCluster();
-            if (cluster != null) {
-                long startTime = formatAsLong(properties.getStartTime());
-                long duration = 
formatAsLong(properties.getTimeToLastDownstreamTxByte());
-
-                HTTPRequestProperties request = entry.getRequest();
-                String endpoint = "/";
-                Protocol protocol = Protocol.HTTP;
-                if (request != null) {
-                    endpoint = request.getPath();
-                    String schema = request.getScheme();
-                    if ("http".equals(schema) || "https".equals(schema)) {
-                        protocol = Protocol.HTTP;
-                    } else {
-                        protocol = Protocol.gRPC;
-                    }
-                }
-                HTTPResponseProperties response = entry.getResponse();
-                int responseCode = 200;
-                if (response != null) {
-                    responseCode = response.getResponseCode().getValue();
-                }
-                boolean status = responseCode >= 200 && responseCode < 400;
-
-                Address downstreamRemoteAddress = 
properties.getDownstreamRemoteAddress();
-                ServiceMetaInfo downstreamService = 
find(downstreamRemoteAddress.getSocketAddress().getAddress());
-                Address downstreamLocalAddress = 
properties.getDownstreamLocalAddress();
-                ServiceMetaInfo localService = 
find(downstreamLocalAddress.getSocketAddress().getAddress());
-                String tlsMode = parseTLS(properties.getTlsProperties());
-
-                ServiceMeshMetric.Builder metric = null;
-                if (cluster.startsWith("inbound|")) {
-                    // Server side
-                    if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
-                        // Ingress -> sidecar(server side)
-                        // Mesh telemetry without source, the relation would 
be generated.
-                        metric = ServiceMeshMetric.newBuilder()
-                                                  .setStartTime(startTime)
-                                                  .setEndTime(startTime + 
duration)
-                                                  
.setDestServiceName(localService.getServiceName())
-                                                  
.setDestServiceInstance(localService.getServiceInstanceName())
-                                                  .setEndpoint(endpoint)
-                                                  .setLatency((int) duration)
-                                                  
.setResponseCode(Math.toIntExact(responseCode))
-                                                  .setStatus(status)
-                                                  .setProtocol(protocol)
-                                                  .setTlsMode(tlsMode)
-                                                  
.setDetectPoint(DetectPoint.server);
-
-                        log.debug("Transformed ingress->sidecar inbound mesh 
metric {}", metric);
-                    } else {
-                        // sidecar -> sidecar(server side)
-                        metric = ServiceMeshMetric.newBuilder()
-                                                  .setStartTime(startTime)
-                                                  .setEndTime(startTime + 
duration)
-                                                  
.setSourceServiceName(downstreamService.getServiceName())
-                                                  
.setSourceServiceInstance(downstreamService.getServiceInstanceName())
-                                                  
.setDestServiceName(localService.getServiceName())
-                                                  
.setDestServiceInstance(localService.getServiceInstanceName())
-                                                  .setEndpoint(endpoint)
-                                                  .setLatency((int) duration)
-                                                  
.setResponseCode(Math.toIntExact(responseCode))
-                                                  .setStatus(status)
-                                                  .setProtocol(protocol)
-                                                  .setTlsMode(tlsMode)
-                                                  
.setDetectPoint(DetectPoint.server);
-
-                        log.debug("Transformed sidecar->sidecar(server side) 
inbound mesh metric {}", metric);
-                    }
-                } else if (cluster.startsWith("outbound|")) {
-                    // sidecar(client side) -> sidecar
-                    Address upstreamRemoteAddress = 
properties.getUpstreamRemoteAddress();
-                    ServiceMetaInfo destService = 
find(upstreamRemoteAddress.getSocketAddress().getAddress());
+    protected List<ServiceMeshMetric.Builder> analyzeSideCar(final 
HTTPAccessLogEntry entry) {
+        final AccessLogCommon properties = entry.getCommonProperties();
+        if (properties == null) {
+            return Collections.emptyList();
+        }
+        final String cluster = properties.getUpstreamCluster();
+        if (cluster == null) {
+            return Collections.emptyList();
+        }
 
-                    metric = ServiceMeshMetric.newBuilder()
-                                              .setStartTime(startTime)
-                                              .setEndTime(startTime + duration)
-                                              
.setSourceServiceName(downstreamService.getServiceName())
-                                              
.setSourceServiceInstance(downstreamService.getServiceInstanceName())
-                                              
.setDestServiceName(destService.getServiceName())
-                                              
.setDestServiceInstance(destService.getServiceInstanceName())
-                                              .setEndpoint(endpoint)
-                                              .setLatency((int) duration)
-                                              
.setResponseCode(Math.toIntExact(responseCode))
-                                              .setStatus(status)
-                                              .setProtocol(protocol)
-                                              .setTlsMode(tlsMode)
-                                              
.setDetectPoint(DetectPoint.client);
+        final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
+
+        final Address downstreamRemoteAddress =
+            properties.hasDownstreamDirectRemoteAddress()
+                ? properties.getDownstreamDirectRemoteAddress()
+                : properties.getDownstreamRemoteAddress();
+        final ServiceMetaInfo downstreamService = 
find(downstreamRemoteAddress.getSocketAddress().getAddress());
+        final Address downstreamLocalAddress = 
properties.getDownstreamLocalAddress();
+        final ServiceMetaInfo localService = 
find(downstreamLocalAddress.getSocketAddress().getAddress());
+
+        if (cluster.startsWith("inbound|")) {
+            // Server side
+            final ServiceMeshMetric.Builder metrics;
+            if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
+                // Ingress -> sidecar(server side)
+                // Mesh telemetry without source, the relation would be 
generated.
+                metrics = newAdapter(entry, null, 
localService).adaptToDownstreamMetrics();
+
+                log.debug("Transformed ingress->sidecar inbound mesh metrics 
{}", metrics);
+            } else {
+                // sidecar -> sidecar(server side)
+                metrics = newAdapter(entry, downstreamService, 
localService).adaptToDownstreamMetrics();
+
+                log.debug("Transformed sidecar->sidecar(server side) inbound 
mesh metrics {}", metrics);
+            }
+            sources.add(metrics);
+        } else if (cluster.startsWith("outbound|")) {
+            // sidecar(client side) -> sidecar
+            final Address upstreamRemoteAddress = 
properties.getUpstreamRemoteAddress();
+            final ServiceMetaInfo destService = 
find(upstreamRemoteAddress.getSocketAddress().getAddress());
 
-                    log.debug("Transformed sidecar->sidecar(server side) 
inbound mesh metric {}", metric);
-                }
+            final ServiceMeshMetric.Builder metric = newAdapter(entry, 
downstreamService, destService).adaptToUpstreamMetrics();
 
-                Optional.ofNullable(metric).ifPresent(this::forward);
-            }
+            log.debug("Transformed sidecar->sidecar(server side) inbound mesh 
metric {}", metric);
+            sources.add(metric);
         }
+
         return sources;
     }
 
-    private String parseTLS(TLSProperties properties) {
+    protected List<ServiceMeshMetric.Builder> analyzeProxy(final 
HTTPAccessLogEntry entry) {
+        final AccessLogCommon properties = entry.getCommonProperties();
         if (properties == null) {
-            return NON_TLS;
-        }
-        if 
(Strings.isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties())
-                                          
.orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject()))
 {
-            return NON_TLS;
+            return Collections.emptyList();
         }
-        if 
(Strings.isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties())
-                                          
.orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject()))
 {
-            return TLS;
+        final Address downstreamLocalAddress = 
properties.getDownstreamLocalAddress();
+        final Address downstreamRemoteAddress = 
properties.getDownstreamRemoteAddress();
+        final Address upstreamRemoteAddress = 
properties.getUpstreamRemoteAddress();
+        if (downstreamLocalAddress == null || downstreamRemoteAddress == null 
|| upstreamRemoteAddress == null) {
+            return Collections.emptyList();
         }
-        return M_TLS;
-    }
-
-    protected void analysisProxy(StreamAccessLogsMessage.Identifier 
identifier, HTTPAccessLogEntry entry) {
-        AccessLogCommon properties = entry.getCommonProperties();
-        if (properties != null) {
-            Address downstreamLocalAddress = 
properties.getDownstreamLocalAddress();
-            Address downstreamRemoteAddress = 
properties.getDownstreamRemoteAddress();
-            Address upstreamRemoteAddress = 
properties.getUpstreamRemoteAddress();
-            if (downstreamLocalAddress != null && downstreamRemoteAddress != 
null && upstreamRemoteAddress != null) {
-                SocketAddress downstreamRemoteAddressSocketAddress = 
downstreamRemoteAddress.getSocketAddress();
-                ServiceMetaInfo outside = 
find(downstreamRemoteAddressSocketAddress.getAddress());
-
-                SocketAddress downstreamLocalAddressSocketAddress = 
downstreamLocalAddress.getSocketAddress();
-                ServiceMetaInfo ingress = 
find(downstreamLocalAddressSocketAddress.getAddress());
 
-                long startTime = formatAsLong(properties.getStartTime());
-                long duration = 
formatAsLong(properties.getTimeToLastDownstreamTxByte());
+        final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
+        final SocketAddress downstreamRemoteAddressSocketAddress = 
downstreamRemoteAddress.getSocketAddress();
+        final ServiceMetaInfo outside = 
find(downstreamRemoteAddressSocketAddress.getAddress());
 
-                HTTPRequestProperties request = entry.getRequest();
-                String endpoint = "/";
-                Protocol protocol = Protocol.HTTP;
-                if (request != null) {
-                    endpoint = request.getPath();
-                    String schema = request.getScheme();
-                    if ("http".equals(schema) || "https".equals(schema)) {
-                        protocol = Protocol.HTTP;
-                    } else {
-                        protocol = Protocol.gRPC;
-                    }
-                }
-                HTTPResponseProperties response = entry.getResponse();
-                int responseCode = 200;
-                if (response != null) {
-                    responseCode = response.getResponseCode().getValue();
-                }
-                boolean status = responseCode >= 200 && responseCode < 400;
-                String tlsMode = parseTLS(properties.getTlsProperties());
+        final SocketAddress downstreamLocalAddressSocketAddress = 
downstreamLocalAddress.getSocketAddress();
+        final ServiceMetaInfo ingress = 
find(downstreamLocalAddressSocketAddress.getAddress());
 
-                ServiceMeshMetric.Builder metric = 
ServiceMeshMetric.newBuilder()
-                                                                    
.setStartTime(startTime)
-                                                                    
.setEndTime(startTime + duration)
-                                                                    
.setSourceServiceName(outside.getServiceName())
-                                                                    
.setSourceServiceInstance(
-                                                                        
outside.getServiceInstanceName())
-                                                                    
.setDestServiceName(ingress.getServiceName())
-                                                                    
.setDestServiceInstance(
-                                                                        
ingress.getServiceInstanceName())
-                                                                    
.setEndpoint(endpoint)
-                                                                    
.setLatency((int) duration)
-                                                                    
.setResponseCode(Math.toIntExact(responseCode))
-                                                                    
.setStatus(status)
-                                                                    
.setProtocol(protocol)
-                                                                    
.setTlsMode(tlsMode)
-                                                                    
.setDetectPoint(DetectPoint.server);
+        final ServiceMeshMetric.Builder metric = newAdapter(entry, outside, 
ingress).adaptToDownstreamMetrics();
 
-                log.debug("Transformed ingress inbound mesh metric {}", 
metric);
-                forward(metric);
+        log.debug("Transformed ingress inbound mesh metric {}", metric);
+        result.add(metric);
 
-                SocketAddress upstreamRemoteAddressSocketAddress = 
upstreamRemoteAddress.getSocketAddress();
-                ServiceMetaInfo targetService = 
find(upstreamRemoteAddressSocketAddress.getAddress());
+        final SocketAddress upstreamRemoteAddressSocketAddress = 
upstreamRemoteAddress.getSocketAddress();
+        final ServiceMetaInfo targetService = 
find(upstreamRemoteAddressSocketAddress.getAddress());
 
-                long outboundStartTime = startTime + 
formatAsLong(properties.getTimeToFirstUpstreamTxByte());
-                long outboundEndTime = startTime + 
formatAsLong(properties.getTimeToLastUpstreamRxByte());
-
-                ServiceMeshMetric.Builder outboundMetric = 
ServiceMeshMetric.newBuilder()
-                                                                            
.setStartTime(outboundStartTime)
-                                                                            
.setEndTime(outboundEndTime)
-                                                                            
.setSourceServiceName(
-                                                                               
 ingress.getServiceName())
-                                                                            
.setSourceServiceInstance(
-                                                                               
 ingress.getServiceInstanceName())
-                                                                            
.setDestServiceName(
-                                                                               
 targetService.getServiceName())
-                                                                            
.setDestServiceInstance(
-                                                                               
 targetService.getServiceInstanceName())
-                                                                            
.setEndpoint(endpoint)
-                                                                            
.setLatency(
-                                                                               
 (int) (outboundEndTime - outboundStartTime))
-                                                                            
.setResponseCode(
-                                                                               
 Math.toIntExact(responseCode))
-                                                                            
.setStatus(status)
-                                                                            
.setProtocol(protocol)
-                                                                            // 
Can't parse it from tls properties, leave
-                                                                            // 
it to Server side.
-                                                                            
.setTlsMode(NON_TLS)
-                                                                            
.setDetectPoint(DetectPoint.client);
-
-                log.debug("Transformed ingress outbound mesh metric {}", 
outboundMetric);
-                forward(outboundMetric);
-            }
-        }
-    }
+        final ServiceMeshMetric.Builder outboundMetric =
+            newAdapter(entry, ingress, targetService)
+                .adaptToUpstreamMetrics()
+                // Can't parse it from tls properties, leave it to Server side.
+                .setTlsMode(NON_TLS);
 
-    @Override
-    public Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, 
Role prev) {
-        if (alsIdentifier != null) {
-            Node node = alsIdentifier.getNode();
-            if (node != null) {
-                String id = node.getId();
-                if (id.startsWith("router~")) {
-                    return Role.PROXY;
-                } else if (id.startsWith("sidecar~")) {
-                    return Role.SIDECAR;
-                }
-            }
-        }
+        log.debug("Transformed ingress outbound mesh metric {}", 
outboundMetric);
+        result.add(outboundMetric);
 
-        return prev;
+        return result;
     }
 
     /**
@@ -320,15 +167,4 @@ public class K8sALSServiceMeshHTTPAnalysis implements 
ALSHTTPAnalysis {
         return serviceRegistry.findService(ip);
     }
 
-    protected void forward(ServiceMeshMetric.Builder metric) {
-        TelemetryDataDispatcher.process(metric);
-    }
-
-    private long formatAsLong(Timestamp timestamp) {
-        return Instant.ofEpochSecond(timestamp.getSeconds(), 
timestamp.getNanos()).toEpochMilli();
-    }
-
-    private long formatAsLong(Duration duration) {
-        return Instant.ofEpochSecond(duration.getSeconds(), 
duration.getNanos()).toEpochMilli();
-    }
 }
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java
new file mode 100644
index 0000000..1ef5c45
--- /dev/null
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.mx;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.reflect.Invokable;
+import com.google.common.reflect.TypeToken;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Value;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.ResourceUtils;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Fills {@link ServiceMetaInfo} fields from a {@link Struct} of metadata, driven by a YAML mapping whose keys are
+ * {@link ServiceMetaInfo} field names and whose values are templates containing {@code ${a.b.c}} placeholders.
+ * Each placeholder is a dot-separated path into the metadata {@link Struct}.
+ *
+ * NOTE(review): {@code initialized} is a plain field and {@code init} is not synchronized; this presumably relies
+ * on {@code init} being called once during module start -- confirm against the callers.
+ */
+@Slf4j
+@SuppressWarnings("UnstableApiUsage")
+enum FieldsHelper {
+    SINGLETON;
+
+    /**
+     * Matches a single {@code ${property.path}} placeholder. Compiled once, it does not depend on the mapping entry.
+     */
+    private static final Pattern PROPERTY_PLACEHOLDER = Pattern.compile("(\\$\\{(?<property>.+?)})");
+
+    private boolean initialized = false;
+
+    /**
+     * The mappings from the field name of {@link ServiceMetaInfo} to the field name of {@code flatbuffers}.
+     */
+    private Map<String, ServiceNameFormat> fieldNameMapping;
+
+    /**
+     * The mappings from the field name of {@link ServiceMetaInfo} to its {@code setter}.
+     */
+    private Map<String, Invokable<ServiceMetaInfo, ?>> fieldSetterMapping;
+
+    /**
+     * Loads the mapping from the given resource file.
+     *
+     * @param file the name of the YAML mapping file, resolved through {@link ResourceUtils#readToStream}.
+     * @throws Exception if the file cannot be read or the mapping cannot be initialized.
+     */
+    public void init(final String file) throws Exception {
+        // Release the stream once the mapping is loaded (or skipped because it was loaded before).
+        try (final InputStream inputStream = ResourceUtils.readToStream(file)) {
+            init(inputStream);
+        }
+    }
+
+    /**
+     * Loads the mapping from the given YAML stream. Does nothing when a mapping has already been loaded.
+     *
+     * @param inputStream the YAML content, a flat map of {@link ServiceMetaInfo} field name to template.
+     * @throws ModuleStartException if {@link ServiceMetaInfo} has no public {@code String} setter for a mapped field.
+     */
+    @SuppressWarnings("unchecked")
+    void init(final InputStream inputStream) throws ModuleStartException {
+        if (initialized) {
+            return;
+        }
+        final Yaml yaml = new Yaml();
+        final Map<String, String> config = (Map<String, String>) yaml.load(inputStream);
+
+        fieldNameMapping = new HashMap<>(config.size());
+        fieldSetterMapping = new HashMap<>(config.size());
+
+        for (final Map.Entry<String, String> entry : config.entrySet()) {
+            final String serviceMetaInfoFieldName = entry.getKey();
+            final String flatBuffersFieldName = entry.getValue();
+
+            final Matcher m = PROPERTY_PLACEHOLDER.matcher(flatBuffersFieldName);
+            final List<List<String>> flatBuffersFieldNames = new ArrayList<>();
+            final StringBuffer serviceNamePattern = new StringBuffer();
+            while (m.find()) {
+                final String property = m.group("property");
+                flatBuffersFieldNames.add(Splitter.on('.').omitEmptyStrings().splitToList(property));
+                m.appendReplacement(serviceNamePattern, "%s");
+            }
+            // Keep the literal text that follows the last placeholder (or the whole template when it has none);
+            // appendReplacement only copies the text that precedes each match.
+            m.appendTail(serviceNamePattern);
+
+            fieldNameMapping.put(
+                serviceMetaInfoFieldName,
+                new ServiceNameFormat(serviceNamePattern.toString(), flatBuffersFieldNames)
+            );
+
+            try {
+                final Method setterMethod = ServiceMetaInfo.class.getMethod(
+                    "set" + StringUtils.capitalize(serviceMetaInfoFieldName), String.class
+                );
+                final Invokable<ServiceMetaInfo, ?> setter = new TypeToken<ServiceMetaInfo>() {
+                }.method(setterMethod);
+                setter.setAccessible(true);
+                fieldSetterMapping.put(serviceMetaInfoFieldName, setter);
+            } catch (final NoSuchMethodException e) {
+                throw new ModuleStartException("Initialize method error", e);
+            }
+        }
+        initialized = true;
+    }
+
+    /**
+     * Inflates the {@code serviceMetaInfo} with the given {@link Struct struct}.
+     *
+     * @param metadata        the {@link Struct} metadata from where to retrieve and inflate the {@code serviceMetaInfo}.
+     * @param serviceMetaInfo the {@code serviceMetaInfo} to be inflated.
+     * @throws Exception if failed to inflate the {@code serviceMetaInfo}, e.g. a mapped property path is missing
+     *                   from the metadata or the setter invocation fails.
+     */
+    public void inflate(final Struct metadata, final ServiceMetaInfo serviceMetaInfo) throws Exception {
+        final Value root = Value.newBuilder().setStructValue(metadata).build();
+        for (final Map.Entry<String, ServiceNameFormat> entry : fieldNameMapping.entrySet()) {
+            final ServiceNameFormat serviceNameFormat = entry.getValue();
+            final Object[] values = new String[serviceNameFormat.properties.size()];
+            for (int i = 0; i < serviceNameFormat.properties.size(); i++) {
+                final List<String> properties = serviceNameFormat.properties.get(i);
+                // Walk down the nested structs, one path segment at a time.
+                Value value = root;
+                for (final String property : properties) {
+                    value = value.getStructValue().getFieldsOrThrow(property);
+                }
+                values[i] = value.getStringValue();
+            }
+            fieldSetterMapping.get(entry.getKey())
+                              .invoke(serviceMetaInfo, Strings.lenientFormat(serviceNameFormat.format, values));
+        }
+    }
+
+    /**
+     * A template whose placeholders were replaced by {@code %s}, plus the property path of each placeholder in
+     * order of appearance.
+     */
+    @RequiredArgsConstructor
+    private static class ServiceNameFormat {
+        private final String format;
+
+        private final List<List<String>> properties;
+    }
+}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
new file mode 100644
index 0000000..796c23e
--- /dev/null
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.mx;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.TextFormat;
+import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon;
+import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import 
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AbstractALSAnalyzer;
+import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+
+import static 
org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS;
+import static 
org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo.UNKNOWN;
+
+/**
+ * An ALS analyzer that builds service mesh metrics from the peer metadata carried in the
+ * {@code wasm.upstream_peer} and {@code wasm.downstream_peer} filter state objects of an access log entry.
+ */
+@Slf4j
+public class MetaExchangeALSHTTPAnalyzer extends AbstractALSAnalyzer {
+
+    private static final String UPSTREAM_KEY = "wasm.upstream_peer";
+
+    private static final String DOWNSTREAM_KEY = "wasm.downstream_peer";
+
+    @Override
+    public String name() {
+        return "mx-mesh";
+    }
+
+    /**
+     * Initializes the shared {@link FieldsHelper} from {@code metadata-service-mapping.yaml}.
+     *
+     * @throws ModuleStartException if the mapping file cannot be loaded.
+     */
+    @Override
+    public void init(ModuleManager manager, EnvoyMetricReceiverConfig config) throws ModuleStartException {
+        try {
+            FieldsHelper.SINGLETON.init("metadata-service-mapping.yaml");
+        } catch (final Exception e) {
+            throw new ModuleStartException("Failed to load metadata-service-mapping.yaml", e);
+        }
+    }
+
+    /**
+     * Converts one HTTP access log entry into zero or more mesh metrics.
+     * <p>
+     * The current service is inflated from {@code identifier.node.metadata}; each upstream/downstream peer found
+     * in the filter state objects yields one metric. When the role is {@link Role#PROXY} and no downstream peer
+     * was seen, an inbound metric with {@code UNKNOWN} as the source is added.
+     *
+     * @param identifier the identifier of the stream the entry belongs to.
+     * @param entry      the access log entry to analyze.
+     * @param role       the role of the reporting Envoy.
+     * @return the metrics derived from the entry, empty if the current service cannot be resolved.
+     */
+    @Override
+    public List<ServiceMeshMetric.Builder> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) {
+        // NOTE(review): protobuf getters normally return default instances rather than null; guards kept as-is.
+        final AccessLogCommon properties = entry.getCommonProperties();
+        if (properties == null) {
+            return Collections.emptyList();
+        }
+        final Map<String, Any> stateMap = properties.getFilterStateObjectsMap();
+        if (stateMap == null) {
+            return Collections.emptyList();
+        }
+        final ServiceMetaInfo currSvc;
+        try {
+            currSvc = new ServiceMetaInfoAdapter(identifier.getNode().getMetadata());
+        } catch (Exception e) {
+            log.error("Failed to inflate the ServiceMetaInfo from identifier.node.metadata. ", e);
+            return Collections.emptyList();
+        }
+
+        final List<ServiceMeshMetric.Builder> result = new ArrayList<>();
+        // Mutable flag holder because the lambda below can only capture effectively final locals.
+        final AtomicBoolean downstreamExists = new AtomicBoolean();
+        stateMap.forEach((key, value) -> {
+            if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
+                return;
+            }
+            final ServiceMetaInfo svc;
+            try {
+                svc = new ServiceMetaInfoAdapter(value);
+            } catch (Exception e) {
+                // Log the payload as a Base64 string (not a raw byte[]) and keep the cause for troubleshooting.
+                log.error("Fail to parse metadata {} to FlatNode", Base64.getEncoder().encodeToString(value.toByteArray()), e);
+                return;
+            }
+            final ServiceMeshMetric.Builder metrics;
+            switch (key) {
+                case UPSTREAM_KEY:
+                    // Outbound: current service -> upstream peer; TLS mode is always reported as NON_TLS here.
+                    metrics = newAdapter(entry, currSvc, svc).adaptToUpstreamMetrics().setTlsMode(NON_TLS);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Transformed a {} outbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
+                    }
+                    result.add(metrics);
+                    break;
+                case DOWNSTREAM_KEY:
+                    // Inbound: downstream peer -> current service.
+                    metrics = newAdapter(entry, svc, currSvc).adaptToDownstreamMetrics();
+                    if (log.isDebugEnabled()) {
+                        log.debug("Transformed a {} inbound mesh metrics {}", role, TextFormat.shortDebugString(metrics));
+                    }
+                    result.add(metrics);
+                    downstreamExists.set(true);
+                    break;
+            }
+        });
+        // A proxy without a known downstream peer still gets an inbound metric, sourced from UNKNOWN.
+        if (role.equals(Role.PROXY) && !downstreamExists.get()) {
+            final ServiceMeshMetric.Builder metric = newAdapter(entry, UNKNOWN, currSvc).adaptToDownstreamMetrics();
+            if (log.isDebugEnabled()) {
+                log.debug("Transformed a {} inbound mesh metric {}", role, TextFormat.shortDebugString(metric));
+            }
+            result.add(metric);
+        }
+        return result;
+    }
+
+}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/ServiceMetaInfoAdapter.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/ServiceMetaInfoAdapter.java
new file mode 100644
index 0000000..c8ca1ac
--- /dev/null
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/ServiceMetaInfoAdapter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.mx;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.Struct;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import Wasm.Common.FlatNode;
+import Wasm.Common.KeyVal;
+
+import static java.util.Objects.nonNull;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Builds a {@link ServiceMetaInfo} out of the different data structures that carry service metadata.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class ServiceMetaInfoAdapter extends ServiceMetaInfo {
+
+    /**
+     * Adapts a {@link ByteString} into a {@link ServiceMetaInfo}. The bytes are parsed as a {@link BytesValue}
+     * whose payload is then read as a {@link FlatNode}.
+     *
+     * @param bv the {@link ByteString byte string} to adapt from.
+     * @throws Exception if the {@link ByteString byte string} cannot be adapted to a {@link ServiceMetaInfo}.
+     */
+    public ServiceMetaInfoAdapter(final ByteString bv) throws Exception {
+        final byte[] payload = BytesValue.parseFrom(bv).getValue().toByteArray();
+        final FlatNode node = FlatNode.getRootAsFlatNode(ByteBuffer.wrap(payload));
+
+        if (log.isDebugEnabled()) {
+            final int labelCount = node.labelsLength();
+            for (int idx = 0; idx < labelCount; idx++) {
+                final KeyVal label = node.labels(idx);
+                if (!nonNull(label)) {
+                    continue;
+                }
+                log.debug("wasm label: {} : {}", label.key(), label.value());
+            }
+        }
+
+        // Fall back to "-" when the node has no "app" label, or the label has no value.
+        final Optional<String> appLabel = Optional.ofNullable(node.labelsByKey("app")).map(KeyVal::value);
+        setServiceName(appLabel.orElse("-"));
+        setServiceInstanceName(node.name());
+    }
+
+    /**
+     * Same as {@link ServiceMetaInfoAdapter#ServiceMetaInfoAdapter(com.google.protobuf.ByteString)},
+     * applied to the value wrapped by the given {@link Any}.
+     *
+     * @param any {@link Any any object} to adapt from.
+     * @throws Exception if the {@link Any any object} cannot be adapted to a {@link ServiceMetaInfo}.
+     */
+    public ServiceMetaInfoAdapter(final Any any) throws Exception {
+        this(any.getValue());
+    }
+
+    /**
+     * Adapts a {@link Struct} into a {@link ServiceMetaInfo} by delegating to {@link FieldsHelper}.
+     *
+     * @param metadata the {@link Struct struct} to adapt from, must not be {@code null}.
+     * @throws Exception if the {@link Struct struct} cannot be adapted to a {@link ServiceMetaInfo}.
+     */
+    public ServiceMetaInfoAdapter(final Struct metadata) throws Exception {
+        final Struct checked = requireNonNull(metadata);
+        FieldsHelper.SINGLETON.inflate(checked, this);
+    }
+
+}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
index 215d7a0..bb5a7d9 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis
@@ -18,3 +18,4 @@
 
 
 
org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8sALSServiceMeshHTTPAnalysis
+org.apache.skywalking.oap.server.receiver.envoy.als.mx.MetaExchangeALSHTTPAnalyzer
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
similarity index 76%
rename from 
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java
rename to 
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
index 7ad64a5..718996c 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
@@ -23,10 +23,10 @@ import 
io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
 import java.util.List;
 import org.apache.skywalking.apm.network.common.v3.DetectPoint;
 import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import 
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
 import 
org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain;
 import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
@@ -39,14 +39,14 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class K8sHTTPAnalysisTest {
+public class K8SALSServiceMeshHTTPAnalysisTest {
 
-    private MockK8sAnalysis analysis;
+    private MockK8SAnalysis analysis;
 
     @Before
     public void setUp() {
-        analysis = new MockK8sAnalysis();
-        analysis.init(null);
+        analysis = new MockK8SAnalysis();
+        analysis.init(null, null);
     }
 
     @Test
@@ -77,16 +77,16 @@ public class K8sHTTPAnalysisTest {
             StreamAccessLogsMessage.Builder requestBuilder = 
StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
 
-            analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
+            List<ServiceMeshMetric.Builder> result = 
this.analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
 
-            Assert.assertEquals(2, analysis.metrics.size());
+            Assert.assertEquals(2, result.size());
 
-            ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+            ServiceMeshMetric.Builder incoming = result.get(0);
             Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName());
             Assert.assertEquals("ingress", incoming.getDestServiceName());
             Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
 
-            ServiceMeshMetric.Builder outgoing = analysis.metrics.get(1);
+            ServiceMeshMetric.Builder outgoing = result.get(1);
             Assert.assertEquals("ingress", outgoing.getSourceServiceName());
             Assert.assertEquals("productpage", outgoing.getDestServiceName());
             Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint());
@@ -99,12 +99,11 @@ public class K8sHTTPAnalysisTest {
             StreamAccessLogsMessage.Builder requestBuilder = 
StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
 
-            analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs()
-                                                                            
.getLogEntry(0), Role.SIDECAR);
+            List<ServiceMeshMetric.Builder> result = 
this.analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
 
-            Assert.assertEquals(1, analysis.metrics.size());
+            Assert.assertEquals(1, result.size());
 
-            ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+            ServiceMeshMetric.Builder incoming = result.get(0);
             Assert.assertEquals("", incoming.getSourceServiceName());
             Assert.assertEquals("productpage", incoming.getDestServiceName());
             Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
@@ -117,12 +116,11 @@ public class K8sHTTPAnalysisTest {
             StreamAccessLogsMessage.Builder requestBuilder = 
StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
 
-            analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs()
-                                                                            
.getLogEntry(0), Role.SIDECAR);
+            List<ServiceMeshMetric.Builder> result = 
this.analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
 
-            Assert.assertEquals(1, analysis.metrics.size());
+            Assert.assertEquals(1, result.size());
 
-            ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+            ServiceMeshMetric.Builder incoming = result.get(0);
             Assert.assertEquals("productpage", 
incoming.getSourceServiceName());
             Assert.assertEquals("review", incoming.getDestServiceName());
             Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
@@ -135,38 +133,32 @@ public class K8sHTTPAnalysisTest {
             StreamAccessLogsMessage.Builder requestBuilder = 
StreamAccessLogsMessage.newBuilder();
             JsonFormat.parser().merge(isr, requestBuilder);
 
-            analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs()
-                                                                            
.getLogEntry(0), Role.SIDECAR);
+            List<ServiceMeshMetric.Builder> result = 
this.analysis.analysis(requestBuilder.getIdentifier(), 
requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
 
-            Assert.assertEquals(1, analysis.metrics.size());
+            Assert.assertEquals(1, result.size());
 
-            ServiceMeshMetric.Builder incoming = analysis.metrics.get(0);
+            ServiceMeshMetric.Builder incoming = result.get(0);
             Assert.assertEquals("productpage", 
incoming.getSourceServiceName());
             Assert.assertEquals("detail", incoming.getDestServiceName());
             Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint());
         }
     }
 
-    public static class MockK8sAnalysis extends K8sALSServiceMeshHTTPAnalysis {
-        private List<ServiceMeshMetric.Builder> metrics = new ArrayList<>();
+    public static class MockK8SAnalysis extends K8sALSServiceMeshHTTPAnalysis {
 
         @Override
-        public void init(EnvoyMetricReceiverConfig config) {
+        public void init(ModuleManager manager, EnvoyMetricReceiverConfig 
config) {
             serviceRegistry = mock(K8SServiceRegistry.class);
             
when(serviceRegistry.findService(anyString())).thenReturn(ServiceMetaInfo.UNKNOWN);
             when(serviceRegistry.findService("10.44.2.56")).thenReturn(new 
ServiceMetaInfo("ingress", "ingress-Inst"));
             when(serviceRegistry.findService("10.44.2.54")).thenReturn(new 
ServiceMetaInfo("productpage", "productpage-Inst"));
             when(serviceRegistry.findService("10.44.6.66")).thenReturn(new 
ServiceMetaInfo("detail", "detail-Inst"));
-            when(serviceRegistry.findService("10.44.2.55")).thenReturn(new 
ServiceMetaInfo("review", "detail-Inst"));
+            when(serviceRegistry.findService("10.44.2.55")).thenReturn(new 
ServiceMetaInfo("review", "review-Inst"));
         }
 
-        @Override
-        protected void forward(ServiceMeshMetric.Builder metric) {
-            metrics.add(metric);
-        }
     }
 
-    private static InputStream getResourceAsStream(final String resource) {
+    public static InputStream getResourceAsStream(final String resource) {
         final InputStream in = 
getContextClassLoader().getResourceAsStream(resource);
         return in == null ? 
MetricServiceGRPCHandlerTestMain.class.getResourceAsStream(resource) : in;
     }
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelperTest.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelperTest.java
new file mode 100644
index 0000000..35e91f8
--- /dev/null
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/FieldsHelperTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als.mx;
+
+import com.google.protobuf.util.JsonFormat;
+import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.powermock.reflect.Whitebox;
+
+import static 
org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8SALSServiceMeshHTTPAnalysisTest.getResourceAsStream;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Parameterized test that feeds different field mappings to {@link FieldsHelper} and checks the names it
+ * inflates from the node metadata in {@code field-helper.msg}.
+ */
+@RunWith(Parameterized.class)
+public class FieldsHelperTest {
+
+    /** The YAML mapping content handed to {@link FieldsHelper#init}. */
+    @Parameterized.Parameter()
+    public String mapping;
+
+    /** The service name expected after inflating with {@link #mapping}. */
+    @Parameterized.Parameter(1)
+    public String expectedServiceName;
+
+    /** The service instance name expected after inflating with {@link #mapping}. */
+    @Parameterized.Parameter(2)
+    public String expectedServiceInstanceName;
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            {
+                "serviceName: ${LABELS.app}\nserviceInstanceName: ${NAME}",
+                "productpage",
+                "productpage-v1-65576bb7bf-4mzsp"
+            },
+            {
+                "serviceName: ${LABELS.app}-${LABELS.version}\nserviceInstanceName: ${NAME}.${NAMESPACE}",
+                "productpage-v1",
+                "productpage-v1-65576bb7bf-4mzsp.default"
+            },
+            {
+                "serviceName: ${LABELS.app}-${CLUSTER_ID}\nserviceInstanceName: ${NAME}.${NAMESPACE}.${SERVICE_ACCOUNT}",
+                "productpage-Kubernetes",
+                "productpage-v1-65576bb7bf-4mzsp.default.bookinfo-productpage"
+            },
+            {
+                "serviceName: fixed-${LABELS.app}\nserviceInstanceName: yeah_${NAME}",
+                "fixed-productpage",
+                "yeah_productpage-v1-65576bb7bf-4mzsp"
+            }
+        });
+    }
+
+    @Before
+    public void setUp() {
+        // Reset the singleton's "initialized" field so each parameterized case starts from an uninitialized helper.
+        Whitebox.setInternalState(FieldsHelper.SINGLETON, "initialized", false);
+    }
+
+    @Test
+    public void testFormat() throws Exception {
+        try (final InputStreamReader isr = new InputStreamReader(getResourceAsStream("field-helper.msg"))) {
+            final StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder();
+            JsonFormat.parser().merge(isr, requestBuilder);
+            final ServiceMetaInfo info = new ServiceMetaInfo();
+            FieldsHelper.SINGLETON.init(new ByteArrayInputStream(mapping.getBytes()));
+            FieldsHelper.SINGLETON.inflate(
+                requestBuilder.getIdentifier().getNode().getMetadata(),
+                info
+            );
+            assertThat(info.getServiceName(), equalTo(expectedServiceName));
+            // Every case also declares an expected instance name; verify it so that half of the mapping is covered.
+            assertThat(info.getServiceInstanceName(), equalTo(expectedServiceInstanceName));
+        }
+    }
+}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/field-helper.msg
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/field-helper.msg
new file mode 100644
index 0000000..9d589e8
--- /dev/null
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/resources/field-helper.msg
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "identifier": {
+    "node": {
+      "id": 
"sidecar~172.18.0.8~productpage-v1-65576bb7bf-4mzsp.default~default.svc.cluster.local",
+      "cluster": "productpage.default",
+      "metadata": {
+        "PROXY_CONFIG": {
+          "concurrency": 2.0,
+          "envoyAccessLogService": {
+            "address": "0.tcp.ngrok.io:13760"
+          },
+          "statNameLength": 189.0,
+          "configPath": "./etc/istio/proxy",
+          "parentShutdownDuration": "60s",
+          "proxyAdminPort": 15000.0,
+          "controlPlaneAuthPolicy": "MUTUAL_TLS",
+          "drainDuration": "45s",
+          "proxyMetadata": {
+            "DNS_AGENT": ""
+          },
+          "terminationDrainDuration": "5s",
+          "tracing": {
+            "zipkin": {
+              "address": "zipkin.istio-system:9411"
+            }
+          },
+          "statusPort": 15020.0,
+          "serviceCluster": "productpage.default",
+          "envoyMetricsService": {
+          },
+          "discoveryAddress": "istiod.istio-system.svc:15012",
+          "binaryPath": "/usr/local/bin/envoy"
+        },
+        "PLATFORM_METADATA": {
+          "gcp_location": "us-central1-a",
+          "gcp_gce_instance_id": "2148869885222929334",
+          "gcp_gce_instance": "zhenxu-test",
+          "gcp_project_number": "191872121544",
+          "gcp_project": "skywalking-live-demo"
+        },
+        "CLUSTER_ID": "Kubernetes",
+        "APP_CONTAINERS": "productpage",
+        "LABELS": {
+          "service.istio.io/canonical-name": "productpage",
+          "version": "v1",
+          "security.istio.io/tlsMode": "istio",
+          "app": "productpage",
+          "service.istio.io/canonical-revision": "v1",
+          "pod-template-hash": "65576bb7bf",
+          "istio.io/rev": "default"
+        },
+        "ISTIO_PROXY_SHA": 
"istio-proxy:262253d9d066f8ef7ed82fd175c28b8f95acbec0",
+        "NAME": "productpage-v1-65576bb7bf-4mzsp",
+        "NAMESPACE": "default",
+        "EXCHANGE_KEYS": 
"NAME,NAMESPACE,INSTANCE_IPS,LABELS,OWNER,PLATFORM_METADATA,WORKLOAD_NAME,MESH_ID,SERVICE_ACCOUNT,CLUSTER_ID",
+        "INSTANCE_IPS": "172.18.0.8",
+        "POD_PORTS": "[{\"containerPort\":9080,\"protocol\":\"TCP\"}]",
+        "INTERCEPTION_MODE": "REDIRECT",
+        "SERVICE_ACCOUNT": "bookinfo-productpage",
+        "MESH_ID": "cluster.local",
+        "SDS": "true",
+        "WORKLOAD_NAME": "productpage-v1",
+        "OWNER": 
"kubernetes://apis/apps/v1/namespaces/default/deployments/productpage-v1",
+        "ISTIO_VERSION": "1.7.1"
+      },
+      "locality": {
+        "region": "us-central1",
+        "zone": "us-central1-a"
+      },
+      "buildVersion": 
"262253d9d066f8ef7ed82fd175c28b8f95acbec0/1.15.0/Clean/RELEASE/BoringSSL"
+    },
+    "logName": "http_envoy_accesslog"
+  }
+}
diff --git a/oap-server/server-receiver-plugin/receiver-proto/pom.xml 
b/oap-server/server-receiver-plugin/receiver-proto/pom.xml
index e7ca3bf..117bb1d 100644
--- a/oap-server/server-receiver-plugin/receiver-proto/pom.xml
+++ b/oap-server/server-receiver-plugin/receiver-proto/pom.xml
@@ -17,7 +17,8 @@
   ~
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>server-receiver-plugin</artifactId>
         <groupId>org.apache.skywalking</groupId>
@@ -28,9 +29,45 @@
     <artifactId>receiver-proto</artifactId>
     <packaging>jar</packaging>
 
+    <properties>
+        <fbs.sources>${basedir}/src/main/fbs</fbs.sources>
+        
<fbs.generated.sources>${project.build.directory}/generated-sources/fbs/java</fbs.generated.sources>
+        <fbs.compiler>${project.build.directory}/bin/flatc</fbs.compiler>
+        <fbs.compiler.artifact.type>tar.gz</fbs.compiler.artifact.type>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.flatbuffers</groupId>
+            <artifactId>flatbuffers-java</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>windows</id>
+            <activation>
+                <os>
+                    <family>Windows</family>
+                </os>
+            </activation>
+            <properties>
+                <fbs.compiler.artifact.type>zip</fbs.compiler.artifact.type>
+            </properties>
+        </profile>
+    </profiles>
+
     <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>${os-maven-plugin.version}</version>
+            </extension>
+        </extensions>
         <plugins>
-          <plugin>
+            <plugin>
                 <groupId>kr.motd.maven</groupId>
                 <artifactId>os-maven-plugin</artifactId>
                 <version>${os-maven-plugin.version}</version>
@@ -53,10 +90,12 @@
                       protobuf-java directly, you will be transitively 
depending on the
                       protobuf-java version that grpc depends on.
                     -->
-                    
<protocArtifact>com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier}
+                    <protocArtifact>
+                        
com.google.protobuf:protoc:${com.google.protobuf.protoc.version}:exe:${os.detected.classifier}
                     </protocArtifact>
                     <pluginId>grpc-java</pluginId>
-                    
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier}
+                    <pluginArtifact>
+                        
io.grpc:protoc-gen-grpc-java:${protoc-gen-grpc-java.plugin.version}:exe:${os.detected.classifier}
                     </pluginArtifact>
                 </configuration>
                 <executions>
@@ -68,6 +107,77 @@
                     </execution>
                 </executions>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>${maven-dependency-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>unpack</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>unpack</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>com.github.davidmoten</groupId>
+                                    <artifactId>flatbuffers-compiler</artifactId>
+                                    <version>1.12.0.1</version>
+                                    <type>${fbs.compiler.artifact.type}</type>
+                                    <classifier>distribution-${os.detected.name}</classifier>
+                                    <overWrite>true</overWrite>
+                                    <outputDirectory>${project.build.directory}</outputDirectory>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>${exec-maven-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            <executable>${fbs.compiler}</executable>
+                            <workingDirectory>${fbs.sources}</workingDirectory>
+                            <arguments>
+                                <argument>--java</argument>
+                                <argument>--gen-mutable</argument>
+                                <argument>-o</argument>
+                                <argument>${fbs.generated.sources}</argument>
+                                <argument>istio/node-info.fbs</argument> <!-- schema to compile; relative path, resolved against workingDirectory -->
+                            </arguments>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>${build-helper-maven-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${fbs.generated.sources}</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/fbs/istio/node-info.fbs b/oap-server/server-receiver-plugin/receiver-proto/src/main/fbs/istio/node-info.fbs
new file mode 100644
index 0000000..8b33c3a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/fbs/istio/node-info.fbs
@@ -0,0 +1,51 @@
+/* Copyright 2020 Istio Authors. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Original File Location:
+//   https://github.com/istio/proxy/blob/bcdc1684df0839a612526f688ff7b475902f2feb/extensions/common/node_info.fbs
+
+namespace Wasm.Common;
+
+table KeyVal {
+  key:string (key);
+  value:string;
+}
+
+// NodeInfo represents the information extracted from proxy node metadata.
+table FlatNode {
+  // Name of the node. e.g. in k8s, name is the pod name.
+  name:string;
+  // Namespace that the node runs in.
+  namespace:string;
+  // K8s or vm workload attributes.
+  labels:[KeyVal];
+  owner:string;
+  workload_name:string;
+  // Platform metadata uses prefixed keys
+  // GCP uses gcp_* keys
+  platform_metadata:[KeyVal];
+  // Version identifier for the proxy.
+  istio_version:string;
+  // Unique identifier for the mesh. Taken from global mesh id parameter (or
+  // the configured trust domain when not specified).
+  mesh_id:string;
+  // List of short names for application containers that are using this proxy.
+  // This is only used for kubernetes, and is populated by the sidecar injector.
+  app_containers:[string];
+  // Identifier for the cluster to which this workload belongs (for k8s workloads).
+  cluster_id:string;
+}
+
+root_type FlatNode;
diff --git a/pom.xml b/pom.xml
index ad73b56..815d3d2 100755
--- a/pom.xml
+++ b/pom.xml
@@ -207,9 +207,11 @@
         <takari-maven-plugin.version>0.6.1</takari-maven-plugin.version>
         <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
         <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
+        <maven-dependency-plugin.version>2.10</maven-dependency-plugin.version>
         <maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
         <maven-assembly-plugin.version>3.1.0</maven-assembly-plugin.version>
         <maven-failsafe-plugin.version>2.22.0</maven-failsafe-plugin.version>
+        <build-helper-maven-plugin.version>3.2.0</build-helper-maven-plugin.version>
         <maven-surefire-plugin.version>2.22.0</maven-surefire-plugin.version>
         <maven-jar-plugin.version>3.1.0</maven-jar-plugin.version>
         <maven-shade-plugin.version>3.1.1</maven-shade-plugin.version>
@@ -225,6 +227,7 @@
         <gmaven-plugin.version>1.5</gmaven-plugin.version>
         <cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
         <checkstyle.fails.on.error>true</checkstyle.fails.on.error>
+
     </properties>
 
     <dependencies>
@@ -498,6 +501,7 @@
                         <exclude>skywalking-ui/package-lock.json</exclude>
 
                         <!-- Proto files of Istio, envoy, prometheus and gogoproto projects -->
+                        <exclude>**/src/main/fbs/istio/**</exclude>
                         <exclude>**/src/main/proto/envoy/**</exclude>
                         <exclude>**/src/main/proto/gogoproto/gogo.proto</exclude>
                         <exclude>**/src/main/proto/google/**</exclude>
diff --git a/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java b/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java
index f6218c8..f6fdbb9 100644
--- a/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java
+++ b/test/e2e/e2e-common/src/main/java/org/apache/skywalking/e2e/retryable/RetryableTest.java
@@ -49,7 +49,7 @@ public @interface RetryableTest {
     /**
      * @return maximum times to retry, or -1 for infinite retries. {@code -1} by default.
      */
-    int value() default 120;
+    int value() default 60;
 
     /**
      * @return the interval between any two retries, in millisecond. {@code 1000} by default.
diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/IDManager.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/IDManager.java
deleted file mode 100644
index ce2fb0c..0000000
--- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/mesh/IDManager.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.e2e.mesh;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-public class IDManager {
-    public static class ServiceID {
-
-        public static ServiceIDDefinition analysisId(String id) {
-            final String[] strings = id.split("\\.");
-            if (strings.length != 2) {
-                throw new RuntimeException("Can't split service id into 2 parts, " + id);
-            }
-            return new ServiceIDDefinition(
-                decode(strings[0]),
-                Integer.parseInt(strings[1]) == 1
-            );
-        }
-
-        @RequiredArgsConstructor
-        @Getter
-        @EqualsAndHashCode
-        public static class ServiceIDDefinition {
-            private final String name;
-
-            private final boolean isReal;
-        }
-    }
-
-    /**
-     * @param base64text Base64 encoded UTF-8 string
-     * @return normal literal string
-     */
-    private static String decode(String base64text) {
-        return new String(Base64.getDecoder().decode(base64text), StandardCharsets.UTF_8);
-    }
-}
diff --git a/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml b/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml
index 0d5ee72..b60dcc0 100644
--- a/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml
+++ b/test/e2e/e2e-test/src/test/resources/expected/als/topo.yml
@@ -47,6 +47,7 @@ calls:
     target: cHJvZHVjdHBhZ2U=.1
     detectPoints:
       - CLIENT
+      - SERVER
   - id: not null
     source: cHJvZHVjdHBhZ2U=.1
     target: cmV2aWV3cw==.1
diff --git a/tools/dependencies/known-oap-backend-dependencies-es7.txt b/tools/dependencies/known-oap-backend-dependencies-es7.txt
index cbd448f..ea82e6d 100755
--- a/tools/dependencies/known-oap-backend-dependencies-es7.txt
+++ b/tools/dependencies/known-oap-backend-dependencies-es7.txt
@@ -44,6 +44,7 @@ rank-eval-client-7.5.0.jar
 error_prone_annotations-2.3.2.jar
 etcd4j-2.17.0.jar
 failureaccess-1.0.1.jar
+flatbuffers-java-1.12.0.jar
 freemarker-2.3.28.jar
 graphql-java-8.0.jar
 graphql-java-tools-5.2.3.jar
diff --git a/tools/dependencies/known-oap-backend-dependencies.txt b/tools/dependencies/known-oap-backend-dependencies.txt
index 9c37c31..12041d5 100755
--- a/tools/dependencies/known-oap-backend-dependencies.txt
+++ b/tools/dependencies/known-oap-backend-dependencies.txt
@@ -41,6 +41,7 @@ elasticsearch-x-content-6.3.2.jar
 error_prone_annotations-2.3.2.jar
 etcd4j-2.17.0.jar
 failureaccess-1.0.1.jar
+flatbuffers-java-1.12.0.jar
 freemarker-2.3.28.jar
 graphql-java-8.0.jar
 graphql-java-tools-5.2.3.jar

Reply via email to