This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 7727ef5b7c Add Unknown Node when receive Kubernetes peer address is 
not aware in current cluster (#12496)
7727ef5b7c is described below

commit 7727ef5b7c0902d7a45987102b8a994f641361fc
Author: mrproliu <741550...@qq.com>
AuthorDate: Thu Aug 1 16:21:28 2024 +0800

    Add Unknown Node when receive Kubernetes peer address is not aware in 
current cluster (#12496)
---
 docs/en/changes/changes.md                         |  1 +
 .../provider/handler/AccessLogServiceHandler.java  | 34 +++++++++++++++++-----
 .../access_log/expected/dependency-services.yml    |  2 +-
 3 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index eff84e5d9a..f1ffad1f2d 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -45,6 +45,7 @@
 * BanyanDB: if the model column is already a `@BanyanDB.TimestampColumn`, set 
`@BanyanDB.NoIndexing` on it to reduce indexes.
 * BanyanDB: stream sort-by `time` query, use internal time-series rather than 
`index` to improve the query performance.
 * Bump up graphql-java to 21.5.
+* Add an Unknown Node when the received Kubernetes peer address is not known in 
the current cluster.
 
 #### UI
 
diff --git 
a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java
 
b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java
index 189afa5494..9d6b769b22 100644
--- 
a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java
+++ 
b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java
@@ -46,6 +46,7 @@ import 
org.apache.skywalking.apm.network.ebpf.accesslog.v3.IPAddress;
 import 
org.apache.skywalking.apm.network.ebpf.accesslog.v3.KubernetesProcessAddress;
 import org.apache.skywalking.library.kubernetes.ObjectID;
 import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
+import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.Layer;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
@@ -80,6 +81,10 @@ import java.util.stream.Stream;
 
 @Slf4j
 public class AccessLogServiceHandler extends 
EBPFAccessLogServiceGrpc.EBPFAccessLogServiceImplBase {
+    protected static final KubernetesProcessAddress UNKNOWN_ADDRESS = 
KubernetesProcessAddress.newBuilder()
+        .setServiceName(Const.UNKNOWN)
+        .setPodName(Const.UNKNOWN)
+        .build();
     private final SourceReceiver sourceReceiver;
     private final NamingControl namingControl;
 
@@ -171,7 +176,8 @@ public class AccessLogServiceHandler extends 
EBPFAccessLogServiceGrpc.EBPFAccess
 
     private void dispatchKernelLog(NodeInfo node, ConnectionInfo connection, 
AccessLogKernelLog kernelLog) {
         final List<K8SMetrics> metrics = Arrays.asList(connection.toService(), 
connection.toServiceInstance(),
-            connection.toServiceRelation(), 
connection.toServiceInstanceRelation());
+            connection.toServiceRelation(), 
connection.toServiceInstanceRelation())
+            .stream().filter(Objects::nonNull).collect(Collectors.toList());
 
         for (K8SMetrics metric : metrics) {
             switch (kernelLog.getOperationCase()) {
@@ -464,20 +470,24 @@ public class AccessLogServiceHandler extends 
EBPFAccessLogServiceGrpc.EBPFAccess
     protected KubernetesProcessAddress buildKubernetesAddressByIP(NodeInfo 
nodeInfo, IPAddress ipAddress) {
         final ObjectID pod = 
K8sInfoRegistry.getInstance().findPodByIP(ipAddress.getHost());
         if (pod == ObjectID.EMPTY) {
-            return null;
-        }
-        if (nodeInfo.shouldExcludeNamespace(pod.namespace())) {
-            log.debug("Should exclude the namespace[{}] traffic, pod: {}", 
pod.namespace(), pod.name());
-            return null;
+            // if the address cannot be found, return the unknown address
+            log.debug("building unknown address by ip: {}:{}", 
ipAddress.getHost(), ipAddress.getPort());
+            return buildUnknownAddress();
         }
         final ObjectID serviceName = 
K8sInfoRegistry.getInstance().findService(pod.namespace(), pod.name());
         if (serviceName == ObjectID.EMPTY) {
-            return null;
+            // if the pod has been found but the service name cannot be found, 
still return the unknown address
+            log.debug("building unknown address by pod: {}:{}", pod.name(), 
ipAddress.getPort());
+            return buildUnknownAddress();
         }
 
         return buildRemoteAddress(nodeInfo, serviceName, pod);
     }
 
+    protected KubernetesProcessAddress buildUnknownAddress() {
+        return UNKNOWN_ADDRESS;
+    }
+
     protected KubernetesProcessAddress buildRemoteAddress(NodeInfo nodeInfo, 
ObjectID service, ObjectID pod) {
         String serviceName = service.name() + "." + service.namespace();
         if (StringUtil.isNotEmpty(nodeInfo.getClusterName())) {
@@ -511,6 +521,10 @@ public class AccessLogServiceHandler extends 
EBPFAccessLogServiceGrpc.EBPFAccess
             this.nodeInfo = nodeInfo;
             this.protocolType = connection.getProtocol();
             this.valid = generateIsValid();
+            if (log.isDebugEnabled() &&
+                (Objects.equals(this.local, buildUnknownAddress()) || 
Objects.equals(this.remote, buildUnknownAddress()))) {
+                log.debug("found unknown connection: {}", connection);
+            }
         }
 
         private KubernetesProcessAddress buildAddress(NodeInfo nodeInfo, 
ConnectionAddress address) {
@@ -542,6 +556,9 @@ public class AccessLogServiceHandler extends 
EBPFAccessLogServiceGrpc.EBPFAccess
         }
 
         public K8SService toService() {
+            if (Objects.equals(local, buildUnknownAddress())) {
+                return null;
+            }
             final K8SService service = new K8SService();
             service.setName(buildServiceNameByAddress(nodeInfo, local));
             service.setLayer(Layer.K8S_SERVICE);
@@ -550,6 +567,9 @@ public class AccessLogServiceHandler extends 
EBPFAccessLogServiceGrpc.EBPFAccess
         }
 
         public K8SServiceInstance toServiceInstance() {
+            if (Objects.equals(local, buildUnknownAddress())) {
+                return null;
+            }
             final K8SServiceInstance serviceInstance = new 
K8SServiceInstance();
             serviceInstance.setServiceName(buildServiceNameByAddress(nodeInfo, 
local));
             
serviceInstance.setServiceInstanceName(buildServiceInstanceName(local));
diff --git 
a/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml 
b/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml
index fb4b5554bf..f54beebc4e 100644
--- 
a/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml
+++ 
b/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml
@@ -17,7 +17,7 @@ nodes:
 {{- contains .nodes }}
 - id: {{ b64enc "productpage.default"}}.1
   name: productpage.default
-  type: null
+  type: http
   isreal: true
 - id: {{ b64enc "details.default"}}.1
   name: details.default

Reply via email to