This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch zipkin-trace in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
commit 6a0729d175fee086b5d36602f087178052e53f60 Author: Wu Sheng <[email protected]> AuthorDate: Thu Mar 28 11:20:45 2019 -0700 Codebase for zipkin span persistence. --- .../oap/server/core/source/DefaultScopeDefine.java | 1 + .../zipkin-receiver-plugin/pom.xml | 5 + .../server/receiver/zipkin/CoreRegisterLinker.java | 11 +- .../receiver/zipkin/ZipkinReceiverConfig.java | 47 +-------- .../receiver/zipkin/ZipkinReceiverProvider.java | 15 +-- .../{ => analysis}/Receiver2AnalysisBridge.java | 8 +- .../ZipkinSkyWalkingTransfer.java} | 43 +------- .../{ => analysis}/ZipkinTraceOSInfoBuilder.java | 2 +- .../zipkin/{ => analysis}/cache/CacheFactory.java | 4 +- .../zipkin/{ => analysis}/cache/ISpanCache.java | 2 +- .../cache/caffeine/CaffeineSpanCache.java | 8 +- .../{ => analysis}/data/SkyWalkingTrace.java | 2 +- .../zipkin/{ => analysis}/data/ZipkinTrace.java | 2 +- .../{ => analysis}/transform/SegmentBuilder.java | 7 +- .../{ => analysis}/transform/SegmentListener.java | 4 +- .../transform/Zipkin2SkyWalkingTransfer.java | 6 +- .../receiver/zipkin/handler/SpanProcessor.java | 44 ++++---- .../zipkin/handler/SpanV1JettyHandler.java | 25 +++-- .../zipkin/handler/SpanV2JettyHandler.java | 21 ++-- .../server/receiver/zipkin/trace/SpanForward.java | 95 +++++++++++++++++ .../transform/SpringSleuthSegmentBuilderTest.java | 4 +- oap-server/server-starter/pom.xml | 5 + oap-server/server-storage-plugin/pom.xml | 1 + .../StorageModuleElasticsearchConfig.java | 4 +- .../{ => storage-zipkin-plugin}/pom.xml | 20 ++-- .../server/storage/plugin/zipkin/ZipkinSpan.java | 52 ++++++++++ .../storage/plugin/zipkin/ZipkinSpanRecord.java | 112 ++++++++++++++++++++ .../plugin/zipkin/ZipkinSpanRecordDispatcher.java | 48 +++++++++ .../StorageModuleElasticsearchProvider.java | 114 +++++++++++++++++++++ ...alking.oap.server.library.module.ModuleProvider | 19 ++++ 30 files changed, 575 insertions(+), 156 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java index 78b3565..aec1665 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java @@ -59,6 +59,7 @@ public class DefaultScopeDefine { public static final int SERVICE_INSTANCE_CLR_GC = 20; public static final int SERVICE_INSTANCE_CLR_THREAD = 21; public static final int ENVOY_INSTANCE_METRIC = 22; + public static final int ZIPKIN_SPAN = 23; /** * Catalog of scope, the indicator processor could use this to group all generated indicators by oal tool. diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml index d2adbc1..a332f73 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml @@ -37,6 +37,11 @@ </dependency> <dependency> <groupId>org.apache.skywalking</groupId> + <artifactId>storage-zipkin-plugin</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> <artifactId>skywalking-register-receiver-plugin</artifactId> <version>${project.version}</version> </dependency> diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java index 2459199..57e4bc7 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java @@ -19,14 +19,14 @@ package org.apache.skywalking.oap.server.receiver.zipkin; import org.apache.skywalking.oap.server.core.CoreModule; -import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; -import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.*; import org.apache.skywalking.oap.server.library.module.ModuleManager; public class CoreRegisterLinker { private static volatile ModuleManager MODULE_MANAGER; private static volatile IServiceInventoryRegister SERVICE_INVENTORY_REGISTER; private static volatile IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER; + private static volatile IEndpointInventoryRegister ENDPOINT_INVENTORY_REGISTER; public static void setModuleManager(ModuleManager moduleManager) { CoreRegisterLinker.MODULE_MANAGER = moduleManager; @@ -45,4 +45,11 @@ public class CoreRegisterLinker { } return SERVICE_INSTANCE_INVENTORY_REGISTER; } + + public static IEndpointInventoryRegister getEndpointInventoryRegister() { + if (ENDPOINT_INVENTORY_REGISTER == null) { + ENDPOINT_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(IEndpointInventoryRegister.class); + } + return ENDPOINT_INVENTORY_REGISTER; + } } diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java index a5e14af..cf9ff22 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java @@ -18,57 +18,20 @@ package org.apache.skywalking.oap.server.receiver.zipkin; +import lombok.*; import org.apache.skywalking.oap.server.library.module.ModuleConfig; /** * @author wusheng */ +@Setter +@Getter public class ZipkinReceiverConfig extends ModuleConfig { private String host; private int port; private String contextPath; - private int expireTime = 20; - private int maxCacheSize = 1_000_000; - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String getContextPath() { - return contextPath; - } - - public void setContextPath(String contextPath) { - this.contextPath = contextPath; - } - - public int getExpireTime() { - return expireTime; - } - - public void setExpireTime(int expireTime) { - this.expireTime = expireTime; - } - - public int getMaxCacheSize() { - return maxCacheSize; - } - - public void setMaxCacheSize(int maxCacheSize) { - this.maxCacheSize = maxCacheSize; - } + private boolean needAnalysis = false; + private boolean registerZipkinEndpoint = true; } diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java index 2eb2079..d9e55ef 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java @@ -27,9 +27,10 @@ import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.jetty.JettyServer; import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*; import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler; import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler; -import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer; /** * @author wusheng @@ -65,12 +66,14 @@ public class ZipkinReceiverProvider extends ModuleProvider { jettyServer = new JettyServer(config.getHost(), config.getPort(), config.getContextPath()); jettyServer.initialize(); - jettyServer.addHandler(new SpanV1JettyHandler(config)); - jettyServer.addHandler(new SpanV2JettyHandler(config)); + jettyServer.addHandler(new SpanV1JettyHandler(config, getManager())); + jettyServer.addHandler(new SpanV2JettyHandler(config, getManager())); - ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class); - Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService); - Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge); + if (config.isNeedAnalysis()) { + ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class); + Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService); + Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge); + } } @Override public void notifyAfterCompleted() throws ModuleStartException { diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java similarity index 82% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java index 1381da3..53051e5 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/Receiver2AnalysisBridge.java @@ -16,12 +16,12 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService; -import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; -import org.apache.skywalking.oap.server.receiver.zipkin.transform.SegmentListener; -import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.SegmentListener; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer; /** * Send the segments to Analysis module, like receiving segments from native SkyWalking agents. diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java similarity index 50% copy from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java copy to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java index 48774f9..c4ec894 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinSkyWalkingTransfer.java @@ -16,34 +16,15 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.handler; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; import java.util.List; -import java.util.zip.GZIPInputStream; -import javax.servlet.http.HttpServletRequest; -import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker; -import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder; -import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory; +import org.apache.skywalking.oap.server.receiver.zipkin.*; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.CacheFactory; import zipkin2.Span; -import zipkin2.codec.SpanBytesDecoder; - -public class SpanProcessor { - void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException { - InputStream inputStream = getInputStream(request); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] buffer = new byte[2048]; - int readCntOnce; - - while ((readCntOnce = inputStream.read(buffer)) >= 0) { - out.write(buffer, 0, readCntOnce); - } - - List<Span> spanList = decoder.decodeList(out.toByteArray()); +public class ZipkinSkyWalkingTransfer { + public void doTransfer(ZipkinReceiverConfig config, List<Span> spanList) { spanList.forEach(span -> { // In Zipkin, the local service name represents the application owner. String applicationCode = span.localServiceName(); @@ -59,18 +40,4 @@ public class SpanProcessor { CacheFactory.INSTANCE.get(config).addSpan(span); }); } - - private InputStream getInputStream(HttpServletRequest request) throws IOException { - InputStream requestInStream; - - String headEncoding = request.getHeader("accept-encoding"); - if (headEncoding != null && (headEncoding.indexOf("gzip") != -1)) { - requestInStream = new GZIPInputStream(request.getInputStream()); - } else { - requestInStream = request.getInputStream(); - } - - return requestInStream; - } - } diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java similarity index 94% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java index 451ba53..7d286f8 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/ZipkinTraceOSInfoBuilder.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis; import com.google.gson.JsonObject; import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java similarity index 89% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java index c4d5b06..8893001 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/CacheFactory.java @@ -16,10 +16,10 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.cache; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine.CaffeineSpanCache; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine.CaffeineSpanCache; /** * @author wusheng diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java similarity index 92% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java index b122bcf..0f9f3e4 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/ISpanCache.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.cache; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache; import zipkin2.Span; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java similarity index 91% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java index c42e710..d8dc260 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/cache/caffeine/CaffeineSpanCache.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.caffeine; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -28,9 +28,9 @@ import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.apache.skywalking.oap.server.receiver.zipkin.cache.ISpanCache; -import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace; -import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.cache.ISpanCache; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform.Zipkin2SkyWalkingTransfer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import zipkin2.Span; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java similarity index 96% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java index 38c59ec..ab8712c 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/SkyWalkingTrace.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.data; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data; import java.util.LinkedList; import java.util.List; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java similarity index 95% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java index d12beb8..e54a613 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/data/ZipkinTrace.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.data; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.data; import java.util.LinkedList; import java.util.List; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java similarity index 98% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java index 3f687a6..e5af454 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentBuilder.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.transform; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform; import com.google.common.base.Strings; import java.util.*; @@ -25,8 +25,9 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.skywalking.apm.network.common.KeyStringValuePair; import org.apache.skywalking.apm.network.language.agent.*; import org.apache.skywalking.apm.network.language.agent.v2.*; -import org.apache.skywalking.oap.server.receiver.zipkin.*; -import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.*; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace; import org.eclipse.jetty.util.StringUtil; import zipkin2.*; diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java similarity index 84% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java index 9a0b7c7..5c37c9a 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SegmentListener.java @@ -16,9 +16,9 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.transform; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform; -import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace; public interface SegmentListener { void notify(SkyWalkingTrace trace); diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java similarity index 86% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java index b41e50e..25a59ab 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/Zipkin2SkyWalkingTransfer.java @@ -16,12 +16,12 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.transform; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform; import java.util.LinkedList; import java.util.List; -import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; -import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.ZipkinTrace; import zipkin2.Span; /** diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java index 48774f9..a033f4c 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java @@ -18,20 +18,31 @@ package org.apache.skywalking.oap.server.receiver.zipkin.handler; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.util.List; import java.util.zip.GZIPInputStream; import javax.servlet.http.HttpServletRequest; -import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker; +import org.apache.skywalking.oap.server.core.cache.*; +import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder; -import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.ZipkinSkyWalkingTransfer; +import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; public class SpanProcessor { + private SourceReceiver receiver; + private ServiceInventoryCache serviceInventoryCache; + private EndpointInventoryCache endpointInventoryCache; + + public SpanProcessor(SourceReceiver receiver, + ServiceInventoryCache serviceInventoryCache, + EndpointInventoryCache endpointInventoryCache) { + this.receiver = receiver; + this.serviceInventoryCache = serviceInventoryCache; + this.endpointInventoryCache = endpointInventoryCache; + } + void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException { InputStream inputStream = getInputStream(request); ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -44,20 +55,13 @@ public class SpanProcessor { List<Span> spanList = decoder.decodeList(out.toByteArray()); - spanList.forEach(span -> { - // In Zipkin, the local service name represents the application owner. - String applicationCode = span.localServiceName(); - if (applicationCode != null) { - int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode, null); - if (applicationId != 0) { - CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode, - span.timestampAsLong(), - ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode)); - } - } - - CacheFactory.INSTANCE.get(config).addSpan(span); - }); + if (config.isNeedAnalysis()) { + ZipkinSkyWalkingTransfer transfer = new ZipkinSkyWalkingTransfer(); + transfer.doTransfer(config, spanList); + } else { + SpanForward forward = new SpanForward(config, receiver, serviceInventoryCache, endpointInventoryCache); + forward.send(spanList); + } } private InputStream getInputStream(HttpServletRequest request) throws IOException { diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java index 77f38e4..8f3634f 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java @@ -18,20 +18,29 @@ package org.apache.skywalking.oap.server.receiver.zipkin.handler; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.*; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.*; +import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; import zipkin2.codec.SpanBytesDecoder; public class SpanV1JettyHandler extends JettyHandler { private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class); private ZipkinReceiverConfig config; + private SourceReceiver sourceReceiver; + private ServiceInventoryCache serviceInventoryCache; + private EndpointInventoryCache endpointInventoryCache; - public SpanV1JettyHandler(ZipkinReceiverConfig config) { + public SpanV1JettyHandler(ZipkinReceiverConfig config, + ModuleManager manager) { + sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); + serviceInventoryCache = manager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class); + endpointInventoryCache = manager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class); this.config = config; } @@ -49,10 +58,10 @@ public class SpanV1JettyHandler extends JettyHandler { String type = request.getHeader("Content-Type"); SpanBytesDecoder decoder = type != null && type.contains("/x-thrift") - ? SpanBytesDecoder.THRIFT - : SpanBytesDecoder.JSON_V1; + ? SpanBytesDecoder.THRIFT + : SpanBytesDecoder.JSON_V1; - SpanProcessor processor = new SpanProcessor(); + SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache); processor.convert(config, decoder, request); response.setStatus(202); diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java index 7c8705a..92cf3e7 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java @@ -18,12 +18,14 @@ package org.apache.skywalking.oap.server.receiver.zipkin.handler; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.*; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.*; +import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler; import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; import zipkin2.codec.SpanBytesDecoder; /** @@ -33,8 +35,15 @@ public class SpanV2JettyHandler extends JettyHandler { private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class); private ZipkinReceiverConfig config; + private SourceReceiver sourceReceiver; + private ServiceInventoryCache serviceInventoryCache; + private EndpointInventoryCache endpointInventoryCache; - public SpanV2JettyHandler(ZipkinReceiverConfig config) { + public SpanV2JettyHandler(ZipkinReceiverConfig config, + ModuleManager manager) { + sourceReceiver = manager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); + serviceInventoryCache = manager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class); + endpointInventoryCache = manager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class); this.config = config; } @@ -55,7 +64,7 @@ public class SpanV2JettyHandler extends JettyHandler { ? SpanBytesDecoder.PROTO3 : SpanBytesDecoder.JSON_V2; - SpanProcessor processor = new SpanProcessor(); + SpanProcessor processor = new SpanProcessor(sourceReceiver, serviceInventoryCache, endpointInventoryCache); processor.convert(config, decoder, request); response.setStatus(202); diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java new file mode 100644 index 0000000..2665169 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.trace; + +import java.util.List; +import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.cache.*; +import org.apache.skywalking.oap.server.core.source.*; +import org.apache.skywalking.oap.server.library.util.BooleanUtils; +import org.apache.skywalking.oap.server.receiver.zipkin.*; +import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpan; +import zipkin2.Span; + +/** + * @author wusheng + */ +public class SpanForward { + private ZipkinReceiverConfig config; + private SourceReceiver receiver; + private ServiceInventoryCache serviceInventoryCache; + private EndpointInventoryCache endpointInventoryCache; + + public SpanForward(ZipkinReceiverConfig config, SourceReceiver receiver, + ServiceInventoryCache serviceInventoryCache, + EndpointInventoryCache endpointInventoryCache) { + this.config = config; + this.receiver = receiver; + this.serviceInventoryCache = serviceInventoryCache; + this.endpointInventoryCache = endpointInventoryCache; + } + + public void send(List<Span> spanList) { + spanList.forEach(span -> { + ZipkinSpan zipkinSpan = new ZipkinSpan(); + zipkinSpan.setTraceId(span.traceId()); + zipkinSpan.setSpanId(span.id()); + String serviceName = span.localServiceName(); + int serviceId = Const.NONE; + if (!StringUtil.isEmpty(serviceName)) { + serviceId = serviceInventoryCache.getServiceId(serviceName); + if (serviceId != Const.NONE) { + zipkinSpan.setServiceId(serviceId); + } else { + /** + * Only register, but don't wait. + * For this span, service id will be missed. + */ + CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, null); + } + } + + String spanName = span.name(); + Span.Kind kind = span.kind(); + switch (kind) { + case SERVER: + case CONSUMER: + if (!StringUtil.isEmpty(spanName) && serviceId != Const.NONE) { + int endpointId = endpointInventoryCache.getEndpointId(serviceId, spanName, + DetectPoint.SERVER.ordinal()); + if (endpointId != Const.NONE) { + zipkinSpan.setEndpointId(endpointId); + } else if (config.isRegisterZipkinEndpoint()) { + CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(serviceId, spanName, DetectPoint.SERVER); + } + } + } + if (!StringUtil.isEmpty(spanName)) { + zipkinSpan.setEndpointName(spanName); + } + + zipkinSpan.setStartTime(span.timestampAsLong()); + zipkinSpan.setEndTime(span.timestampAsLong() + span.durationAsLong()); + zipkinSpan.setIsError(BooleanUtils.booleanToValue(false)); + + receiver.receive(zipkinSpan); + }); + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java similarity index 98% rename from oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java rename to oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java index 9a53885..0c576c7 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/analysis/transform/SpringSleuthSegmentBuilderTest.java @@ -16,7 +16,7 @@ * */ -package org.apache.skywalking.oap.server.receiver.zipkin.transform; +package org.apache.skywalking.oap.server.receiver.zipkin.analysis.transform; import com.google.gson.JsonObject; import java.io.UnsupportedEncodingException; @@ -26,7 +26,7 @@ import org.apache.skywalking.apm.network.language.agent.v2.*; import org.apache.skywalking.oap.server.core.register.NodeType; import org.apache.skywalking.oap.server.core.register.service.*; import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker; -import org.apache.skywalking.oap.server.receiver.zipkin.data.*; +import org.apache.skywalking.oap.server.receiver.zipkin.analysis.data.*; import org.junit.*; import org.powermock.reflect.Whitebox; import zipkin2.Span; diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml index 37c5330..cfd77d3 100644 --- a/oap-server/server-starter/pom.xml +++ b/oap-server/server-starter/pom.xml @@ -126,6 +126,11 @@ <artifactId>storage-elasticsearch-plugin</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>storage-zipkin-plugin</artifactId> + <version>${project.version}</version> + </dependency> <!-- storage module --> <!-- queryBuild module --> diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/pom.xml index 4cb4fcb..99291e1 100644 --- a/oap-server/server-storage-plugin/pom.xml +++ b/oap-server/server-storage-plugin/pom.xml @@ -30,6 +30,7 @@ <modules> <module>storage-jdbc-hikaricp-plugin</module> <module>storage-elasticsearch-plugin</module> + <module>storage-zipkin-plugin</module> </modules> </project> \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java index 7d461aa..6608a58 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java @@ -59,7 +59,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { this.password = password; } - int getIndexShardsNumber() { + public int getIndexShardsNumber() { return indexShardsNumber; } @@ -67,7 +67,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig { this.indexShardsNumber = indexShardsNumber; } - int getIndexReplicasNumber() { + public int getIndexReplicasNumber() { return indexReplicasNumber; } diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml similarity index 63% copy from oap-server/server-storage-plugin/pom.xml copy to oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml index 4cb4fcb..658b47a 100644 --- a/oap-server/server-storage-plugin/pom.xml +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/pom.xml @@ -17,19 +17,23 @@ ~ --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <artifactId>oap-server</artifactId> + <artifactId>server-storage-plugin</artifactId> <groupId>org.apache.skywalking</groupId> <version>6.1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>server-storage-plugin</artifactId> - <packaging>pom</packaging> - <modules> - <module>storage-jdbc-hikaricp-plugin</module> - <module>storage-elasticsearch-plugin</module> - </modules> + <artifactId>storage-zipkin-plugin</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>storage-elasticsearch-plugin</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> </project> \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java new file mode 100644 index 0000000..0072d9f --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpan.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.zipkin; + +import lombok.*; +import org.apache.skywalking.oap.server.core.source.*; + +import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.*; + +/** + * @author peng-yongsheng + */ +@ScopeDeclaration(id = ZIPKIN_SPAN, name = "ZipkinSpan") +public class ZipkinSpan extends Source { + + @Override public int scope() { + return DefaultScopeDefine.SEGMENT; + } + + @Override public String getEntityId() { + return spanId; + } + + @Setter @Getter private String traceId; + @Setter @Getter private String spanId; + @Setter @Getter private int serviceId; + @Setter @Getter private int serviceInstanceId; + @Setter @Getter private String endpointName; + @Setter @Getter private int endpointId; + @Setter @Getter private long startTime; + @Setter @Getter private long endTime; + @Setter @Getter private int latency; + @Setter @Getter private int isError; + @Setter @Getter private byte[] dataBinary; + @Setter @Getter private int version; +} diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java new file mode 100644 index 0000000..3b97139 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecord.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.zipkin; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.record.Record; +import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType; +import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; + +@RecordType +@StorageEntity(name = ZipkinSpanRecord.INDEX_NAME, builder = ZipkinSpanRecord.Builder.class, sourceScopeId = DefaultScopeDefine.ZIPKIN_SPAN) +public class ZipkinSpanRecord extends Record { + public static final String INDEX_NAME = "segment"; + public static final String TRACE_ID = "trace_id"; + public static final String SPAN_ID = "span_id"; + public static final String SERVICE_ID = "service_id"; + public static final String SERVICE_INSTANCE_ID = "service_instance_id"; + public static final String ENDPOINT_NAME = "endpoint_name"; + public static final String ENDPOINT_ID = "endpoint_id"; + public static final String START_TIME = "start_time"; + public static final String END_TIME = "end_time"; + public static final String LATENCY = "latency"; + public static final String IS_ERROR = "is_error"; + public static final String DATA_BINARY = "data_binary"; + public static final String VERSION = "version"; + + @Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId; + @Setter @Getter @Column(columnName = SPAN_ID) @IDColumn private String spanId; + @Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId; + @Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) @IDColumn private int serviceInstanceId; + @Setter @Getter @Column(columnName = ENDPOINT_NAME, matchQuery = true) @IDColumn private String endpointName; + @Setter @Getter @Column(columnName = ENDPOINT_ID) @IDColumn private int endpointId; + @Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime; + @Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime; + @Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency; + @Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError; + @Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary; + @Setter @Getter @Column(columnName = VERSION) @IDColumn private int version; + + @Override public String id() { + return traceId + "-" + spanId; + } + + public static class Builder implements StorageBuilder<ZipkinSpanRecord> { + + @Override public Map<String, Object> data2Map(ZipkinSpanRecord storageData) { + Map<String, Object> map = new HashMap<>(); + map.put(TRACE_ID, storageData.getTraceId()); + map.put(SPAN_ID, storageData.getSpanId()); + map.put(SERVICE_ID, storageData.getServiceId()); + map.put(SERVICE_INSTANCE_ID, storageData.getServiceInstanceId()); + map.put(ENDPOINT_NAME, storageData.getEndpointName()); + map.put(ENDPOINT_ID, storageData.getEndpointId()); + map.put(START_TIME, storageData.getStartTime()); + map.put(END_TIME, storageData.getEndTime()); + map.put(LATENCY, storageData.getLatency()); + map.put(IS_ERROR, storageData.getIsError()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + if (CollectionUtils.isEmpty(storageData.getDataBinary())) { + map.put(DATA_BINARY, Const.EMPTY_STRING); + } else { + map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary()))); + } + map.put(VERSION, storageData.getVersion()); + return map; + } + + @Override public ZipkinSpanRecord map2Data(Map<String, Object> dbMap) { + ZipkinSpanRecord record = new ZipkinSpanRecord(); + record.setTraceId((String)dbMap.get(TRACE_ID)); + record.setSpanId((String)dbMap.get(SPAN_ID)); + record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue()); + record.setServiceInstanceId(((Number)dbMap.get(SERVICE_INSTANCE_ID)).intValue()); + record.setEndpointName((String)dbMap.get(ENDPOINT_NAME)); + record.setEndpointId(((Number)dbMap.get(ENDPOINT_ID)).intValue()); + record.setStartTime(((Number)dbMap.get(START_TIME)).longValue()); + record.setEndTime(((Number)dbMap.get(END_TIME)).longValue()); + record.setLatency(((Number)dbMap.get(LATENCY)).intValue()); + record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue()); + record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue()); + if (StringUtil.isEmpty((String)dbMap.get(DATA_BINARY))) { + record.setDataBinary(new byte[] {}); + } else { + record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY))); + } + record.setVersion(((Number)dbMap.get(VERSION)).intValue()); + return record; + } + } +} diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java new file mode 100644 index 0000000..71aa9b2 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/ZipkinSpanRecordDispatcher.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.zipkin; + +import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher; +import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess; + +/** + * Dispatch for Zipkin native mode spans. + * + * @author wusheng + */ +public class ZipkinSpanRecordDispatcher implements SourceDispatcher<ZipkinSpan> { + @Override public void dispatch(ZipkinSpan source) { + ZipkinSpanRecord segment = new ZipkinSpanRecord(); + segment.setTraceId(source.getTraceId()); + segment.setSpanId(source.getSpanId()); + segment.setServiceId(source.getServiceId()); + segment.setServiceInstanceId(source.getServiceInstanceId()); + segment.setEndpointName(source.getEndpointName()); + segment.setEndpointId(source.getEndpointId()); + segment.setStartTime(source.getStartTime()); + segment.setEndTime(source.getEndTime()); + segment.setLatency(source.getLatency()); + segment.setIsError(source.getIsError()); + segment.setDataBinary(source.getDataBinary()); + segment.setTimeBucket(source.getTimeBucket()); + segment.setVersion(source.getVersion()); + + RecordProcess.INSTANCE.in(segment); + } +} diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java new file mode 100644 index 0000000..472d01a --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/zipkin/elasticsearch/StorageModuleElasticsearchProvider.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch; + +import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.cache.*; +import org.apache.skywalking.oap.server.core.storage.query.*; +import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; +import org.apache.skywalking.oap.server.library.module.*; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.*; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.*; +import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.*; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class StorageModuleElasticsearchProvider extends ModuleProvider { + + private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class); + + private final StorageModuleElasticsearchConfig config; + private ElasticSearchClient elasticSearchClient; + + public StorageModuleElasticsearchProvider() { + super(); + this.config = new StorageModuleElasticsearchConfig(); + } + + @Override + public String name() { + return "elasticsearch"; + } + + @Override + public Class<? extends ModuleDefine> module() { + return StorageModule.class; + } + + @Override + public ModuleConfig createConfigBeanIfAbsent() { + return config; + } + + @Override + public void prepare() throws ServiceNotProvidedException { + if (!StringUtil.isEmpty(config.getNameSpace())) { + config.setNameSpace(config.getNameSpace().toLowerCase()); + } + elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getNameSpace(), config.getUser(), config.getPassword()); + + this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests())); + this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient)); + this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient)); + this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient)); + + this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient)); + this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient)); + this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient)); + this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient)); + + this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient)); + this.registerServiceImplementation(IMetricQueryDAO.class, new MetricQueryEsDAO(elasticSearchClient)); + this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient)); + this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient)); + this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient)); + this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient)); + this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient)); + } + + @Override + public void start() throws ModuleStartException { + try { + elasticSearchClient.connect(); + + StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber()); + installer.install(elasticSearchClient); + + RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient); + lockInstaller.install(); + } catch (StorageException e) { + throw new ModuleStartException(e.getMessage(), e); + } + } + + @Override + public void notifyAfterCompleted() { + } + + @Override + public String[] requiredModules() { + return new String[] {CoreModule.NAME}; + } +} diff --git a/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100644 index 0000000..5f028bc --- /dev/null +++ b/oap-server/server-storage-plugin/storage-zipkin-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.oap.server.storage.plugin.zipkin.elasticsearch.StorageModuleElasticsearchProvider \ No newline at end of file
