http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/RpcBootstrapFactory.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/RpcBootstrapFactory.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/RpcBootstrapFactory.java new file mode 100644 index 0000000..13ce176 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/RpcBootstrapFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc; + +import java.util.Properties; +import org.apache.rocketmq.remoting.internal.BeanUtils; +import org.apache.rocketmq.rpc.api.SimpleClient; +import org.apache.rocketmq.rpc.api.SimpleServer; +import org.apache.rocketmq.rpc.impl.client.SimpleClientImpl; +import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig; +import org.apache.rocketmq.rpc.impl.server.SimpleServerImpl; + +public class RpcBootstrapFactory { + public static SimpleServer createServerBootstrap(Properties properties) { + return new SimpleServerImpl(BeanUtils.populate(properties, RpcCommonConfig.class)); + } + + public static SimpleClient createClientBootstrap(Properties properties) { + return new SimpleClientImpl(BeanUtils.populate(properties, RpcCommonConfig.class)); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/AdvancedClientImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/AdvancedClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/AdvancedClientImpl.java new file mode 100644 index 0000000..e94f589 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/AdvancedClientImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.rpc.impl.client; + +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.rpc.annotation.MethodType; +import org.apache.rocketmq.rpc.api.AdvancedClient; +import org.apache.rocketmq.rpc.api.Promise; +import org.apache.rocketmq.rpc.impl.service.RpcJdkProxy; +import org.apache.rocketmq.rpc.impl.service.RpcProxyFactory; + +public class AdvancedClientImpl implements AdvancedClient { + private final SimpleClientImpl simpleClient; + + public AdvancedClientImpl(final SimpleClientImpl simpleClient) { + this.simpleClient = simpleClient; + } + + @Override + public <T> T callSync(final String address, final String serviceCode, final String version, + final Object[] parameter, + final Class<T> responseType) throws Exception { + RemotingCommand request = simpleClient.createRemoteRequest(serviceCode, version, parameter); + RpcJdkProxy rpcJdkProxy = RpcProxyFactory.createServiceProxy(null, simpleClient, simpleClient.getRemotingClient(), simpleClient.getRpcCommonConfig(), address); + return (T) simpleClient.invokeRemoteMethod(rpcJdkProxy, serviceCode, request, responseType, MethodType.SYNC); + } + + @Override + public <T> Promise<T> callAsync(final String address, final String serviceCode, final String version, + final Object[] parameter, final Class<T> responseType) throws Exception { + RemotingCommand request = simpleClient.createRemoteRequest(serviceCode, version, parameter); + RpcJdkProxy rpcJdkProxy = RpcProxyFactory.createServiceProxy(null, simpleClient, simpleClient.getRemotingClient(), simpleClient.getRpcCommonConfig(), address); + return (Promise<T>) simpleClient.invokeRemoteMethod(rpcJdkProxy, serviceCode, request, responseType, MethodType.ASYNC); + } + + @Override + public void callOneway(final String address, final String serviceCode, final String version, + final Object[] parameter) throws Exception { + RemotingCommand request = simpleClient.createRemoteRequest(serviceCode, version, parameter); + RpcJdkProxy rpcJdkProxy = RpcProxyFactory.createServiceProxy(null, simpleClient, simpleClient.getRemotingClient(), simpleClient.getRpcCommonConfig(), address); + simpleClient.invokeRemoteMethod(rpcJdkProxy, serviceCode, request, Void.TYPE, MethodType.ONEWAY); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java new file mode 100644 index 0000000..787e8c1 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.client; + +import java.util.Properties; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.remoting.api.RemotingClient; +import org.apache.rocketmq.remoting.api.RemotingService; +import org.apache.rocketmq.remoting.api.interceptor.Interceptor; +import org.apache.rocketmq.remoting.external.ThreadUtils; +import org.apache.rocketmq.remoting.impl.netty.RemotingBootstrapFactory; +import org.apache.rocketmq.rpc.api.AdvancedClient; +import org.apache.rocketmq.rpc.api.SimpleClient; +import org.apache.rocketmq.rpc.impl.config.RpcCommonConfig; +import org.apache.rocketmq.rpc.impl.exception.ServiceExceptionManager; +import org.apache.rocketmq.rpc.impl.service.RpcConnectionListener; +import org.apache.rocketmq.rpc.impl.service.RpcInstanceAbstract; +import org.apache.rocketmq.rpc.impl.service.RpcProxyFactory; +import org.apache.rocketmq.rpc.internal.RpcErrorMapper; + +public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClient { + private RpcCommonConfig rpcCommonConfig; + private RemotingClient remotingClient; + private ExecutorService callServiceThreadPool; + + public SimpleClientImpl(RpcCommonConfig rpcCommonConfig) { + this(rpcCommonConfig, RemotingBootstrapFactory.createRemotingClient(rpcCommonConfig)); + } + + private SimpleClientImpl(RpcCommonConfig rpcCommonConfig, RemotingClient remotingClient) { + super(rpcCommonConfig); + this.remotingClient = remotingClient; + this.rpcCommonConfig = rpcCommonConfig; + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "clientCallServiceThread", true); + } + + public void initialize() { + this.remotingClient.registerChannelEventListener(new RpcConnectionListener(this)); + } + + @Override + public void start() { + try { + initialize(); + super.start(); + this.remotingClient.start(); + } catch (Exception e) { + throw ServiceExceptionManager.TranslateException(RpcErrorMapper.RpcErrorLatitude.LOCAL.getCode(), e); + } + } + + @Override + public void stop() { + try { + super.stop(); + ThreadUtils.shutdownGracefully(this.callServiceThreadPool, 3000, TimeUnit.MILLISECONDS); + this.remotingClient.stop(); + } catch (Exception e) { + throw ServiceExceptionManager.TranslateException(RpcErrorMapper.RpcErrorLatitude.LOCAL.getCode(), e); + } + } + + @Override + public <T> T bind(final Class<T> service, final String address, final Properties properties) { + return this.narrow0(service, RpcProxyFactory.createServiceProxy(service, this, remotingClient, rpcCommonConfig, address)); + } + + @Override + public void publish(final Object service) { + this.publishService0(service); + } + + @Override + public void publish(final Object service, final ExecutorService executorService) { + this.publishService0(service, executorService); + } + + @Override + public AdvancedClient advancedClient() { + return new AdvancedClientImpl(this); + } + + @Override + public RemotingService remotingService() { + return this.remotingClient; + } + + @Override + public void registerServiceListener() { + + } + + public ExecutorService getCallServiceThreadPool() { + return callServiceThreadPool; + } + + public void setCallServiceThreadPool(final ExecutorService callServiceThreadPool) { + this.callServiceThreadPool = callServiceThreadPool; + } + + public void registerInterceptor(Interceptor interceptor) { + if (interceptor != null) + this.remotingClient.registerInterceptor(interceptor); + } + + public RemotingClient getRemotingClient() { + return remotingClient; + } + + public void setRemotingClient(final RemotingClient remotingClient) { + this.remotingClient = remotingClient; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/command/ResponseCode.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/command/ResponseCode.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/command/ResponseCode.java new file mode 100644 index 0000000..270adaa --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/command/ResponseCode.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.command; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +/** + * 1xx: SDK exception + * <p> + * 4xx: User exception + * <p> + * 5xx: Server exception + * <p> + * 6xx: Common exception + */ +public class ResponseCode { + + public static final String ILLEGAL_ACCESS = "100"; + public static final String NULL_POINTER = "102"; + public static final String FAIL_INVOKE = "103"; + public static final String INSTANTIATED_FAIL = "104"; + public static final String SUCCESS = "0"; + + public static final String USER_SERVICE_EXCEPTION = "400"; + public static final String USER_EXCEPTION_CLASS_NOT_FOUND = "401"; + public static final String USER_EXCEPTION_METHOD_NOT_FOUND = "402"; + + public static final String PROVIDER_OFF_LINE = "501"; + public static final String CONSUMER_OFF_LINE = "502"; + public static final String PROVIDER_NOT_EXIST = "503"; + public static final String CONSUMER_NOT_EXIST = "504"; + public static final String NAMESPACE_NOT_EXIST = "505"; + public static final String CLIENT_NOT_REGISTERED = "506"; + public static final String NO_SERVICE_ON_LINE = "507"; + public static final String NO_SERVICE_ON_THIS_SERVER = "508"; + public static final String SERVICE_ALREADY_ONLINE = "509"; + public static final String SERVER_NOT_EXIST = "510"; + public static final String SERVER_ALREADY_ONLINE = "511"; + public static final String NO_SERVER_ON_LINE = "512"; + public static final String ACCESS_DENIED = "513"; + + public static final String SYSTEM_ERROR = "600"; + public static final String SYSTEM_BUSY = "601"; + public static final String BAD_REQUEST = "602"; + public static final String PARAMETER_ERROR = "603"; + public static final String SEND_REQUEST_FAILED = "604"; + public static final String REQUEST_TIMEOUT = "605"; + public static final String UNKNOWN_EXCEPTION = "606"; + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } + + public enum ResponseStatus { + R_100(100, "Illegal access"), + R_101(101, "Illegal argument"), + R_102(102, "Null pointer"), + R_103(103, "Invoke failed"), + R_104(104, "Failed initialization"), + R_200(200, "Success"), + R_400(400, "User service happened"), + R_401(401, "Class not found"), + R_402(402, "Method not found"), + R_501(501, "Service provider offline "), + R_502(502, "Service consumer offline"), + R_503(503, "Service provider not exist"), + R_504(504, "Service consumer not exist"), + R_505(505, "Namespace not exist"), + R_506(506, "Client not registered"), + R_507(507, "No service online"), + R_508(508, "No service on this server"), + R_509(509, "Server already online"), + R_510(510, "Server not exist"), + R_511(511, "Server already online"), + R_512(512, "No server online"), + R_513(513, "Access denied"), + R_600(600, "System error"), + R_601(601, "System busy"), + R_602(602, "Bad request"), + R_603(603, "Parameter error"), + R_604(604, "Send request failed"), + R_605(605, "Request timeout"), + R_606(606, "unknown exception"); + + private int responseCode = 0; + private String responseSimpleMessage = ""; + + ResponseStatus(int responseCode, String responseSimpleMessage) { + this.responseCode = responseCode; + this.responseSimpleMessage = responseSimpleMessage; + } + + public int getResponseCode() { + return responseCode; + } + + public String getResponseSimpleMessage() { + return responseSimpleMessage; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/command/RpcRequestCode.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/command/RpcRequestCode.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/command/RpcRequestCode.java new file mode 100644 index 0000000..3507754 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/command/RpcRequestCode.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.command; + +public interface RpcRequestCode { + String CALL_SERVICE = "service/call"; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/config/RpcCommonConfig.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/config/RpcCommonConfig.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/config/RpcCommonConfig.java new file mode 100644 index 0000000..7c00a09 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/config/RpcCommonConfig.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.config; + +import org.apache.rocketmq.remoting.config.RemotingConfig; + +public class RpcCommonConfig extends RemotingConfig { + public final static String SERVICE_INVOKE_TIMEOUT = "service.invoke.timeout"; + public final static String SERVICE_THREAD_KEEP_ALIVE_TIME = "service.thread.keep.alive.time"; + private final static String SERVICE_ADDRESS_CACHE_TIME = "service.address.cache.time"; + private final static String SERVICE_CACHE_MAX_COUNT = "service.cache.max.count"; + private long serviceInvokeTimeout = 10000; + private long serviceThreadKeepAliveTime = 60000; + private long serviceAddressCacheTime = 30; + private long serviceCacheMaxCount = 20000; + + public long getServiceInvokeTimeout() { + return serviceInvokeTimeout; + } + + public void setServiceInvokeTimeout(final long serviceInvokeTimeout) { + this.serviceInvokeTimeout = serviceInvokeTimeout; + } + + public long getServiceThreadKeepAliveTime() { + return serviceThreadKeepAliveTime; + } + + public void setServiceThreadKeepAliveTime(final long serviceThreadKeepAliveTime) { + this.serviceThreadKeepAliveTime = serviceThreadKeepAliveTime; + } + + public long getServiceAddressCacheTime() { + return serviceAddressCacheTime; + } + + public void setServiceAddressCacheTime(long serviceAddressCacheTime) { + this.serviceAddressCacheTime = serviceAddressCacheTime; + } + + public long getServiceCacheMaxCount() { + return serviceCacheMaxCount; + } + + public void setServiceCacheMaxCount(long serviceCacheMaxCount) { + this.serviceCacheMaxCount = serviceCacheMaxCount; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/context/RpcCallerContext.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/context/RpcCallerContext.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/context/RpcCallerContext.java new file mode 100644 index 0000000..bb8433c --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/context/RpcCallerContext.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.context; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; + +public class RpcCallerContext { + private RemotingCommand remotingRequest; + private RemotingCommand remotingResponse; + + public RemotingCommand getRemotingRequest() { + return remotingRequest; + } + + public void setRemotingRequest(RemotingCommand remotingRequest) { + this.remotingRequest = remotingRequest; + } + + public RemotingCommand getRemotingResponse() { + return remotingResponse; + } + + public void setRemotingResponse(RemotingCommand remotingResponse) { + this.remotingResponse = remotingResponse; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/context/RpcProviderContext.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/context/RpcProviderContext.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/context/RpcProviderContext.java new file mode 100644 index 0000000..547a0ec --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/context/RpcProviderContext.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.context; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; + +public class RpcProviderContext { + private RemotingChannel remotingChannel; + private RemotingCommand remotingRequest; + private RemotingCommand remotingResponse; + private boolean returnResponse = true; + + public RemotingChannel getRemotingChannel() { + return remotingChannel; + } + + public void setRemotingChannel(RemotingChannel remotingChannel) { + this.remotingChannel = remotingChannel; + } + + public RemotingCommand getRemotingRequest() { + return remotingRequest; + } + + public void setRemotingRequest(RemotingCommand remotingRequest) { + this.remotingRequest = remotingRequest; + } + + public RemotingCommand getRemotingResponse() { + return remotingResponse; + } + + public void setRemotingResponse(RemotingCommand remotingResponse) { + this.remotingResponse = remotingResponse; + } + + public boolean isReturnResponse() { + return returnResponse; + } + + public void setReturnResponse(boolean returnResponse) { + this.returnResponse = returnResponse; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceAuthException.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceAuthException.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceAuthException.java new file mode 100644 index 0000000..ab53a54 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceAuthException.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.exception; + +import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; + +public class ServiceAuthException extends RemoteAccessException { + private static final long serialVersionUID = 1L; + + private String status; + private int code; + + public ServiceAuthException(String status, int code) { + super(""); + this.status = status; + this.code = code; + } + + public ServiceAuthException(String status, int code, String message) { + super(message); + this.status = status; + this.code = code; + } + + public ServiceAuthException(String status, int code, Throwable throwable) { + super(throwable.getMessage()); + this.status = status; + this.code = code; + } + + public ServiceAuthException(String status, int code, String message, Throwable throwable) { + super(message, throwable); + this.status = status; + this.code = code; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionHandlerCode.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionHandlerCode.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionHandlerCode.java new file mode 100644 index 0000000..900e82a --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionHandlerCode.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.exception; + +import java.util.HashMap; +import java.util.Map; +import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException; +import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; +import org.apache.rocketmq.rpc.impl.command.ResponseCode; + +public class ServiceExceptionHandlerCode extends ResponseCode { + + private static Map<Class, Integer> exceptionCodeMap = new HashMap<>(); + + static { + exceptionCodeMap.put(IllegalAccessException.class, 100); + exceptionCodeMap.put(IllegalArgumentException.class, 101); + exceptionCodeMap.put(NullPointerException.class, 102); + exceptionCodeMap.put(InstantiationException.class, 104); + exceptionCodeMap.put(NumberFormatException.class, 105); + exceptionCodeMap.put(RemoteTimeoutException.class, 106); + exceptionCodeMap.put(RemoteConnectFailureException.class, 107); + exceptionCodeMap.put(ClassNotFoundException.class, 402); + } + + public static int searchExceptionCode(Class type) { + Integer code = exceptionCodeMap.get(type); + if (code == null) + //Default exception code + return 100; + return code; + } + + public static ResponseStatus searchResponseStatus(int code) { + for (ResponseStatus responseStatus : ResponseStatus.values()) { + if (code == responseStatus.getResponseCode()) + return responseStatus; + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionHandlerManager.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionHandlerManager.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionHandlerManager.java new file mode 100644 index 0000000..c5bc599 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionHandlerManager.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.exception; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.serializable.Serializer; +import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; +import org.apache.rocketmq.rpc.impl.command.ResponseCode; +import org.apache.rocketmq.rpc.internal.RpcErrorMapper; + +import static org.apache.rocketmq.remoting.internal.ExceptionUtils.getStackTrace; + +public class ServiceExceptionHandlerManager { + private static boolean isIllegalCode(int code) { + return code > 0; + } + + private static ServiceExceptionInvokeMessage getInvokeExceptionMessage(RemotingCommand remotingCommand, + SerializerFactory serializerFactory) { + Serializer serialization = serializerFactory.get(remotingCommand.serializerType()); + if (remotingCommand.parameterBytes() == null) + return null; + return serialization.decode(remotingCommand.parameterBytes(), ServiceExceptionInvokeMessage.class); + } + + private static Exception getResponseException(String className, RemotingCommand remotingCommand, + SerializerFactory serializerFactory) + throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, + InstantiationException, InvocationTargetException { + return parseException(serializerFactory, remotingCommand, className); + } + + @SuppressWarnings("unchecked") + private static Exception parseException(SerializerFactory serializerFactory, RemotingCommand remotingCommand, + String className) + throws ClassNotFoundException, NoSuchMethodException, + IllegalAccessException, InvocationTargetException, InstantiationException { + Class exceptionClass = Class.forName(className); + assert exceptionClass != null; + Class[] oneParamTypes = {String.class}; + Class[] twoParamTypes = {String.class, Throwable.class}; + ServiceExceptionInvokeMessage serviceExceptionInvokeMessage = getInvokeExceptionMessage(remotingCommand, serializerFactory); + if (serviceExceptionInvokeMessage == null) + return new Exception(); + + if (serviceExceptionInvokeMessage.getThrowable() == null) { + Object[] onsParams = {serviceExceptionInvokeMessage.getErrorMessage()}; + Constructor constructor = exceptionClass.getConstructor(oneParamTypes); + assert constructor != null; + return (Exception) constructor.newInstance(onsParams); + } else { + Object[] twoParams = {serviceExceptionInvokeMessage.getErrorMessage(), serviceExceptionInvokeMessage.getThrowable()}; + Constructor constructor = exceptionClass.getConstructor(twoParamTypes); + assert constructor != null; + return (Exception) constructor.newInstance(twoParams); + } + } + + public static void exceptionHandler(String code, RemotingCommand remotingCommand, + SerializerFactory serializerFactory) throws Exception { + int intCode; + try { + intCode = Integer.parseInt(code); + } catch (NumberFormatException e) { + throw ServiceExceptionManager.TranslateException(RpcErrorMapper.RpcErrorLatitude.REMOTE.getCode(), e); + } + + if (isIllegalCode(intCode)) { + ServiceExceptionInvokeMessage returnResult = getInvokeExceptionMessage(remotingCommand, serializerFactory); + if (returnResult == null) { + ResponseCode.ResponseStatus responseStatus = ServiceExceptionHandlerCode.searchResponseStatus(intCode); + if (responseStatus != null) { + throw new ServiceRuntimeException(RpcErrorMapper.RpcErrorLatitude.REMOTE.getCode(), + String.valueOf(responseStatus.getResponseCode())); + } else { + throw new ServiceRuntimeException(RpcErrorMapper.RpcErrorLatitude.REMOTE.getCode(), + ResponseCode.UNKNOWN_EXCEPTION); + } + } + + try { + Exception exception = getResponseException(returnResult.getClassFullName(), remotingCommand, serializerFactory); + if (intCode == Integer.valueOf(ResponseCode.USER_SERVICE_EXCEPTION)) { + throw exception; + } else { + throw new ServiceRuntimeException(RpcErrorMapper.RpcErrorLatitude.REMOTE.getCode(), + String.valueOf(intCode), + getStackTrace(exception)); + } + } catch (ClassNotFoundException e) { + throw new ServiceRuntimeException(RpcErrorMapper.RpcErrorLatitude.LOCAL.getCode(), + String.valueOf(ResponseCode.USER_EXCEPTION_CLASS_NOT_FOUND), + e.getMessage()); + } catch (NoSuchMethodException e) { + throw new ServiceRuntimeException(RpcErrorMapper.RpcErrorLatitude.REMOTE.getCode(), + String.valueOf(ResponseCode.USER_EXCEPTION_METHOD_NOT_FOUND), + e.getMessage()); + } catch (IllegalAccessException e) { + throw new ServiceRuntimeException(RpcErrorMapper.RpcErrorLatitude.REMOTE.getCode(), + String.valueOf(ResponseCode.ILLEGAL_ACCESS), + e.getMessage()); + } catch (InstantiationException e) { + throw new ServiceRuntimeException(RpcErrorMapper.RpcErrorLatitude.REMOTE.getCode(), + String.valueOf(ResponseCode.INSTANTIATED_FAIL), + e.getMessage()); + } catch (InvocationTargetException e) { + throw new ServiceRuntimeException(RpcErrorMapper.RpcErrorLatitude.REMOTE.getCode(), + String.valueOf(ResponseCode.FAIL_INVOKE), + e.getMessage()); + } + } else { + throw new ServiceRuntimeException(RpcErrorMapper.RpcErrorLatitude.REMOTE.getCode(), + ResponseCode.UNKNOWN_EXCEPTION); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionInvokeMessage.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionInvokeMessage.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionInvokeMessage.java new file mode 100644 index 0000000..6f77ca9 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionInvokeMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.exception; + +import java.io.Serializable; + +public class ServiceExceptionInvokeMessage implements Serializable { + private String classFullName; + private String errorMessage; + private Throwable throwable; + + public String getClassFullName() { + return classFullName; + } + + public void setClassFullName(final String classFullName) { + this.classFullName = classFullName; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(final String errorMessage) { + this.errorMessage = errorMessage; + } + + public Throwable getThrowable() { + return throwable; + } + + public void setThrowable(Throwable throwable) { + this.throwable = throwable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionManager.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionManager.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionManager.java new file mode 100644 index 0000000..721dd2a --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceExceptionManager.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.exception; + +public class ServiceExceptionManager { + public static ServiceRuntimeException TranslateException(String code, Throwable e) { + if (e instanceof ServiceRuntimeException) + return (ServiceRuntimeException) e; + int errorCode = ServiceExceptionHandlerCode.searchExceptionCode(e.getClass()); + /* + int errorCode = ServiceExceptionHandlerCode.searchExceptionCode(e.getClass()); + if (errorCode == 100) + return new ServiceRuntimeException(code, String.valueOf(errorCode), e.getMessage()); + */ + return new ServiceRuntimeException(code, String.valueOf(errorCode), e); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceRuntimeException.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceRuntimeException.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceRuntimeException.java new file mode 100644 index 0000000..8716f5c --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceRuntimeException.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.exception; + +import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; +import org.apache.rocketmq.rpc.impl.command.ResponseCode; + +public class ServiceRuntimeException extends RemoteAccessException { + private static final String URL = "http://jukola.alibaba.com"; + private String code; + private String helpUrl; + private String exMsg; + + public ServiceRuntimeException(String latitude, String code) { + this(latitude, code, ""); + } + + public ServiceRuntimeException(String latitude, String code, Throwable e) { + this(latitude, code, e.getMessage()); + if (e.getCause() != null) + this.setStackTrace(e.getCause().getStackTrace()); + else + this.setStackTrace(e.getStackTrace()); + } + + public ServiceRuntimeException(String latitude, String code, String msg) { + super(buildExceptionMsg(latitude, code, msg)); + this.code = code; + this.exMsg = msg; + this.helpUrl = buildHelpUrl(latitude, code); + } + + private static String buildExceptionMsg(String latitude, String code, String msg) { + StringBuilder sb = new StringBuilder(); + ResponseCode.ResponseStatus responseStatus = ServiceExceptionHandlerCode.searchResponseStatus(Integer.valueOf(code)); + String helpUrl = buildHelpUrl(latitude, code); + if (responseStatus != null) + if (msg != null && !msg.isEmpty()) + sb.append(msg).append(", see for more ").append(helpUrl); + else + sb.append(responseStatus.getResponseSimpleMessage()).append(", see for more ").append(helpUrl); + return sb.toString(); + } + + private static String buildHelpUrl(String latitude, String code) { + return URL + "/" + latitude + "/" + code; + } + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + public String getHelpUrl() { + return helpUrl; + } + + public void setHelpUrl(String helpUrl) { + this.helpUrl = helpUrl; + } + + public String getExMsg() { + return exMsg; + } + + public void setExMsg(String exMsg) { + this.exMsg = exMsg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceSignatureException.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceSignatureException.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceSignatureException.java new file mode 100644 index 0000000..ce4870f --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/exception/ServiceSignatureException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.exception; + +import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; + +public class ServiceSignatureException extends RemoteAccessException { + + private static final long serialVersionUID = -3662598055526208602L; + private final int code; + + public ServiceSignatureException(int code) { + super(""); + this.code = code; + } + + public ServiceSignatureException(int code, String message, Throwable cause) { + super(message, cause); + this.code = code; + } + + public ServiceSignatureException(int code, String message) { + super(message); + this.code = code; + } + + public ServiceSignatureException(int code, Throwable cause) { + super(cause.getMessage()); + this.code = code; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DefaultServiceAPI.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DefaultServiceAPI.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DefaultServiceAPI.java new file mode 100644 index 0000000..25db808 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DefaultServiceAPI.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import org.apache.rocketmq.rpc.annotation.RemoteMethod; +import org.apache.rocketmq.rpc.annotation.RemoteService; + +@RemoteService(name = "DefaultServiceAPI", version = "1.0.0") +public interface DefaultServiceAPI { + @RemoteMethod(name = "collectStackAll", version = "1.0.0") + StackTracesAll collectStackAll(); + + @RemoteMethod(name = "collectStatsAll", version = "1.0.0") + StatsAll collectStatsAll(); + + @RemoteMethod(name = "list", version = "1.0.0") + void list(); + + @RemoteMethod(name = "trace", version = "1.0.0") + void trace(); + + @RemoteMethod(name = "ping", version = "1.0.0") + void ping(); + + @RemoteMethod(name = "metrics", version = "1.0.0") + void metrics(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DefaultServiceAPIImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DefaultServiceAPIImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DefaultServiceAPIImpl.java new file mode 100644 index 0000000..eb45d60 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DefaultServiceAPIImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import java.util.TreeMap; + +public class DefaultServiceAPIImpl implements DefaultServiceAPI { + private final ServiceStats serviceStats; + private final ThreadStats threadStats; + + public DefaultServiceAPIImpl(ServiceStats serviceStats, ThreadStats threadStats) { + this.serviceStats = serviceStats; + this.threadStats = threadStats; + } + + public StackTracesAll collectStackAll() { + StackTracesAll sta = new StackTracesAll(); + sta.setStackTraces(UtilAll.jstack()); + return sta; + } + + public StatsAll collectStatsAll() { + StatsAll statsAll = serviceStats.stats(); + TreeMap<Threading, TimestampRegion> tm = this.threadStats.cloneStatsTable(); + statsAll.setStatsThreading(tm); + return statsAll; + } + + public void list() { + + } + + public void trace() { + + } + + public void ping() { + + } + + public void metrics() { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DownloadFileRequest.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DownloadFileRequest.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DownloadFileRequest.java new file mode 100644 index 0000000..dca7d5e --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/DownloadFileRequest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +public class DownloadFileRequest { + private String fileFullName; + private int offset; + private int size; + + public String getFileFullName() { + return fileFullName; + } + + public void setFileFullName(String fileFullName) { + this.fileFullName = fileFullName; + } + + public int getOffset() { + return offset; + } + + public void setOffset(int offset) { + this.offset = offset; + } + + public int getSize() { + return size; + } + + public void setSize(int size) { + this.size = size; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ExecuteResult.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ExecuteResult.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ExecuteResult.java new file mode 100644 index 0000000..1eb795f --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ExecuteResult.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +public class ExecuteResult { + private int ret; + private String stdout; + private String stderr; + + public int getRet() { + return ret; + } + + public void setRet(int ret) { + this.ret = ret; + } + + public String getStdout() { + return stdout; + } + + public void setStdout(String stdout) { + this.stdout = stdout; + } + + public String getStderr() { + return stderr; + } + + public void setStderr(String stderr) { + this.stderr = stderr; + } + + @Override + public String toString() { + return "ExecuteResult{" + + "ret=" + ret + + ", stdout='" + stdout + '\'' + + ", stderr='" + stderr + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/MethodStats.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/MethodStats.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/MethodStats.java new file mode 100644 index 0000000..175d5a7 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/MethodStats.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +public class MethodStats { + private double qpsOK; + private long failedTimesInMinutes; + private double rtAvgInMinutes; + private long rtMaxInMinutes; + private long rtMaxIn10Minutes; + private long rtMaxInHour; + private long[] rtRegion; + + public double getQpsOK() { + return qpsOK; + } + + public void setQpsOK(double qpsOK) { + this.qpsOK = qpsOK; + } + + public long getFailedTimesInMinutes() { + return failedTimesInMinutes; + } + + public void setFailedTimesInMinutes(long failedTimesInMinutes) { + this.failedTimesInMinutes = failedTimesInMinutes; + } + + public double getRtAvgInMinutes() { + return rtAvgInMinutes; + } + + public void setRtAvgInMinutes(double rtAvgInMinutes) { + this.rtAvgInMinutes = rtAvgInMinutes; + } + + public long getRtMaxInMinutes() { + return rtMaxInMinutes; + } + + public void setRtMaxInMinutes(long rtMaxInMinutes) { + this.rtMaxInMinutes = rtMaxInMinutes; + } + + public long getRtMaxIn10Minutes() { + return rtMaxIn10Minutes; + } + + public void setRtMaxIn10Minutes(long rtMaxIn10Minutes) { + this.rtMaxIn10Minutes = rtMaxIn10Minutes; + } + + public long getRtMaxInHour() { + return rtMaxInHour; + } + + public void setRtMaxInHour(long rtMaxInHour) { + this.rtMaxInHour = rtMaxInHour; + } + + public long[] getRtRegion() { + return rtRegion; + } + + public void setRtRegion(long[] rtRegion) { + this.rtRegion = rtRegion; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ServiceStats.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ServiceStats.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ServiceStats.java new file mode 100644 index 0000000..20026a9 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/ServiceStats.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.remoting.external.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ServiceStats { + private static final Logger log = LoggerFactory.getLogger("ServiceStats"); + private final StatsItemSet callerOKQPS; + private final StatsItemSet callerFailedQPS; + private final StatsItemSet callerRT; + private final StatsItemSet providerOKQPS; + private final StatsItemSet providerFailedQPS; + private final StatsItemSet providerRT; + private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor("ServiceStatsScheduledThread", true); + + public ServiceStats() { + this.callerOKQPS = new StatsItemSet("SERVICE_QPS_CALLER_OK", this.scheduledExecutorService, log); + this.callerFailedQPS = new StatsItemSet("SERVICE_QPS_CALLER_FAILED", this.scheduledExecutorService, log); + this.callerRT = new StatsItemSet("SERVICE_RT_CALLER", this.scheduledExecutorService, log); + this.providerOKQPS = new StatsItemSet("SERVICE_QPS_PROVIDER_OK", this.scheduledExecutorService, log); + this.providerFailedQPS = new StatsItemSet("SERVICE_QPS_PROVIDER_FAILED", this.scheduledExecutorService, log); + this.providerRT = new StatsItemSet("SERVICE_RT_PROVIDER", this.scheduledExecutorService, log); + } + + private static MethodStats methodStats(StatsItem itemOK, + StatsItem itemFailed, + StatsItem itemRT) { + MethodStats methodStats = new MethodStats(); + if (itemOK != null) { + methodStats.setQpsOK(itemOK.getStatsDataInMinute().getTps()); + } + if (itemFailed != null) { + methodStats.setFailedTimesInMinutes(itemFailed.getStatsDataInMinute().getSum()); + } + if (itemRT != null) { + methodStats.setRtAvgInMinutes(itemRT.getStatsDataInMinute().getAvgpt()); + methodStats.setRtMaxInMinutes(itemRT.getValueMaxInMinutes().get()); + methodStats.setRtMaxIn10Minutes(itemRT.getValueMaxIn10Minutes().get()); + methodStats.setRtMaxInHour(itemRT.getValueMaxInHour().get()); + methodStats.setRtRegion(itemRT.valueRegion()); + } + return methodStats; + } + + public void addCallerOKQPSValue(final String statsKey, final int incValue, final int incTimes) { + this.callerOKQPS.addValue(statsKey, incValue, incTimes); + } + + public void addCallerFailedQPSValue(final String statsKey, final int incValue, final int incTimes) { + this.callerFailedQPS.addValue(statsKey, incValue, incTimes); + } + + public void addCallerRTValue(final String statsKey, final int incValue, final int incTimes) { + this.callerRT.addValue(statsKey, incValue, incTimes); + } + + public void addProviderOKQPSValue(final String statsKey, final int incValue, final int incTimes) { + this.providerOKQPS.addValue(statsKey, incValue, incTimes); + } + + public void addProviderFailedQPSValue(final String statsKey, final int incValue, final int incTimes) { + this.providerFailedQPS.addValue(statsKey, incValue, incTimes); + } + + public void addProviderRTValue(final String statsKey, final int incValue, final int incTimes) { + this.providerRT.addValue(statsKey, incValue, incTimes); + } + + public void start() { + this.callerOKQPS.init(); + this.callerFailedQPS.init(); + this.callerRT.init(); + this.providerOKQPS.init(); + this.providerFailedQPS.init(); + this.providerRT.init(); + } + + public void stop() { + ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 3000, TimeUnit.MILLISECONDS); + } + + StatsAll stats() { + StatsAll sa = new StatsAll(); + Set<String> keySetCaller = this.callerOKQPS.getStatsItemTable().keySet(); + Set<String> keySetProvider = this.providerOKQPS.getStatsItemTable().keySet(); + + for (String statsKey : keySetCaller) { + MethodStats caller = methodStats(this.callerOKQPS.getStatsItemTable().get(statsKey), + this.callerFailedQPS.getStatsItemTable().get(statsKey), + this.callerRT.getStatsItemTable().get(statsKey) + ); + sa.getStatsCaller().put(statsKey, caller); + } + + for (String statsKey : keySetProvider) { + MethodStats provider = methodStats(this.providerOKQPS.getStatsItemTable().get(statsKey), + this.providerFailedQPS.getStatsItemTable().get(statsKey), + this.providerRT.getStatsItemTable().get(statsKey) + ); + sa.getStatsProvider().put(statsKey, provider); + } + return sa; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StackTracesAll.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StackTracesAll.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StackTracesAll.java new file mode 100644 index 0000000..1057187 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StackTracesAll.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +public class StackTracesAll { + private String stackTraces; + + public String getStackTraces() { + return stackTraces; + } + + public void setStackTraces(String stackTraces) { + this.stackTraces = stackTraces; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsAll.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsAll.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsAll.java new file mode 100644 index 0000000..7a40285 --- /dev/null +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/metrics/StatsAll.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.rpc.impl.metrics; + +import java.util.TreeMap; + +public class StatsAll { + private TreeMap<String/* request code */, MethodStats> statsCaller = new TreeMap<String, MethodStats>(); + private TreeMap<String/* request code */, MethodStats> statsProvider = new TreeMap<String, MethodStats>(); + private TreeMap<Threading, TimestampRegion> statsThreading = new TreeMap<Threading, TimestampRegion>(); + + public TreeMap<String, MethodStats> getStatsCaller() { + return statsCaller; + } + + public void setStatsCaller(TreeMap<String, MethodStats> statsCaller) { + this.statsCaller = statsCaller; + } + + public TreeMap<String, MethodStats> getStatsProvider() { + return statsProvider; + } + + public void setStatsProvider(TreeMap<String, MethodStats> statsProvider) { + this.statsProvider = statsProvider; + } + + public TreeMap<Threading, TimestampRegion> getStatsThreading() { + return statsThreading; + } + + public void setStatsThreading(TreeMap<Threading, TimestampRegion> statsThreading) { + this.statsThreading = statsThreading; + } + + @Override + public String toString() { + return "StatsAll{" + + "statsCaller=" + statsCaller + + ", statsProvider=" + statsProvider + + ", statsThreading=" + statsThreading + + '}'; + } + +}