This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch k8s/service-registry in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 8b137dc67bc2812bffffc03d192b2ec635f689f7 Author: kezhenxu94 <kezhenx...@163.com> AuthorDate: Fri Oct 23 12:06:38 2020 +0800 Improve K8S ALS analysis The current implementation of the Envoy ALS K8S analysis resolves a service by walking the owner hierarchy, pod -> ReplicaSet -> Deployment, StatefulSet, or others. This is awkward and differs from the Istio Kubernetes registry. The new path is pod -> endpoints -> service, and we should leverage the Informer API instead of the raw Kubernetes API. --- .github/workflows/e2e.istio.yaml | 12 +- docker/oap/log4j2.xml | 2 +- .../receiver/envoy/als/K8SServiceRegistry.java | 211 ++++++++++++++++ .../envoy/als/K8sALSServiceMeshHTTPAnalysis.java | 273 ++++++--------------- .../server/receiver/envoy/als/ServiceMetaInfo.java | 9 + .../receiver/envoy/als/K8sHTTPAnalysisTest.java | 17 +- test/e2e-mesh/e2e-istio/scripts/istio.sh | 3 +- 7 files changed, 312 insertions(+), 215 deletions(-) diff --git a/.github/workflows/e2e.istio.yaml b/.github/workflows/e2e.istio.yaml index 28db870..32d6e9c 100644 --- a/.github/workflows/e2e.istio.yaml +++ b/.github/workflows/e2e.istio.yaml @@ -21,14 +21,16 @@ on: push: branches: - master + - k8s/service-registry env: SKIP_TEST: true ES_VERSION: es7 ISTIO_VERSION: 1.7.1 + K8S_VER: 1.17.0 TAG: ${{ github.sha }} SCRIPTS_DIR: test/e2e-mesh/e2e-istio/scripts - SW_OAP_BASE_IMAGE: openjdk:8-jre-alpine + SW_OAP_BASE_IMAGE: openjdk:11-jdk jobs: als: @@ -62,17 +64,21 @@ jobs: run: | git clone https://github.com/apache/skywalking-kubernetes.git cd skywalking-kubernetes - git reset --hard 419cd1aed8bb4ad972208e5a031527a25d2ae690 + git reset --hard 80a18d1d475c82ccaace87f2dbe1c0bf22f2dedf cd chart helm dep up skywalking helm -n istio-system install skywalking skywalking \ --set fullnameOverride=skywalking \ --set elasticsearch.replicas=1 \ + --set elasticsearch.minimumMasterNodes=1 \ --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \ --set oap.envoy.als.enabled=true \ --set oap.replicas=1 \ + --set ui.image.repository=skywalking/ui \ +
--set ui.image.tag=$TAG \ --set oap.image.tag=$TAG \ - --set oap.image.repository=skywalking/oap + --set oap.image.repository=skywalking/oap \ + --set oap.storageType=elasticsearch7 kubectl -n istio-system get pods sleep 3 diff --git a/docker/oap/log4j2.xml b/docker/oap/log4j2.xml index eb69a89..a119e1e 100644 --- a/docker/oap/log4j2.xml +++ b/docker/oap/log4j2.xml @@ -29,7 +29,7 @@ <logger name="org.elasticsearch.common.network.IfConfig" level="INFO"/> <logger name="io.grpc.netty" level="INFO"/> <logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/> - <Root level="INFO"> + <Root level="DEBUG"> <AppenderRef ref="Console"/> </Root> </Loggers> diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8SServiceRegistry.java new file mode 100644 index 0000000..97c233c --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8SServiceRegistry.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.kubernetes.client.informer.ResourceEventHandler; +import io.kubernetes.client.informer.SharedInformerFactory; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1Endpoints; +import io.kubernetes.client.openapi.models.V1EndpointsList; +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1PodList; +import io.kubernetes.client.util.Config; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +import static java.util.Objects.isNull; +import static java.util.Objects.requireNonNull; + +@Slf4j +class K8SServiceRegistry { + final Map<String, ServiceMetaInfo> ipServiceMap; + + final ExecutorService executor; + + K8SServiceRegistry() { + ipServiceMap = new ConcurrentHashMap<>(); + executor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("K8SServiceRegistry-%d") + .setDaemon(true) + .build() + ); + } + + void start() throws IOException { + final ApiClient apiClient = Config.defaultClient(); + apiClient.setHttpClient(apiClient.getHttpClient() + .newBuilder() + .readTimeout(0, TimeUnit.SECONDS) + .build()); + + final CoreV1Api coreV1Api = new CoreV1Api(apiClient); + final SharedInformerFactory factory = new SharedInformerFactory(executor); + + listenEndpointsEvents(coreV1Api, 
factory); + listenPodEvents(coreV1Api, factory); + + factory.startAllRegisteredInformers(); + } + + private void listenEndpointsEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) { + factory.sharedIndexInformerFor( + params -> coreV1Api.listEndpointsForAllNamespacesCall( + null, + null, + null, + null, + null, + null, + params.resourceVersion, + params.timeoutSeconds, + params.watch, + null + ), + V1Endpoints.class, + V1EndpointsList.class + ).addEventHandler(new ResourceEventHandler<V1Endpoints>() { + @Override + public void onAdd(final V1Endpoints endpoints) { + addEndpoints(endpoints); + } + + @Override + public void onUpdate(final V1Endpoints oldEndpoints, final V1Endpoints newEndpoints) { + removeEndpoints(oldEndpoints); + addEndpoints(newEndpoints); + } + + @Override + public void onDelete(final V1Endpoints endpoints, final boolean deletedFinalStateUnknown) { + removeEndpoints(endpoints); + } + }); + } + + private void listenPodEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) { + factory.sharedIndexInformerFor( + params -> coreV1Api.listPodForAllNamespacesCall( + null, + null, + null, + null, + null, + null, + params.resourceVersion, + params.timeoutSeconds, + params.watch, + null + ), + V1Pod.class, + V1PodList.class + ).addEventHandler(new ResourceEventHandler<V1Pod>() { + @Override + public void onAdd(final V1Pod pod) { + addPod(pod); + } + + @Override + public void onUpdate(final V1Pod oldPod, final V1Pod newPod) { + removePod(oldPod); + addPod(newPod); + } + + @Override + public void onDelete(final V1Pod pod, final boolean deletedFinalStateUnknown) { + removePod(pod); + } + }); + } + + private void removePod(final V1Pod pod) { + Optional.ofNullable(pod.getStatus()).ifPresent( + status -> ipServiceMap.remove(status.getPodIP()) + ); + } + + private void addPod(final V1Pod pod) { + Optional.ofNullable(pod.getStatus()).ifPresent( + status -> { + final String ip = status.getPodIP(); + final ServiceMetaInfo service = 
ipServiceMap.computeIfAbsent(ip, unused -> new ServiceMetaInfo()); + + final V1ObjectMeta podMeta = requireNonNull(pod.getMetadata()); + service.setServiceInstanceName(String.format("%s.%s", podMeta.getName(), podMeta.getNamespace())); + service.setTags(transformLabelsToTags(podMeta.getLabels())); + } + ); + } + + private void addEndpoints(final V1Endpoints endpoints) { + final String serviceName = requireNonNull(endpoints.getMetadata()).getName(); + + requireNonNull(endpoints.getSubsets()).forEach(subset -> { + requireNonNull(subset.getAddresses()).forEach(address -> { + final String ip = address.getIp(); + final ServiceMetaInfo service = ipServiceMap.computeIfAbsent(ip, unused -> new ServiceMetaInfo()); + service.setServiceName(serviceName); + }); + }); + } + + private void removeEndpoints(final V1Endpoints endpoints) { + requireNonNull(endpoints.getSubsets()).forEach(subset -> { + requireNonNull(subset.getAddresses()).forEach(address -> { + final String ip = address.getIp(); + ipServiceMap.remove(ip); + }); + }); + } + + private List<ServiceMetaInfo.KeyValue> transformLabelsToTags(final Map<String, String> labels) { + if (isNull(labels)) { + return Collections.emptyList(); + } + return labels.entrySet() + .stream() + .map(each -> new ServiceMetaInfo.KeyValue(each.getKey(), each.getValue())) + .collect(Collectors.toList()); + } + + ServiceMetaInfo findService(final String ip) { + final ServiceMetaInfo service = ipServiceMap.getOrDefault(ip, ServiceMetaInfo.UNKNOWN); + if (!service.isComplete()) { + log.debug("Unknown ip {}, ip -> service is null", ip); + return ServiceMetaInfo.UNKNOWN; + } + return service; + } + + boolean isEmpty() { + return ipServiceMap.isEmpty(); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java index 6d1e3dc..807ba06 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java @@ -19,7 +19,6 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; import com.google.common.base.Strings; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.Duration; import com.google.protobuf.Timestamp; import io.envoyproxy.envoy.api.v2.core.Address; @@ -31,58 +30,32 @@ import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties; import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties; import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties; import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; -import io.kubernetes.client.openapi.ApiClient; -import io.kubernetes.client.openapi.apis.CoreV1Api; -import io.kubernetes.client.openapi.apis.ExtensionsV1beta1Api; -import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.openapi.models.V1OwnerReference; -import io.kubernetes.client.openapi.models.V1Pod; -import io.kubernetes.client.openapi.models.V1PodList; -import io.kubernetes.client.util.Config; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import lombok.AccessLevel; -import lombok.Getter; +import 
lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher; import org.apache.skywalking.apm.network.common.v3.DetectPoint; import org.apache.skywalking.apm.network.servicemesh.v3.Protocol; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; import org.apache.skywalking.oap.server.core.source.Source; import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Analysis log based on ingress and mesh scenarios. */ +@Slf4j public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { - private static final Logger LOGGER = LoggerFactory.getLogger(K8sALSServiceMeshHTTPAnalysis.class); - - private static final String VALID_PHASE = "Running"; - private static final String NON_TLS = "NONE"; private static final String M_TLS = "mTLS"; private static final String TLS = "TLS"; - @Getter(AccessLevel.PROTECTED) - private final AtomicReference<Map<String, ServiceMetaInfo>> ipServiceMap = new AtomicReference<>(); - - private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool( - 1, new ThreadFactoryBuilder() - .setNameFormat("load-pod-%d") - .setDaemon(true) - .build()); + protected K8SServiceRegistry serviceRegistry; @Override public String name() { @@ -90,83 +63,15 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { } @Override + @SneakyThrows public void init(EnvoyMetricReceiverConfig config) { - executorService.scheduleAtFixedRate(this::loadPodInfo, 0, 15, TimeUnit.SECONDS); - } - - private boolean invalidPodList() { - Map<String, ServiceMetaInfo> map = ipServiceMap.get(); - return map == null || map.isEmpty(); - } - - private void loadPodInfo() { - try { - ApiClient client = Config.defaultClient(); - CoreV1Api api = new CoreV1Api(client); - - V1PodList list = api.listPodForAllNamespaces(null, null, null, null, null, null, null, 
null, null); - Map<String, ServiceMetaInfo> ipMap = new HashMap<>(list.getItems().size()); - long startTime = System.nanoTime(); - for (V1Pod item : list.getItems()) { - if (!item.getStatus().getPhase().equals(VALID_PHASE)) { - LOGGER.debug("Invalid pod {} is not in a valid phase {}", item.getMetadata() - .getName(), item.getStatus() - .getPhase()); - continue; - } - if (item.getStatus().getPodIP().equals(item.getStatus().getHostIP())) { - LOGGER.debug( - "Pod {}.{} is removed because hostIP and podIP are identical ", item.getMetadata() - .getName(), - item.getMetadata() - .getNamespace() - ); - continue; - } - ipMap.put(item.getStatus().getPodIP(), createServiceMetaInfo(item.getMetadata())); - } - LOGGER.info("Load {} pods in {}ms", ipMap.size(), (System.nanoTime() - startTime) / 1_000_000); - ipServiceMap.set(ipMap); - } catch (Throwable th) { - LOGGER.error("run load pod error", th); - } - } - - private ServiceMetaInfo createServiceMetaInfo(final V1ObjectMeta podMeta) { - ExtensionsV1beta1Api extensionsApi = new ExtensionsV1beta1Api(); - DependencyResource dr = new DependencyResource(podMeta); - DependencyResource meta = dr.getOwnerResource( - "ReplicaSet", ownerReference -> extensionsApi.readNamespacedReplicaSet( - ownerReference - .getName(), podMeta.getNamespace(), "", true, true) - .getMetadata()); - ServiceMetaInfo result = new ServiceMetaInfo(); - if (meta.getMetadata().getOwnerReferences() != null && meta.getMetadata().getOwnerReferences().size() > 0) { - V1OwnerReference owner = meta.getMetadata().getOwnerReferences().get(0); - result.setServiceName(String.format("%s.%s", owner.getName(), meta.getMetadata().getNamespace())); - } else { - result.setServiceName(String.format("%s.%s", meta.getMetadata().getName(), meta.getMetadata() - .getNamespace())); - } - result.setServiceInstanceName(String.format("%s.%s", podMeta.getName(), podMeta.getNamespace())); - result.setTags(transformLabelsToTags(podMeta.getLabels())); - return result; - } - - private 
List<ServiceMetaInfo.KeyValue> transformLabelsToTags(final Map<String, String> labels) { - if (labels == null || labels.size() < 1) { - return Collections.emptyList(); - } - List<ServiceMetaInfo.KeyValue> result = new ArrayList<>(labels.size()); - for (Map.Entry<String, String> each : labels.entrySet()) { - result.add(new ServiceMetaInfo.KeyValue(each.getKey(), each.getValue())); - } - return result; + serviceRegistry = new K8SServiceRegistry(); + serviceRegistry.start(); } @Override public List<Source> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) { - if (invalidPodList()) { + if (serviceRegistry.isEmpty()) { return Collections.emptyList(); } switch (role) { @@ -209,95 +114,74 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { boolean status = responseCode >= 200 && responseCode < 400; Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); - ServiceMetaInfo downstreamService = find( - downstreamRemoteAddress.getSocketAddress() - .getAddress(), downstreamRemoteAddress.getSocketAddress() - .getPortValue()); + ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress()); Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); - ServiceMetaInfo localService = find( - downstreamLocalAddress.getSocketAddress() - .getAddress(), downstreamLocalAddress.getSocketAddress() - .getPortValue()); + ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress()); String tlsMode = parseTLS(properties.getTlsProperties()); + + ServiceMeshMetric.Builder metric = null; if (cluster.startsWith("inbound|")) { // Server side if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) { // Ingress -> sidecar(server side) // Mesh telemetry without source, the relation would be generated. 
- ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setDestServiceName( - localService.getServiceName()) - .setDestServiceInstance( - localService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode( - Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); - - LOGGER.debug("Transformed ingress->sidecar inbound mesh metric {}", metric); - forward(metric); + metric = ServiceMeshMetric.newBuilder() + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setDestServiceName(localService.getServiceName()) + .setDestServiceInstance(localService.getServiceInstanceName()) + .setEndpoint(endpoint) + .setLatency((int) duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status) + .setProtocol(protocol) + .setTlsMode(tlsMode) + .setDetectPoint(DetectPoint.server); + + log.debug("Transformed ingress->sidecar inbound mesh metric {}", metric); } else { // sidecar -> sidecar(server side) - ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName( - downstreamService.getServiceName()) - .setSourceServiceInstance( - downstreamService.getServiceInstanceName()) - .setDestServiceName( - localService.getServiceName()) - .setDestServiceInstance( - localService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode( - Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); - - LOGGER.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); - forward(metric); + metric = ServiceMeshMetric.newBuilder() + .setStartTime(startTime) + .setEndTime(startTime + duration) + 
.setSourceServiceName(downstreamService.getServiceName()) + .setSourceServiceInstance(downstreamService.getServiceInstanceName()) + .setDestServiceName(localService.getServiceName()) + .setDestServiceInstance(localService.getServiceInstanceName()) + .setEndpoint(endpoint) + .setLatency((int) duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status) + .setProtocol(protocol) + .setTlsMode(tlsMode) + .setDetectPoint(DetectPoint.server); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); } } else if (cluster.startsWith("outbound|")) { // sidecar(client side) -> sidecar Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); - ServiceMetaInfo destService = find( - upstreamRemoteAddress.getSocketAddress() - .getAddress(), upstreamRemoteAddress.getSocketAddress() - .getPortValue()); - - ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName( - downstreamService.getServiceName()) - .setSourceServiceInstance( - downstreamService.getServiceInstanceName()) - .setDestServiceName( - destService.getServiceName()) - .setDestServiceInstance( - destService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.client); - - LOGGER.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); - forward(metric); - + ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress()); + + metric = ServiceMeshMetric.newBuilder() + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setSourceServiceName(downstreamService.getServiceName()) + .setSourceServiceInstance(downstreamService.getServiceInstanceName()) + .setDestServiceName(destService.getServiceName()) + 
.setDestServiceInstance(destService.getServiceInstanceName()) + .setEndpoint(endpoint) + .setLatency((int) duration) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status) + .setProtocol(protocol) + .setTlsMode(tlsMode) + .setDetectPoint(DetectPoint.client); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); } + + Optional.ofNullable(metric).ifPresent(this::forward); } } return sources; @@ -308,11 +192,11 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { return NON_TLS; } if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties()) - .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { + .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { return NON_TLS; } if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties()) - .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { + .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { return TLS; } return M_TLS; @@ -326,14 +210,10 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); if (downstreamLocalAddress != null && downstreamRemoteAddress != null && upstreamRemoteAddress != null) { SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress(); - ServiceMetaInfo outside = find( - downstreamRemoteAddressSocketAddress.getAddress(), downstreamRemoteAddressSocketAddress - .getPortValue()); + ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress()); SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress(); - ServiceMetaInfo ingress = find( - downstreamLocalAddressSocketAddress.getAddress(), downstreamLocalAddressSocketAddress - .getPortValue()); + ServiceMetaInfo ingress = 
find(downstreamLocalAddressSocketAddress.getAddress()); long startTime = formatAsLong(properties.getStartTime()); long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); @@ -375,13 +255,11 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { .setTlsMode(tlsMode) .setDetectPoint(DetectPoint.server); - LOGGER.debug("Transformed ingress inbound mesh metric {}", metric); + log.debug("Transformed ingress inbound mesh metric {}", metric); forward(metric); SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress(); - ServiceMetaInfo targetService = find( - upstreamRemoteAddressSocketAddress.getAddress(), upstreamRemoteAddressSocketAddress - .getPortValue()); + ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress()); long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte()); long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte()); @@ -409,7 +287,7 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { .setTlsMode(NON_TLS) .setDetectPoint(DetectPoint.client); - LOGGER.debug("Transformed ingress outbound mesh metric {}", outboundMetric); + log.debug("Transformed ingress outbound mesh metric {}", outboundMetric); forward(outboundMetric); } } @@ -435,17 +313,8 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { /** * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found. 
*/ - protected ServiceMetaInfo find(String ip, int port) { - Map<String, ServiceMetaInfo> map = ipServiceMap.get(); - if (map == null) { - LOGGER.debug("Unknown ip {}, ip -> service is null", ip); - return ServiceMetaInfo.UNKNOWN; - } - if (map.containsKey(ip)) { - return map.get(ip); - } - LOGGER.debug("Unknown ip {}, ip -> service is {}", ip, map); - return ServiceMetaInfo.UNKNOWN; + protected ServiceMetaInfo find(String ip) { + return serviceRegistry.findService(ip); } protected void forward(ServiceMeshMetric.Builder metric) { diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java index 41ba0e5..c449c2f 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java @@ -25,6 +25,8 @@ import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; +import static java.util.Objects.nonNull; + @Getter @Setter @ToString @@ -41,6 +43,13 @@ public class ServiceMetaInfo { this.serviceInstanceName = serviceInstanceName; } + /** + * @return {@code true} if this object is completely constructed, otherwise {@code false}. 
+ */ + public boolean isComplete() { + return nonNull(serviceName) && nonNull(serviceInstanceName); + } + @Setter @Getter @RequiredArgsConstructor diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java index e6c797f..dea5f70 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java @@ -18,7 +18,6 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; -import com.google.common.collect.ImmutableMap; import com.google.protobuf.util.JsonFormat; import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; import java.io.IOException; @@ -34,6 +33,10 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class K8sHTTPAnalysisTest { private MockK8sAnalysis analysis; @@ -147,12 +150,12 @@ public class K8sHTTPAnalysisTest { @Override public void init(EnvoyMetricReceiverConfig config) { - getIpServiceMap().set( - ImmutableMap.of("10.44.2.56", new ServiceMetaInfo("ingress", "ingress-Inst"), "10.44.2.54", - new ServiceMetaInfo("productpage", "productpage-Inst"), "10.44.6.66", - new ServiceMetaInfo("detail", "detail-Inst"), "10.44.2.55", - new ServiceMetaInfo("review", "detail-Inst") - )); + serviceRegistry = mock(K8SServiceRegistry.class); + when(serviceRegistry.findService(anyString())).thenReturn(ServiceMetaInfo.UNKNOWN); + 
when(serviceRegistry.findService("10.44.2.56")).thenReturn(new ServiceMetaInfo("ingress", "ingress-Inst")); + when(serviceRegistry.findService("10.44.2.54")).thenReturn(new ServiceMetaInfo("productpage", "productpage-Inst")); + when(serviceRegistry.findService("10.44.6.66")).thenReturn(new ServiceMetaInfo("detail", "detail-Inst")); + when(serviceRegistry.findService("10.44.2.55")).thenReturn(new ServiceMetaInfo("review", "detail-Inst")); } @Override diff --git a/test/e2e-mesh/e2e-istio/scripts/istio.sh b/test/e2e-mesh/e2e-istio/scripts/istio.sh index ffd432c..2578818 100644 --- a/test/e2e-mesh/e2e-istio/scripts/istio.sh +++ b/test/e2e-mesh/e2e-istio/scripts/istio.sh @@ -21,7 +21,6 @@ set -ex -curl -L https://istio.io/downloadIstio | sh - -sudo mv $PWD/istio-$ISTIO_VERSION/bin/istioctl /usr/local/bin/ +istioctl version || (curl -L https://istio.io/downloadIstio | sh - && sudo mv $PWD/istio-$ISTIO_VERSION/bin/istioctl /usr/local/bin/) istioctl install $@ kubectl label namespace default istio-injection=enabled