This is an automated email from the ASF dual-hosted git repository. tanjian pushed a commit to branch rest_api in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 47bb0c0247290e42923496b2b164eb75ec7cdfe6 Author: JaredTan95 <jian....@daocloud.io> AuthorDate: Sun Feb 23 10:58:16 2020 +0800 support http api. --- .../register/provider/RegisterModuleProvider.java | 11 ++ .../v6/rest/ServiceInstancePingServletHandler.java | 107 ++++++++++++++++++ .../ServiceInstanceRegisterServletHandler.java | 121 ++++++++++++++++++++ .../v6/rest/ServiceRegisterServletHandler.java | 80 +++++++++++++ .../trace/provider/TraceModuleProvider.java | 25 +++-- .../v6/rest/TraceSegmentCollectServletHandler.java | 82 ++++++++++++++ .../rest/reader/KeyStringValuePairJsonReader.java | 52 +++++++++ .../handler/v6/rest/reader/LogJsonReader.java | 58 ++++++++++ .../v6/rest/reader/ReferenceJsonReader.java | 90 +++++++++++++++ .../handler/v6/rest/reader/SegmentJsonReader.java | 80 +++++++++++++ .../handler/v6/rest/reader/SpanJsonReader.java | 124 +++++++++++++++++++++ .../handler/v6/rest/reader/StreamJsonReader.java | 26 +++++ .../handler/v6/rest/reader/TraceSegment.java | 44 ++++++++ .../handler/v6/rest/reader/UniqueIdJsonReader.java | 38 +++++++ .../v6/rest/reader/UpstreamSegmentJsonReader.java | 63 +++++++++++ 15 files changed, 994 insertions(+), 7 deletions(-) diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/RegisterModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/RegisterModuleProvider.java index 798dea3..f7b3dc0 100644 --- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/RegisterModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/RegisterModuleProvider.java @@ -20,12 +20,16 @@ package org.apache.skywalking.oap.server.receiver.register.provider; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.receiver.register.module.RegisterModule; import org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.grpc.RegisterServiceHandler; import org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.grpc.ServiceInstancePingServiceHandler; +import org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.rest.ServiceInstancePingServletHandler; +import org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.rest.ServiceInstanceRegisterServletHandler; +import org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.rest.ServiceRegisterServletHandler; import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule; public class RegisterModuleProvider extends ModuleProvider { @@ -56,6 +60,13 @@ public class RegisterModuleProvider extends ModuleProvider { .getService(GRPCHandlerRegister.class); grpcHandlerRegister.addHandler(new RegisterServiceHandler(getManager())); grpcHandlerRegister.addHandler(new ServiceInstancePingServiceHandler(getManager())); + + JettyHandlerRegister jettyHandlerRegister = getManager().find(SharingServerModule.NAME) + .provider() + .getService(JettyHandlerRegister.class); + jettyHandlerRegister.addHandler(new ServiceRegisterServletHandler(getManager())); + jettyHandlerRegister.addHandler(new ServiceInstanceRegisterServletHandler(getManager())); + jettyHandlerRegister.addHandler(new ServiceInstancePingServletHandler(getManager())); } @Override diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/rest/ServiceInstancePingServletHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/rest/ServiceInstancePingServletHandler.java new file mode 100644 index 0000000..2724036 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/rest/ServiceInstancePingServletHandler.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.rest; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.IOException; +import java.util.Objects; +import javax.servlet.http.HttpServletRequest; +import org.apache.skywalking.apm.network.common.Command; +import org.apache.skywalking.apm.network.common.Commands; +import org.apache.skywalking.apm.network.trace.component.command.ServiceResetCommand; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; +import org.apache.skywalking.oap.server.core.command.CommandService; +import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; +import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.server.jetty.ArgumentsParseException; +import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServiceInstancePingServletHandler extends JettyJsonHandler { + + private static final Logger logger = LoggerFactory.getLogger(ServiceInstancePingServletHandler.class); + + private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister; + private final ServiceInstanceInventoryCache serviceInstanceInventoryCache; + private final IServiceInventoryRegister serviceInventoryRegister; + private final CommandService commandService; + private final Gson gson = new Gson(); + + private static final String INSTANCE_ID = "instance_id"; + private static final String TIME = "time"; + private static final String SERVICE_INSTANCE_UUID = "service_instance_UUID"; + private static final String COMMANDS = "commands"; + + public ServiceInstancePingServletHandler(ModuleManager moduleManager) { + this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService( + IServiceInstanceInventoryRegister.class); + this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService( + ServiceInstanceInventoryCache.class); + this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService( + IServiceInventoryRegister.class); + this.commandService = moduleManager.find(CoreModule.NAME).provider().getService(CommandService.class); + } + + @Override + public String pathSpec() { + return "/v2/instance/heartbeat"; + } + + @Override + protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { + throw new UnsupportedOperationException(); + } + + @Override + protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException, IOException { + JsonObject responseJson = new JsonObject(); + try { + JsonObject heartBeat = gson.fromJson(req.getReader(), JsonObject.class); + int instanceId = heartBeat.get(INSTANCE_ID).getAsInt(); + long heartBeatTime = heartBeat.get(TIME).getAsLong(); + String serviceInstanceUUID = heartBeat.get(SERVICE_INSTANCE_UUID).getAsString(); + + serviceInstanceInventoryRegister.heartbeat(instanceId, heartBeatTime); + ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(instanceId); + if (Objects.nonNull(serviceInstanceInventory)) { + serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime); + } else { + logger.warn( + "Can't found service by service instance id from cache, service instance id is: {}", instanceId); + + final ServiceResetCommand resetCommand = commandService.newResetCommand( + instanceId, heartBeatTime, serviceInstanceUUID); + final Command command = resetCommand.serialize().build(); + final Commands nextCommands = Commands.newBuilder().addCommands(command).build(); + responseJson.add(COMMANDS, gson.toJsonTree(nextCommands, Commands.class)); + } + responseJson.addProperty(INSTANCE_ID, instanceId); + responseJson.addProperty(TIME, heartBeatTime); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + return responseJson; + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/rest/ServiceInstanceRegisterServletHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/rest/ServiceInstanceRegisterServletHandler.java new file mode 100644 index 0000000..cefd13d --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/rest/ServiceInstanceRegisterServletHandler.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.rest; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import javax.servlet.http.HttpServletRequest; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; +import org.apache.skywalking.oap.server.core.register.ServiceInventory; +import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.server.jetty.ArgumentsParseException; +import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.HOST_NAME; +import static org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory.PropertyUtil.PROCESS_NO; + +public class ServiceInstanceRegisterServletHandler extends JettyJsonHandler { + + private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceRegisterServletHandler.class); + + private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister; + private final ServiceInventoryCache serviceInventoryCache; + private final Gson gson = new Gson(); + + private static final String SERVICE_ID = "service_id"; + private static final String AGENT_UUID = "agent_uuid"; + private static final String REGISTER_TIME = "register_time"; + private static final String INSTANCE_ID = "instance_id"; + private static final String OS_INFO = "os_info"; + + public ServiceInstanceRegisterServletHandler(ModuleManager moduleManager) { + this.serviceInventoryCache = moduleManager.find(CoreModule.NAME) + .provider() + .getService(ServiceInventoryCache.class); + this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService( + IServiceInstanceInventoryRegister.class); + } + + @Override + public String pathSpec() { + return "/v2/instance/register"; + } + + @Override + protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { + throw new UnsupportedOperationException(); + } + + @Override + protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException { + JsonObject responseJson = new JsonObject(); + try { + JsonObject instance = gson.fromJson(req.getReader(), JsonObject.class); + int serviceId = instance.get(SERVICE_ID).getAsInt(); + String agentUUID = instance.get(AGENT_UUID).getAsString(); + long registerTime = instance.get(REGISTER_TIME).getAsLong(); + JsonObject osInfoJson = instance.get(OS_INFO).getAsJsonObject(); + + List<String> ipv4sList = new ArrayList<>(); + JsonArray ipv4s = osInfoJson.get("ipv4s").getAsJsonArray(); + ipv4s.forEach(ipv4 -> ipv4sList.add(ipv4.getAsString())); + + ServiceInventory serviceInventory = serviceInventoryCache.get(serviceId); + + JsonObject instanceProperties = new JsonObject(); + instanceProperties.addProperty( + ServiceInstanceInventory.PropertyUtil.HOST_NAME, osInfoJson.get("host_name").getAsString()); + instanceProperties.addProperty( + ServiceInstanceInventory.PropertyUtil.OS_NAME, osInfoJson.get("os_name").getAsString()); + instanceProperties.addProperty( + ServiceInstanceInventory.PropertyUtil.PROCESS_NO, osInfoJson.get("process_id").getAsInt() + ""); + instanceProperties.addProperty( + ServiceInstanceInventory.PropertyUtil.IPV4S, + ServiceInstanceInventory.PropertyUtil.ipv4sSerialize(ipv4sList) + ); + + String instanceName = serviceInventory.getName(); + if (instanceProperties.has(PROCESS_NO)) { + instanceName += "-pid:" + instanceProperties.get(PROCESS_NO).getAsString(); + } + if (instanceProperties.has(HOST_NAME)) { + instanceName += "@" + instanceProperties.get(HOST_NAME).getAsString(); + } + + int instanceId = serviceInstanceInventoryRegister.getOrCreate( + serviceId, instanceName, agentUUID, registerTime, instanceProperties); + responseJson.addProperty(SERVICE_ID, serviceId); + responseJson.addProperty(INSTANCE_ID, instanceId); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + //TODO: Commands + return responseJson; + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/rest/ServiceRegisterServletHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/rest/ServiceRegisterServletHandler.java new file mode 100644 index 0000000..c33616e --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/rest/ServiceRegisterServletHandler.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.rest; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.server.jetty.ArgumentsParseException; +import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServiceRegisterServletHandler extends JettyJsonHandler { + + private static final Logger logger = LoggerFactory.getLogger(ServiceRegisterServletHandler.class); + + private final IServiceInventoryRegister serviceInventoryRegister; + private final Gson gson = new Gson(); + private static final String SERVICE_NAME = "service_name"; + private static final String SERVICE_ID = "service_id"; + + public ServiceRegisterServletHandler(ModuleManager moduleManager) { + serviceInventoryRegister = moduleManager.find(CoreModule.NAME) + .provider() + .getService(IServiceInventoryRegister.class); + } + + @Override + public String pathSpec() { + return "/v2/service/register"; + } + + @Override + protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { + throw new UnsupportedOperationException(); + } + + @Override + protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException { + JsonArray responseArray = new JsonArray(); + try { + JsonArray serviceCodes = gson.fromJson(req.getReader(), JsonArray.class); + for (int i = 0; i < serviceCodes.size(); i++) { + JsonObject service = serviceCodes.get(i).getAsJsonObject(); + String serviceCode = service.get(SERVICE_NAME).getAsString(); + int serviceId = serviceInventoryRegister.getOrCreate(serviceCode, null); + JsonObject mapping = new JsonObject(); + mapping.addProperty(SERVICE_NAME, serviceCode); + mapping.addProperty(SERVICE_ID, serviceId); + responseArray.add(mapping); + // + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + return responseArray; + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java index 3eb3d5f..b087241 100755 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java @@ -18,10 +18,12 @@ package org.apache.skywalking.oap.server.receiver.trace.provider; +import java.io.IOException; import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule; import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; @@ -30,6 +32,7 @@ import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule; import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule; import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc.TraceSegmentReportServiceHandler; +import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.TraceSegmentCollectServletHandler; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager; @@ -41,8 +44,6 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener. import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; -import java.io.IOException; - public class TraceModuleProvider extends ModuleProvider { private final TraceServiceModuleConfig moduleConfig; @@ -80,7 +81,8 @@ public class TraceModuleProvider extends ModuleProvider { segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager(), moduleConfig); - this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2)); + this.registerServiceImplementation( + ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2)); } public SegmentParserListenerManager listenerManager() { @@ -99,19 +101,28 @@ public class TraceModuleProvider extends ModuleProvider { public void start() throws ModuleStartException { DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME) .provider() - .getService(DynamicConfigurationService.class); + .getService( + DynamicConfigurationService.class); GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME) .provider() .getService(GRPCHandlerRegister.class); + JettyHandlerRegister jettyHandlerRegister = getManager().find(SharingServerModule.NAME) + .provider() + .getService(JettyHandlerRegister.class); try { dynamicConfigurationService.registerConfigChangeWatcher(thresholds); dynamicConfigurationService.registerConfigChangeWatcher(uninstrumentedGatewaysConfig); grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandler(segmentProducerV2, getManager())); - SegmentStandardizationWorker standardizationWorkerV2 = new SegmentStandardizationWorker(getManager(), segmentProducerV2, moduleConfig - .getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig - .isBufferFileCleanWhenRestart()); + jettyHandlerRegister.addHandler(new TraceSegmentCollectServletHandler(segmentProducerV2)); + + SegmentStandardizationWorker standardizationWorkerV2 = new SegmentStandardizationWorker( + getManager(), segmentProducerV2, moduleConfig + .getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), + moduleConfig + .isBufferFileCleanWhenRestart() + ); segmentProducerV2.setStandardizationWorker(standardizationWorkerV2); } catch (IOException e) { throw new ModuleStartException(e.getMessage(), e); diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/TraceSegmentCollectServletHandler.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/TraceSegmentCollectServletHandler.java new file mode 100644 index 0000000..aadad13 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/TraceSegmentCollectServletHandler.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest; + +import com.google.gson.JsonElement; +import com.google.gson.stream.JsonReader; +import java.io.BufferedReader; +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler; +import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader.TraceSegment; +import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader.UpstreamSegmentJsonReader; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TraceSegmentCollectServletHandler extends JettyJsonHandler { + + private static final Logger logger = LoggerFactory.getLogger(TraceSegmentCollectServletHandler.class); + + private final SegmentParseV2.Producer segmentProducer; + + public TraceSegmentCollectServletHandler(SegmentParseV2.Producer segmentProducer) { + this.segmentProducer = segmentProducer; + } + + @Override + public String pathSpec() { + return "/v2/segments"; + } + + @Override + protected JsonElement doGet(HttpServletRequest req) { + throw new UnsupportedOperationException(); + } + + @Override + protected JsonElement doPost(HttpServletRequest req) { + if (logger.isDebugEnabled()) { + logger.debug("receive stream segment"); + } + + try { + BufferedReader bufferedReader = req.getReader(); + read(bufferedReader); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + + return null; + } + + private UpstreamSegmentJsonReader jsonReader = new UpstreamSegmentJsonReader(); + + private void read(BufferedReader bufferedReader) throws IOException { + JsonReader reader = new JsonReader(bufferedReader); + + reader.beginArray(); + while (reader.hasNext()) { + TraceSegment traceSegment = jsonReader.read(reader); + segmentProducer.send(traceSegment.getUpstreamSegment(), SegmentSource.Agent); + } + reader.endArray(); + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/KeyStringValuePairJsonReader.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/KeyStringValuePairJsonReader.java new file mode 100644 index 0000000..40ec472 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/KeyStringValuePairJsonReader.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader; + +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import org.apache.skywalking.apm.network.common.KeyStringValuePair; + +public class KeyStringValuePairJsonReader implements StreamJsonReader<KeyStringValuePair> { + + private static final String KEY = "key"; + private static final String VALUE = "value"; + + @Override + public KeyStringValuePair read(JsonReader reader) throws IOException { + KeyStringValuePair.Builder builder = KeyStringValuePair.newBuilder(); + + reader.beginObject(); + while (reader.hasNext()) { + switch (reader.nextName()) { + case KEY: + builder.setKey(reader.nextString()); + break; + case VALUE: + builder.setValue(reader.nextString()); + break; + default: + reader.skipValue(); + break; + } + } + reader.endObject(); + + return builder.build(); + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/LogJsonReader.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/LogJsonReader.java new file mode 100644 index 0000000..9b24c5c --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/LogJsonReader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader; + +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import org.apache.skywalking.apm.network.language.agent.v2.Log; + +public class LogJsonReader implements StreamJsonReader<Log> { + + private KeyStringValuePairJsonReader keyStringValuePairJsonReader = new KeyStringValuePairJsonReader(); + + private static final String TIME = "time"; + private static final String DATA = "data"; + + @Override + public Log read(JsonReader reader) throws IOException { + Log.Builder builder = Log.newBuilder(); + + reader.beginObject(); + while (reader.hasNext()) { + switch (reader.nextName()) { + case TIME: + builder.setTime(reader.nextLong()); + break; + case DATA: + reader.beginArray(); + while (reader.hasNext()) { + builder.addData(keyStringValuePairJsonReader.read(reader)); + } + reader.endArray(); + break; + default: + reader.skipValue(); + break; + } + } + reader.endObject(); + + return builder.build(); + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/ReferenceJsonReader.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/ReferenceJsonReader.java new file mode 100644 index 0000000..ed4653e --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/ReferenceJsonReader.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader; + +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import org.apache.skywalking.apm.network.language.agent.v2.SegmentReference; + +public class ReferenceJsonReader implements StreamJsonReader<SegmentReference> { + + private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader(); + + private static final String REF_TYPE_VALUE = "ref_type"; + private static final String PARENT_TRACE_SEGMENT_ID = "parent_trace_segment_id"; + private static final String PARENT_SPAN_ID = "parent_span_id"; + private static final String PARENT_SERVICE_INSTANCE_ID = "parent_service_instance_id"; + private static final String NETWORK_ADDRESS = "network_address"; + private static final String NETWORK_ADDRESS_ID = "network_address_id"; + private static final String ENTRY_SERVICE_INSTANCE_ID = "entry_service_instance_id"; + private static final String ENTRY_ENDPOINT = "entry_endpoint"; + private static final String ENTRY_ENDPOINT_ID = "entry_endpoint_id"; + private static final String PARENT_ENDPOINT = "parent_endpoint"; + private static final String PARENT_ENDPOINT_ID = "parent_endpoint_id"; + + @Override + public SegmentReference read(JsonReader reader) throws IOException { + SegmentReference.Builder builder = SegmentReference.newBuilder(); + + reader.beginObject(); + while (reader.hasNext()) { + switch (reader.nextName()) { + case REF_TYPE_VALUE: + builder.setRefTypeValue(reader.nextInt()); + break; + case PARENT_TRACE_SEGMENT_ID: + builder.setParentTraceSegmentId(uniqueIdJsonReader.read(reader)); + break; + case PARENT_SPAN_ID: + builder.setParentSpanId(reader.nextInt()); + break; + case PARENT_SERVICE_INSTANCE_ID: + builder.setParentServiceInstanceId(reader.nextInt()); + break; + case NETWORK_ADDRESS: + builder.setNetworkAddress(reader.nextString()); + break; + case NETWORK_ADDRESS_ID: + builder.setNetworkAddressId(reader.nextInt()); + break; + case ENTRY_SERVICE_INSTANCE_ID: + builder.setEntryServiceInstanceId(reader.nextInt()); + break; + case ENTRY_ENDPOINT: + builder.setEntryEndpoint(reader.nextString()); + break; + case ENTRY_ENDPOINT_ID: + builder.setEntryEndpointId(reader.nextInt()); + break; + case PARENT_ENDPOINT: + builder.setParentEndpoint(reader.nextString()); + break; + case PARENT_ENDPOINT_ID: + builder.setParentEndpointId(reader.nextInt()); + break; + default: + reader.skipValue(); + break; + } + } + reader.endObject(); + + return builder.build(); + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/SegmentJsonReader.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/SegmentJsonReader.java new file mode 100644 index 0000000..2982584 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/SegmentJsonReader.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader; + +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SegmentJsonReader implements StreamJsonReader<SegmentObject.Builder> { + + private static final Logger logger = LoggerFactory.getLogger(SegmentJsonReader.class); + + private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader(); + private SpanJsonReader spanJsonReader = new SpanJsonReader(); + + private static final String TRACE_SEGMENT_ID = "trace_segment_id"; + private static final String SPANS = "spans"; + private static final String SERVICE_ID = "service_id"; + private static final String SERVICE_INSTANCE_ID = "service_instance_id"; + private static final String IS_SIZE_LIMITED = "is_size_limited"; + + @Override + public SegmentObject.Builder read(JsonReader reader) throws IOException { + SegmentObject.Builder builder = SegmentObject.newBuilder(); + + reader.beginObject(); + while (reader.hasNext()) { + switch (reader.nextName()) { + case TRACE_SEGMENT_ID: + builder.setTraceSegmentId(uniqueIdJsonReader.read(reader)); + if (logger.isDebugEnabled()) { + StringBuilder segmentId = new StringBuilder(); + builder.getTraceSegmentId().getIdPartsList().forEach(idPart -> segmentId.append(idPart)); + logger.debug("segment id: {}", segmentId); + } + break; + case SERVICE_ID: + builder.setServiceId(reader.nextInt()); + break; + case SERVICE_INSTANCE_ID: + builder.setServiceInstanceId(reader.nextInt()); + break; + case IS_SIZE_LIMITED: + builder.setIsSizeLimited(reader.nextBoolean()); + break; + case SPANS: + reader.beginArray(); + while (reader.hasNext()) { + builder.addSpans(spanJsonReader.read(reader)); + } + reader.endArray(); + break; + default: + reader.skipValue(); + break; + } + } + reader.endObject(); + + return builder; + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/SpanJsonReader.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/SpanJsonReader.java new file mode 100644 index 0000000..11d51f3 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/SpanJsonReader.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader; + +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2; + +public class SpanJsonReader implements StreamJsonReader<SpanObjectV2> { + + private KeyStringValuePairJsonReader keyStringValuePairJsonReader = new KeyStringValuePairJsonReader(); + private LogJsonReader logJsonReader = new LogJsonReader(); + private ReferenceJsonReader referenceJsonReader = new ReferenceJsonReader(); + + private static final String SPAN_ID = "span_id"; + private static final String PARENT_SPAN_ID = "parent_span_id"; + private static final String START_TIME = "start_time"; + private static final String END_TIME = "end_time"; + private static final String REFS = "refs"; + private static final String OPERATION_NAME_ID = "operation_name_id"; + private static final String OPERATION_NAME = "operation_name"; + private static final String PEER_ID = "peer_id"; + private static final String PEER = "peer"; + private static final String SPAN_TYPE = "span_type"; + private static final String SPAN_LAYER = "span_layer"; + private static final String COMPONENT_ID = "component_id"; + private static final String COMPONENT = "component"; + private static final String IS_ERROR = "is_error"; + private static final String TAGS = "tags"; + private static final String LOGS = "logs"; + + @Override + public SpanObjectV2 read(JsonReader reader) throws IOException { + SpanObjectV2.Builder builder = SpanObjectV2.newBuilder(); + + reader.beginObject(); + while (reader.hasNext()) { + switch (reader.nextName()) { + case SPAN_ID: + builder.setSpanId(reader.nextInt()); + break; + case SPAN_TYPE: + builder.setSpanTypeValue(reader.nextInt()); + break; + case SPAN_LAYER: + builder.setSpanLayerValue(reader.nextInt()); + break; + case PARENT_SPAN_ID: + builder.setParentSpanId(reader.nextInt()); + break; + case START_TIME: + builder.setStartTime(reader.nextLong()); + break; + case END_TIME: + builder.setEndTime(reader.nextLong()); + break; + case COMPONENT_ID: + builder.setComponentId(reader.nextInt()); + break; + case COMPONENT: + builder.setComponent(reader.nextString()); + break; + case OPERATION_NAME_ID: + builder.setOperationNameId(reader.nextInt()); + break; + case OPERATION_NAME: + builder.setOperationName(reader.nextString()); + break; + case PEER_ID: + builder.setPeerId(reader.nextInt()); + break; + case PEER: + builder.setPeer(reader.nextString()); + break; + case IS_ERROR: + builder.setIsError(reader.nextBoolean()); + break; + case REFS: + reader.beginArray(); + while (reader.hasNext()) { + builder.addRefs(referenceJsonReader.read(reader)); + } + reader.endArray(); + break; + case TAGS: + reader.beginArray(); + while (reader.hasNext()) { + builder.addTags(keyStringValuePairJsonReader.read(reader)); + } + reader.endArray(); + break; + case LOGS: + reader.beginArray(); + while (reader.hasNext()) { + builder.addLogs(logJsonReader.read(reader)); + } + reader.endArray(); + break; + default: + reader.skipValue(); + break; + } + } + reader.endObject(); + + return builder.build(); + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/StreamJsonReader.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/StreamJsonReader.java new file mode 100644 index 0000000..5e7e461 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/StreamJsonReader.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader; + +import com.google.gson.stream.JsonReader; +import java.io.IOException; + +public interface StreamJsonReader<T> { + T read(JsonReader reader) throws IOException; +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/TraceSegment.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/TraceSegment.java new file mode 100644 index 0000000..fe1f2f7 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/TraceSegment.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader; + +import org.apache.skywalking.apm.network.language.agent.UniqueId; +import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; +import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject; + +public class TraceSegment { + + private UpstreamSegment.Builder builder; + + public TraceSegment() { + builder = UpstreamSegment.newBuilder(); + } + + public void addGlobalTraceId(UniqueId.Builder globalTraceId) { + builder.addGlobalTraceIds(globalTraceId); + } + + public void setTraceSegmentBuilder(SegmentObject.Builder traceSegmentBuilder) { + builder.setSegment(traceSegmentBuilder.build().toByteString()); + } + + public UpstreamSegment getUpstreamSegment() { + return builder.build(); + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/UniqueIdJsonReader.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/UniqueIdJsonReader.java new file mode 100644 index 0000000..ec87fea --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/UniqueIdJsonReader.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader; + +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import org.apache.skywalking.apm.network.language.agent.UniqueId; + +public class UniqueIdJsonReader implements StreamJsonReader<UniqueId.Builder> { + + @Override + public UniqueId.Builder read(JsonReader reader) throws IOException { + UniqueId.Builder builder = UniqueId.newBuilder(); + + reader.beginArray(); + while (reader.hasNext()) { + builder.addIdParts(reader.nextLong()); + } + reader.endArray(); + return builder; + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/UpstreamSegmentJsonReader.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/UpstreamSegmentJsonReader.java new file mode 100644 index 0000000..c47583c --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v6/rest/reader/UpstreamSegmentJsonReader.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.rest.reader; + +import com.google.gson.stream.JsonReader; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpstreamSegmentJsonReader implements StreamJsonReader<TraceSegment> { + + private static final Logger logger = LoggerFactory.getLogger(UpstreamSegmentJsonReader.class); + + private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader(); + private SegmentJsonReader segmentJsonReader = new SegmentJsonReader(); + + private static final String GLOBAL_TRACE_IDS = "global_trace_ids"; + private static final String SEGMENT = "segment"; + + @Override + public TraceSegment read(JsonReader reader) throws IOException { + TraceSegment traceSegment = new TraceSegment(); + + reader.beginObject(); + while (reader.hasNext()) { + switch (reader.nextName()) { + case GLOBAL_TRACE_IDS: + reader.beginArray(); + while (reader.hasNext()) { + traceSegment.addGlobalTraceId(uniqueIdJsonReader.read(reader)); + } + reader.endArray(); + + break; + case SEGMENT: + traceSegment.setTraceSegmentBuilder(segmentJsonReader.read(reader)); + break; + default: + reader.skipValue(); + break; + } + } + reader.endObject(); + + return traceSegment; + } +}