This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push: new 7727ef5b7c Add Unknown Node when receive Kubernetes peer address is not aware in current cluster (#12496) 7727ef5b7c is described below commit 7727ef5b7c0902d7a45987102b8a994f641361fc Author: mrproliu <741550...@qq.com> AuthorDate: Thu Aug 1 16:21:28 2024 +0800 Add Unknown Node when receive Kubernetes peer address is not aware in current cluster (#12496) --- docs/en/changes/changes.md | 1 + .../provider/handler/AccessLogServiceHandler.java | 34 +++++++++++++++++----- .../access_log/expected/dependency-services.yml | 2 +- 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index eff84e5d9a..f1ffad1f2d 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -45,6 +45,7 @@ * BanyanDB: if the model column is already a `@BanyanDB.TimestampColumn`, set `@BanyanDB.NoIndexing` on it to reduce indexes. * BanyanDB: stream sort-by `time` query, use internal time-series rather than `index` to improve the query performance. * Bump up graphql-java to 21.5. +* Add Unknown Node when receive Kubernetes peer address is not aware in current cluster. #### UI diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java index 189afa5494..9d6b769b22 100644 --- a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java @@ -46,6 +46,7 @@ import org.apache.skywalking.apm.network.ebpf.accesslog.v3.IPAddress; import org.apache.skywalking.apm.network.ebpf.accesslog.v3.KubernetesProcessAddress; import org.apache.skywalking.library.kubernetes.ObjectID; import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry; +import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.Layer; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; @@ -80,6 +81,10 @@ import java.util.stream.Stream; @Slf4j public class AccessLogServiceHandler extends EBPFAccessLogServiceGrpc.EBPFAccessLogServiceImplBase { + protected static final KubernetesProcessAddress UNKNOWN_ADDRESS = KubernetesProcessAddress.newBuilder() + .setServiceName(Const.UNKNOWN) + .setPodName(Const.UNKNOWN) + .build(); private final SourceReceiver sourceReceiver; private final NamingControl namingControl; @@ -171,7 +176,8 @@ public class AccessLogServiceHandler extends EBPFAccessLogServiceGrpc.EBPFAccess private void dispatchKernelLog(NodeInfo node, ConnectionInfo connection, AccessLogKernelLog kernelLog) { final List<K8SMetrics> metrics = Arrays.asList(connection.toService(), connection.toServiceInstance(), - connection.toServiceRelation(), connection.toServiceInstanceRelation()); + connection.toServiceRelation(), connection.toServiceInstanceRelation()) + .stream().filter(Objects::nonNull).collect(Collectors.toList()); for (K8SMetrics metric : metrics) { switch (kernelLog.getOperationCase()) { @@ -464,20 +470,24 @@ public class AccessLogServiceHandler extends EBPFAccessLogServiceGrpc.EBPFAccess protected KubernetesProcessAddress buildKubernetesAddressByIP(NodeInfo nodeInfo, IPAddress ipAddress) { final ObjectID pod = K8sInfoRegistry.getInstance().findPodByIP(ipAddress.getHost()); if (pod == ObjectID.EMPTY) { - return null; - } - if (nodeInfo.shouldExcludeNamespace(pod.namespace())) { - log.debug("Should exclude the namespace[{}] traffic, pod: {}", pod.namespace(), pod.name()); - return null; + // if cannot found the address, then return the unknown address + log.debug("building unknown address by ip: {}:{}", ipAddress.getHost(), ipAddress.getPort()); + return buildUnknownAddress(); } final ObjectID serviceName = K8sInfoRegistry.getInstance().findService(pod.namespace(), pod.name()); if (serviceName == ObjectID.EMPTY) { - return null; + // if the pod have been found, but the service name cannot found, then still return unknown address + log.debug("building unknown address by pod: {}:{}", pod.name(), ipAddress.getPort()); + return buildUnknownAddress(); } return buildRemoteAddress(nodeInfo, serviceName, pod); } + protected KubernetesProcessAddress buildUnknownAddress() { + return UNKNOWN_ADDRESS; + } + protected KubernetesProcessAddress buildRemoteAddress(NodeInfo nodeInfo, ObjectID service, ObjectID pod) { String serviceName = service.name() + "." + service.namespace(); if (StringUtil.isNotEmpty(nodeInfo.getClusterName())) { @@ -511,6 +521,10 @@ public class AccessLogServiceHandler extends EBPFAccessLogServiceGrpc.EBPFAccess this.nodeInfo = nodeInfo; this.protocolType = connection.getProtocol(); this.valid = generateIsValid(); + if (log.isDebugEnabled() && + (Objects.equals(this.local, buildUnknownAddress()) || Objects.equals(this.remote, buildUnknownAddress()))) { + log.debug("found unknown connection: {}", connection); + } } private KubernetesProcessAddress buildAddress(NodeInfo nodeInfo, ConnectionAddress address) { @@ -542,6 +556,9 @@ public class AccessLogServiceHandler extends EBPFAccessLogServiceGrpc.EBPFAccess } public K8SService toService() { + if (Objects.equals(local, buildUnknownAddress())) { + return null; + } final K8SService service = new K8SService(); service.setName(buildServiceNameByAddress(nodeInfo, local)); service.setLayer(Layer.K8S_SERVICE); @@ -550,6 +567,9 @@ public class AccessLogServiceHandler extends EBPFAccessLogServiceGrpc.EBPFAccess } public K8SServiceInstance toServiceInstance() { + if (Objects.equals(local, buildUnknownAddress())) { + return null; + } final K8SServiceInstance serviceInstance = new K8SServiceInstance(); serviceInstance.setServiceName(buildServiceNameByAddress(nodeInfo, local)); serviceInstance.setServiceInstanceName(buildServiceInstanceName(local)); diff --git a/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml b/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml index fb4b5554bf..f54beebc4e 100644 --- a/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml +++ b/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml @@ -17,7 +17,7 @@ nodes: {{- contains .nodes }} - id: {{ b64enc "productpage.default"}}.1 name: productpage.default - type: null + type: http isreal: true - id: {{ b64enc "details.default"}}.1 name: details.default