This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch feature/als-mx in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 0019f6caa8ed00fce2aca4dbba58b9d5151d3cfe Author: kezhenxu94 <kezhenx...@163.com> AuthorDate: Mon Nov 2 18:53:39 2020 +0800 Refactor Kubernetes analyzer --- .github/workflows/e2e.istio.yaml | 2 + .../envoy/AccessLogServiceGRPCHandler.java | 14 +- .../server/receiver/envoy/als/ALSHTTPAnalysis.java | 4 +- ...SHTTPAnalysis.java => AbstractALSAnalyzer.java} | 32 +- .../envoy/als/LogEntry2MetricsAdapter.java | 132 ++++++++ .../als/k8s/K8sALSServiceMeshHTTPAnalysis.java | 334 --------------------- .../als/k8s/K8sALSServiceMeshHTTPAnalyzer.java | 186 ++++++++++++ ...g.oap.server.receiver.envoy.als.ALSHTTPAnalysis | 2 +- .../envoy/als/k8s/K8sHTTPAnalysisTest.java | 41 +-- 9 files changed, 364 insertions(+), 383 deletions(-) diff --git a/.github/workflows/e2e.istio.yaml b/.github/workflows/e2e.istio.yaml index 34d590e..43c051c 100644 --- a/.github/workflows/e2e.istio.yaml +++ b/.github/workflows/e2e.istio.yaml @@ -19,10 +19,12 @@ name: Istio on: pull_request: paths: + - '**' - '!**.md' push: branches: - master + - feature/als-mx env: SKIP_TEST: true diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java index b715ee3..99c22ed 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java @@ -25,9 +25,8 @@ import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; -import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.source.Source; -import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis; import org.apache.skywalking.oap.server.receiver.envoy.als.Role; @@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory; public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogServiceImplBase { private static final Logger LOGGER = LoggerFactory.getLogger(AccessLogServiceGRPCHandler.class); private final List<ALSHTTPAnalysis> envoyHTTPAnalysisList; - private final SourceReceiver sourceReceiver; + private final CounterMetrics counter; private final HistogramMetrics histogram; private final CounterMetrics sourceDispatcherCounter; @@ -61,8 +60,6 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS LOGGER.debug("envoy HTTP analysis: " + envoyHTTPAnalysisList); - sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); - MetricsCreator metricCreator = manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class); counter = metricCreator.createCounter("envoy_als_in_count", "The count of envoy ALS metric received", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); histogram = metricCreator.createHistogramMetric("envoy_als_in_latency", "The process latency of service ALS metric receiver", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); @@ -103,7 +100,7 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS case HTTP_LOGS: StreamAccessLogsMessage.HTTPAccessLogEntries logs = message.getHttpLogs(); - List<Source> sourceResult = new ArrayList<>(); + List<ServiceMeshMetric.Builder> sourceResult = new ArrayList<>(); for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) { logs.getLogEntryList().forEach(log -> { sourceResult.addAll(analysis.analysis(identifier, log, role)); @@ -111,7 +108,8 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS } sourceDispatcherCounter.inc(sourceResult.size()); - sourceResult.forEach(sourceReceiver::receive); + sourceResult.forEach(TelemetryDataDispatcher::process); + break; } } finally { timer.finish(); diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java index 4297f3d..426604e 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java @@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; import java.util.List; -import org.apache.skywalking.oap.server.core.source.Source; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; /** @@ -32,7 +32,7 @@ public interface ALSHTTPAnalysis { void init(EnvoyMetricReceiverConfig config); - List<Source> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role); + List<ServiceMeshMetric.Builder> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role); Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev); } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java similarity index 58% copy from oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java copy to oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java index 4297f3d..6f492db 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ALSHTTPAnalysis.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AbstractALSAnalyzer.java @@ -18,21 +18,27 @@ package org.apache.skywalking.oap.server.receiver.envoy.als; -import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; +import io.envoyproxy.envoy.api.v2.core.Node; import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; -import java.util.List; -import org.apache.skywalking.oap.server.core.source.Source; -import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; -/** - * Analysis source metrics from ALS - */ -public interface ALSHTTPAnalysis { - String name(); - - void init(EnvoyMetricReceiverConfig config); +public abstract class AbstractALSAnalyzer implements ALSHTTPAnalysis { - List<Source> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role); + @Override + public Role identify(final StreamAccessLogsMessage.Identifier alsIdentifier, final Role defaultRole) { + if (alsIdentifier == null) { + return defaultRole; + } + final Node node = alsIdentifier.getNode(); + if (node == null) { + return defaultRole; + } + final String id = node.getId(); + if (id.startsWith("router~")) { + return Role.PROXY; + } else if (id.startsWith("sidecar~")) { + return Role.SIDECAR; + } + return defaultRole; + } - Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev); } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java new file mode 100644 index 0000000..f170f0f --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als; + +import com.google.protobuf.Duration; +import com.google.protobuf.Timestamp; +import com.google.protobuf.UInt32Value; +import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties; +import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties; +import java.time.Instant; +import java.util.Optional; +import org.apache.skywalking.apm.network.servicemesh.v3.Protocol; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.Optional.ofNullable; + +/** + * Adapt {@link HTTPAccessLogEntry} objects to {@link ServiceMeshMetric} builders. + */ +public final class LogEntry2MetricsAdapter { + + public static final String NON_TLS = "NONE"; + + public static final String M_TLS = "mTLS"; + + public static final String TLS = "TLS"; + + /** + * Adapt the {@code entry} partially, into a downstream metrics {@link ServiceMeshMetric.Builder}. + * + * @param entry the log entry to adapt. + * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry. + */ + public static ServiceMeshMetric.Builder adaptToDownstreamMetrics(final HTTPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); + final long startTime = formatAsLong(properties.getStartTime()); + final long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); + return adaptCommonPart(entry) + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setLatency((int) duration); + } + + /** + * Adapt the {@code entry} partially, into a upstream metrics {@link ServiceMeshMetric.Builder}. + * + * @param entry the log entry to adapt. + * @return the {@link ServiceMeshMetric.Builder} adapted from the given entry. + */ + public static ServiceMeshMetric.Builder adaptUpstreamMetrics(final HTTPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); + final long startTime = formatAsLong(properties.getTimeToFirstUpstreamTxByte()); + final long duration = formatAsLong(properties.getTimeToLastUpstreamRxByte()); + return adaptCommonPart(entry) + .setStartTime(startTime) + .setEndTime(startTime + duration) + .setLatency((int) duration); + } + + private static ServiceMeshMetric.Builder adaptCommonPart(final HTTPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); + final String endpoint = ofNullable(entry.getRequest()).map(HTTPRequestProperties::getPath).orElse("/"); + final int responseCode = ofNullable(entry.getResponse()).map(HTTPResponseProperties::getResponseCode).map(UInt32Value::getValue).orElse(200); + final boolean status = responseCode >= 200 && responseCode < 400; + final Protocol protocol = requestProtocol(entry.getRequest()); + final String tlsMode = parseTLS(properties.getTlsProperties()); + + return ServiceMeshMetric.newBuilder() + .setEndpoint(endpoint) + .setResponseCode(Math.toIntExact(responseCode)) + .setStatus(status) + .setProtocol(protocol) + .setTlsMode(tlsMode); + } + + private static long formatAsLong(final Timestamp timestamp) { + return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli(); + } + + private static long formatAsLong(final Duration duration) { + return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli(); + } + + private static Protocol requestProtocol(final HTTPRequestProperties request) { + if (request == null) { + return Protocol.HTTP; + } + final String scheme = request.getScheme(); + if (scheme.startsWith("http")) { + return Protocol.HTTP; + } + return Protocol.gRPC; + } + + private static String parseTLS(final TLSProperties properties) { + if (properties == null) { + return NON_TLS; + } + if (isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties()) + .orElse(TLSProperties.CertificateProperties.newBuilder().build()) + .getSubject())) { + return NON_TLS; + } + if (isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties()) + .orElse(TLSProperties.CertificateProperties.newBuilder().build()) + .getSubject())) { + return TLS; + } + return M_TLS; + } + +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java deleted file mode 100644 index f06d941..0000000 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.receiver.envoy.als.k8s; - -import com.google.common.base.Strings; -import com.google.protobuf.Duration; -import com.google.protobuf.Timestamp; -import io.envoyproxy.envoy.api.v2.core.Address; -import io.envoyproxy.envoy.api.v2.core.Node; -import io.envoyproxy.envoy.api.v2.core.SocketAddress; -import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon; -import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; -import io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties; -import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties; -import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties; -import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher; -import org.apache.skywalking.apm.network.common.v3.DetectPoint; -import org.apache.skywalking.apm.network.servicemesh.v3.Protocol; -import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; -import org.apache.skywalking.oap.server.core.source.Source; -import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; -import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis; -import org.apache.skywalking.oap.server.receiver.envoy.als.Role; -import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; - -/** - * Analysis log based on ingress and mesh scenarios. - */ -@Slf4j -public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis { - private static final String NON_TLS = "NONE"; - - private static final String M_TLS = "mTLS"; - - private static final String TLS = "TLS"; - - protected K8SServiceRegistry serviceRegistry; - - @Override - public String name() { - return "k8s-mesh"; - } - - @Override - @SneakyThrows - public void init(EnvoyMetricReceiverConfig config) { - serviceRegistry = new K8SServiceRegistry(config); - serviceRegistry.start(); - } - - @Override - public List<Source> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) { - if (serviceRegistry.isEmpty()) { - return Collections.emptyList(); - } - switch (role) { - case PROXY: - analysisProxy(identifier, entry); - break; - case SIDECAR: - return analysisSideCar(identifier, entry); - } - - return Collections.emptyList(); - } - - protected List<Source> analysisSideCar(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) { - List<Source> sources = new ArrayList<>(); - AccessLogCommon properties = entry.getCommonProperties(); - if (properties != null) { - String cluster = properties.getUpstreamCluster(); - if (cluster != null) { - long startTime = formatAsLong(properties.getStartTime()); - long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); - - HTTPRequestProperties request = entry.getRequest(); - String endpoint = "/"; - Protocol protocol = Protocol.HTTP; - if (request != null) { - endpoint = request.getPath(); - String schema = request.getScheme(); - if ("http".equals(schema) || "https".equals(schema)) { - protocol = Protocol.HTTP; - } else { - protocol = Protocol.gRPC; - } - } - HTTPResponseProperties response = entry.getResponse(); - int responseCode = 200; - if (response != null) { - responseCode = response.getResponseCode().getValue(); - } - boolean status = responseCode >= 200 && responseCode < 400; - - Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); - ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress()); - Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); - ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress()); - String tlsMode = parseTLS(properties.getTlsProperties()); - - ServiceMeshMetric.Builder metric = null; - if (cluster.startsWith("inbound|")) { - // Server side - if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) { - // Ingress -> sidecar(server side) - // Mesh telemetry without source, the relation would be generated. - metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setDestServiceName(localService.getServiceName()) - .setDestServiceInstance(localService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); - - log.debug("Transformed ingress->sidecar inbound mesh metric {}", metric); - } else { - // sidecar -> sidecar(server side) - metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName(downstreamService.getServiceName()) - .setSourceServiceInstance(downstreamService.getServiceInstanceName()) - .setDestServiceName(localService.getServiceName()) - .setDestServiceInstance(localService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); - - log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); - } - } else if (cluster.startsWith("outbound|")) { - // sidecar(client side) -> sidecar - Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); - ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress()); - - metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName(downstreamService.getServiceName()) - .setSourceServiceInstance(downstreamService.getServiceInstanceName()) - .setDestServiceName(destService.getServiceName()) - .setDestServiceInstance(destService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.client); - - log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); - } - - Optional.ofNullable(metric).ifPresent(this::forward); - } - } - return sources; - } - - private String parseTLS(TLSProperties properties) { - if (properties == null) { - return NON_TLS; - } - if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties()) - .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { - return NON_TLS; - } - if (Strings.isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties()) - .orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject())) { - return TLS; - } - return M_TLS; - } - - protected void analysisProxy(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry) { - AccessLogCommon properties = entry.getCommonProperties(); - if (properties != null) { - Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); - Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); - Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); - if (downstreamLocalAddress != null && downstreamRemoteAddress != null && upstreamRemoteAddress != null) { - SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress(); - ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress()); - - SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress(); - ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress()); - - long startTime = formatAsLong(properties.getStartTime()); - long duration = formatAsLong(properties.getTimeToLastDownstreamTxByte()); - - HTTPRequestProperties request = entry.getRequest(); - String endpoint = "/"; - Protocol protocol = Protocol.HTTP; - if (request != null) { - endpoint = request.getPath(); - String schema = request.getScheme(); - if ("http".equals(schema) || "https".equals(schema)) { - protocol = Protocol.HTTP; - } else { - protocol = Protocol.gRPC; - } - } - HTTPResponseProperties response = entry.getResponse(); - int responseCode = 200; - if (response != null) { - responseCode = response.getResponseCode().getValue(); - } - boolean status = responseCode >= 200 && responseCode < 400; - String tlsMode = parseTLS(properties.getTlsProperties()); - - ServiceMeshMetric.Builder metric = ServiceMeshMetric.newBuilder() - .setStartTime(startTime) - .setEndTime(startTime + duration) - .setSourceServiceName(outside.getServiceName()) - .setSourceServiceInstance( - outside.getServiceInstanceName()) - .setDestServiceName(ingress.getServiceName()) - .setDestServiceInstance( - ingress.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency((int) duration) - .setResponseCode(Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - .setTlsMode(tlsMode) - .setDetectPoint(DetectPoint.server); - - log.debug("Transformed ingress inbound mesh metric {}", metric); - forward(metric); - - SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress(); - ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress()); - - long outboundStartTime = startTime + formatAsLong(properties.getTimeToFirstUpstreamTxByte()); - long outboundEndTime = startTime + formatAsLong(properties.getTimeToLastUpstreamRxByte()); - - ServiceMeshMetric.Builder outboundMetric = ServiceMeshMetric.newBuilder() - .setStartTime(outboundStartTime) - .setEndTime(outboundEndTime) - .setSourceServiceName( - ingress.getServiceName()) - .setSourceServiceInstance( - ingress.getServiceInstanceName()) - .setDestServiceName( - targetService.getServiceName()) - .setDestServiceInstance( - targetService.getServiceInstanceName()) - .setEndpoint(endpoint) - .setLatency( - (int) (outboundEndTime - outboundStartTime)) - .setResponseCode( - Math.toIntExact(responseCode)) - .setStatus(status) - .setProtocol(protocol) - // Can't parse it from tls properties, leave - // it to Server side. - .setTlsMode(NON_TLS) - .setDetectPoint(DetectPoint.client); - - log.debug("Transformed ingress outbound mesh metric {}", outboundMetric); - forward(outboundMetric); - } - } - } - - @Override - public Role identify(StreamAccessLogsMessage.Identifier alsIdentifier, Role prev) { - if (alsIdentifier != null) { - Node node = alsIdentifier.getNode(); - if (node != null) { - String id = node.getId(); - if (id.startsWith("router~")) { - return Role.PROXY; - } else if (id.startsWith("sidecar~")) { - return Role.SIDECAR; - } - } - } - - return prev; - } - - /** - * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found. - */ - protected ServiceMetaInfo find(String ip) { - return serviceRegistry.findService(ip); - } - - protected void forward(ServiceMeshMetric.Builder metric) { - TelemetryDataDispatcher.process(metric); - } - - private long formatAsLong(Timestamp timestamp) { - return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli(); - } - - private long formatAsLong(Duration duration) { - return Instant.ofEpochSecond(duration.getSeconds(), duration.getNanos()).toEpochMilli(); - } -} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalyzer.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalyzer.java new file mode 100644 index 0000000..5d933d3 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalyzer.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.k8s; + +import io.envoyproxy.envoy.api.v2.core.Address; +import io.envoyproxy.envoy.api.v2.core.SocketAddress; +import io.envoyproxy.envoy.data.accesslog.v2.AccessLogCommon; +import io.envoyproxy.envoy.data.accesslog.v2.HTTPAccessLogEntry; +import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.common.v3.DetectPoint; +import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; +import org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig; +import org.apache.skywalking.oap.server.receiver.envoy.als.AbstractALSAnalyzer; +import org.apache.skywalking.oap.server.receiver.envoy.als.Role; +import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo; + +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.NON_TLS; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.adaptToDownstreamMetrics; +import static org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter.adaptUpstreamMetrics; + +/** + * Analysis log based on ingress and mesh scenarios. + */ +@Slf4j +public class K8sALSServiceMeshHTTPAnalyzer extends AbstractALSAnalyzer { + protected K8SServiceRegistry serviceRegistry; + + @Override + public String name() { + return "k8s-mesh"; + } + + @Override + @SneakyThrows + public void init(EnvoyMetricReceiverConfig config) { + serviceRegistry = new K8SServiceRegistry(config); + serviceRegistry.start(); + } + + @Override + public List<ServiceMeshMetric.Builder> analysis(StreamAccessLogsMessage.Identifier identifier, HTTPAccessLogEntry entry, Role role) { + if (serviceRegistry.isEmpty()) { + return Collections.emptyList(); + } + switch (role) { + case PROXY: + return analyzeProxy(entry); + case SIDECAR: + return analyzeSideCar(entry); + } + + return Collections.emptyList(); + } + + protected List<ServiceMeshMetric.Builder> analyzeSideCar(final HTTPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); + if (properties == null) { + return Collections.emptyList(); + } + final String cluster = properties.getUpstreamCluster(); + if (cluster == null) { + return Collections.emptyList(); + } + + final List<ServiceMeshMetric.Builder> sources = new ArrayList<>(); + + final Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); + final ServiceMetaInfo downstreamService = find(downstreamRemoteAddress.getSocketAddress().getAddress()); + final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + final ServiceMetaInfo localService = find(downstreamLocalAddress.getSocketAddress().getAddress()); + + ServiceMeshMetric.Builder metric = null; + if (cluster.startsWith("inbound|")) { + // Server side + metric = adaptToDownstreamMetrics(entry) + .setDestServiceName(localService.getServiceName()) + .setDestServiceInstance(localService.getServiceInstanceName()) + .setDetectPoint(DetectPoint.server); + if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) { + // Ingress -> sidecar(server side) + // Mesh telemetry without source, the relation would be generated. + + log.debug("Transformed ingress->sidecar inbound mesh metric {}", metric); + } else { + // sidecar -> sidecar(server side) + metric.setSourceServiceName(downstreamService.getServiceName()) + .setSourceServiceInstance(downstreamService.getServiceInstanceName()); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); + } + } else if (cluster.startsWith("outbound|")) { + // sidecar(client side) -> sidecar + Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + ServiceMetaInfo destService = find(upstreamRemoteAddress.getSocketAddress().getAddress()); + + metric = adaptToDownstreamMetrics(entry) + .setSourceServiceName(downstreamService.getServiceName()) + .setSourceServiceInstance(downstreamService.getServiceInstanceName()) + .setDestServiceName(destService.getServiceName()) + .setDestServiceInstance(destService.getServiceInstanceName()) + .setDetectPoint(DetectPoint.client); + + log.debug("Transformed sidecar->sidecar(server side) inbound mesh metric {}", metric); + } + + Optional.ofNullable(metric).ifPresent(sources::add); + + return sources; + } + + protected List<ServiceMeshMetric.Builder> analyzeProxy(final HTTPAccessLogEntry entry) { + final AccessLogCommon properties = entry.getCommonProperties(); + if (properties == null) { + return Collections.emptyList(); + } + final Address downstreamLocalAddress = properties.getDownstreamLocalAddress(); + final Address downstreamRemoteAddress = properties.getDownstreamRemoteAddress(); + final Address upstreamRemoteAddress = properties.getUpstreamRemoteAddress(); + if (downstreamLocalAddress == null || downstreamRemoteAddress == null || upstreamRemoteAddress == null) { + return Collections.emptyList(); + } + + List<ServiceMeshMetric.Builder> result = new ArrayList<>(2); + SocketAddress downstreamRemoteAddressSocketAddress = downstreamRemoteAddress.getSocketAddress(); + ServiceMetaInfo outside = find(downstreamRemoteAddressSocketAddress.getAddress()); + + SocketAddress downstreamLocalAddressSocketAddress = downstreamLocalAddress.getSocketAddress(); + ServiceMetaInfo ingress = find(downstreamLocalAddressSocketAddress.getAddress()); + + ServiceMeshMetric.Builder metric = adaptToDownstreamMetrics(entry) + .setSourceServiceName(outside.getServiceName()) + .setSourceServiceInstance(outside.getServiceInstanceName()) + .setDestServiceName(ingress.getServiceName()) + .setDestServiceInstance(ingress.getServiceInstanceName()) + .setDetectPoint(DetectPoint.server); + + log.debug("Transformed ingress inbound mesh metric {}", metric); + result.add(metric); + + SocketAddress upstreamRemoteAddressSocketAddress = upstreamRemoteAddress.getSocketAddress(); + ServiceMetaInfo targetService = find(upstreamRemoteAddressSocketAddress.getAddress()); + + ServiceMeshMetric.Builder outboundMetric = adaptUpstreamMetrics(entry) + .setSourceServiceName(ingress.getServiceName()) + .setSourceServiceInstance(ingress.getServiceInstanceName()) + .setDestServiceName(targetService.getServiceName()) + .setDestServiceInstance(targetService.getServiceInstanceName()) + // Can't parse it from tls properties, leave it to Server side. + .setTlsMode(NON_TLS) + .setDetectPoint(DetectPoint.client); + + log.debug("Transformed ingress outbound mesh metric {}", outboundMetric); + result.add(outboundMetric); + + return result; + } + + /** + * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found. + */ + protected ServiceMetaInfo find(String ip) { + return serviceRegistry.findService(ip); + } +} diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis index 215d7a0..f330757 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis @@ -17,4 +17,4 @@ # -org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8sALSServiceMeshHTTPAnalysis +org.apache.skywalking.oap.server.receiver.envoy.als.k8s.K8sALSServiceMeshHTTPAnalyzer diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java index 7ad64a5..871dac9 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sHTTPAnalysisTest.java @@ -23,7 +23,6 @@ import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.util.ArrayList; import java.util.List; import org.apache.skywalking.apm.network.common.v3.DetectPoint; import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric; @@ -41,11 +40,11 @@ import static org.mockito.Mockito.when; public class K8sHTTPAnalysisTest { - private MockK8sAnalysis analysis; + private MockK8SAnalyzer analysis; @Before public void setUp() { - analysis = new MockK8sAnalysis(); + analysis = new MockK8SAnalyzer(); analysis.init(null); } @@ -77,16 +76,16 @@ public class K8sHTTPAnalysisTest { StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); JsonFormat.parser().merge(isr, requestBuilder); - analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY); + List<ServiceMeshMetric.Builder> result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY); - Assert.assertEquals(2, analysis.metrics.size()); + Assert.assertEquals(2, result.size()); - ServiceMeshMetric.Builder incoming = analysis.metrics.get(0); + ServiceMeshMetric.Builder incoming = result.get(0); Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName()); Assert.assertEquals("ingress", incoming.getDestServiceName()); Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); - ServiceMeshMetric.Builder outgoing = analysis.metrics.get(1); + ServiceMeshMetric.Builder outgoing = result.get(1); Assert.assertEquals("ingress", outgoing.getSourceServiceName()); Assert.assertEquals("productpage", outgoing.getDestServiceName()); Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint()); @@ -99,12 +98,11 @@ public class K8sHTTPAnalysisTest { StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); JsonFormat.parser().merge(isr, requestBuilder); - analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs() - .getLogEntry(0), Role.SIDECAR); + List<ServiceMeshMetric.Builder> result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); - Assert.assertEquals(1, analysis.metrics.size()); + Assert.assertEquals(1, result.size()); - ServiceMeshMetric.Builder incoming = analysis.metrics.get(0); + ServiceMeshMetric.Builder incoming = result.get(0); Assert.assertEquals("", incoming.getSourceServiceName()); Assert.assertEquals("productpage", incoming.getDestServiceName()); Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); @@ -117,12 +115,11 @@ public class K8sHTTPAnalysisTest { StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); JsonFormat.parser().merge(isr, requestBuilder); - analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs() - .getLogEntry(0), Role.SIDECAR); + List<ServiceMeshMetric.Builder> result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); - Assert.assertEquals(1, analysis.metrics.size()); + Assert.assertEquals(1, result.size()); - ServiceMeshMetric.Builder incoming = analysis.metrics.get(0); + ServiceMeshMetric.Builder incoming = result.get(0); Assert.assertEquals("productpage", incoming.getSourceServiceName()); Assert.assertEquals("review", incoming.getDestServiceName()); Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint()); @@ -135,20 +132,18 @@ public class K8sHTTPAnalysisTest { StreamAccessLogsMessage.Builder requestBuilder = StreamAccessLogsMessage.newBuilder(); JsonFormat.parser().merge(isr, requestBuilder); - analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs() - .getLogEntry(0), Role.SIDECAR); + List<ServiceMeshMetric.Builder> result = this.analysis.analysis(requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR); - Assert.assertEquals(1, analysis.metrics.size()); + Assert.assertEquals(1, result.size()); - ServiceMeshMetric.Builder incoming = analysis.metrics.get(0); + ServiceMeshMetric.Builder incoming = result.get(0); Assert.assertEquals("productpage", incoming.getSourceServiceName()); Assert.assertEquals("detail", incoming.getDestServiceName()); Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint()); } } - public static class MockK8sAnalysis extends K8sALSServiceMeshHTTPAnalysis { - private List<ServiceMeshMetric.Builder> metrics = new ArrayList<>(); + public static class MockK8SAnalyzer extends K8sALSServiceMeshHTTPAnalyzer { @Override public void init(EnvoyMetricReceiverConfig config) { @@ -160,10 +155,6 @@ public class K8sHTTPAnalysisTest { when(serviceRegistry.findService("10.44.2.55")).thenReturn(new ServiceMetaInfo("review", "detail-Inst")); } - @Override - protected void forward(ServiceMeshMetric.Builder metric) { - metrics.add(metric); - } } private static InputStream getResourceAsStream(final String resource) {