http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java new file mode 100644 index 0000000..7ef01db --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.api.exception; + +/** + * Handy class for wrapping runtime {@code Exceptions} with a root cause. + * + * <p>This class is {@code abstract} to force the programmer to extend + * the class. {@code getMessage} will include nested exception + * information; {@code getRootCause} will include the innermost cause of + * this exception, if any; {@code printStackTrace} and other like methods will + * delegate to the wrapped exception, if any. + * + * @since 1.0.0 + */ +public abstract class NestedRuntimeException extends RuntimeException { + private static final long serialVersionUID = -8371779880133933367L; + + /** + * Construct a {@code NestedRuntimeException} with the specified detail message. + * + * @param msg the detail message + */ + public NestedRuntimeException(String msg) { + super(msg); + } + + /** + * Construct a {@code NestedRuntimeException} with the specified detail message + * and nested exception. + * + * @param msg the detail message + * @param cause the nested exception + */ + public NestedRuntimeException(String msg, Throwable cause) { + super(msg, cause); + } + + /** + * Build a message for the given base message and root cause. + * + * @param message the base message + * @param cause the root cause + * @return the full exception message + */ + private static String getMessageWithCause(String message, Throwable cause) { + if (cause != null) { + StringBuilder sb = new StringBuilder(); + if (message != null) { + sb.append(message).append("; "); + } + sb.append("nested exception is ").append(cause); + return sb.toString(); + } else { + return message; + } + } + + /** + * Return the detail message, including the message from the nested exception + * if there is one. + */ + @Override + public String getMessage() { + return getMessageWithCause(super.getMessage(), getCause()); + } + + /** + * Retrieve the innermost cause of this exception, if any. + * + * @return the innermost exception, or {@code null} if none + */ + public Throwable getRootCause() { + Throwable rootCause = null; + Throwable cause = getCause(); + while (cause != null && cause != rootCause) { + rootCause = cause; + cause = cause.getCause(); + } + return rootCause; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java new file mode 100644 index 0000000..6ce6dd4 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.exception; + +/** + * Generic remote access exception. A service proxy for any remoting + * protocol should throw this exception or subclasses of it, in order + * to transparently expose a plain Java business interface. + * + * <p>A client may catch RemoteAccessException if it wants to, but as + * remote access errors are typically unrecoverable, it will probably let + * such exceptions propagate to a higher level that handles them generically. + * In this case, the client opCode doesn't show any signs of being involved in + * remote access, as there aren't any remoting-specific dependencies. + * + * @since 1.0.0 + */ +public class RemoteAccessException extends NestedRuntimeException { + private static final long serialVersionUID = 6280428909532427263L; + + /** + * Constructor for RemoteAccessException with the specified detail message. + * + * @param msg the detail message + */ + public RemoteAccessException(String msg) { + super(msg); + } + + /** + * Constructor for RemoteAccessException with the specified detail message + * and nested exception. + * + * @param msg the detail message + * @param cause the root cause (usually from using an underlying + * remoting API such as RMI) + */ + public RemoteAccessException(String msg, Throwable cause) { + super(msg, cause); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteCodecException.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteCodecException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteCodecException.java new file mode 100644 index 0000000..a8b9e4e --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteCodecException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.exception; + +/** + * @since 1.0.0 + */ +public class RemoteCodecException extends RemoteAccessException { + private static final long serialVersionUID = -7597014042746200543L; + + public RemoteCodecException(String msg) { + super(msg); + } + + public RemoteCodecException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteConnectFailureException.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteConnectFailureException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteConnectFailureException.java new file mode 100644 index 0000000..af0a6e9 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteConnectFailureException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.exception; + +/** + * RemoteConnectFailureException will be thrown when connection + * could not be established with a remote service. + * + * @since 1.0.0 + */ +public class RemoteConnectFailureException extends RemoteAccessException { + private static final long serialVersionUID = -5565366231695911316L; + + /** + * Constructor for RemoteConnectFailureException with the specified detail message + * and nested exception. + * + * @param msg the detail message + * @param cause the root cause from the remoting API in use + */ + public RemoteConnectFailureException(String msg, Throwable cause) { + super(msg, cause); + } + + /** + * Constructor for RemoteConnectFailureException with the specified detail message. + * + * @param msg the detail message + */ + public RemoteConnectFailureException(String msg) { + super(msg); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteTimeoutException.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteTimeoutException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteTimeoutException.java new file mode 100644 index 0000000..adfcc8d --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteTimeoutException.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.api.exception; + +/** + * RemoteTimeoutException will be thrown when the execution + * of the target method did not complete before a configurable + * timeout, for example when a reply message was not received. + * + * @since 1.0.0 + */ +public class RemoteTimeoutException extends RemoteAccessException { + private static final long serialVersionUID = 8710772392914461626L; + + /** + * Constructor for RemoteTimeoutException with the specified detail message,configurable timeout. + * + * @param msg the detail message + * @param timeoutMillis configurable timeout + */ + public RemoteTimeoutException(String msg, long timeoutMillis) { + this(msg, timeoutMillis, null); + } + + /** + * Constructor for RemoteTimeoutException with the specified detail message,configurable timeout + * and nested exception.. + * + * @param msg the detail message + * @param timeoutMillis configurable timeout + * @param cause Exception cause + */ + public RemoteTimeoutException(String msg, long timeoutMillis, Throwable cause) { + super(String.format("%s, waiting for %s ms", msg, timeoutMillis), cause); + } + + /** + * Constructor for RemoteTimeoutException with the specified detail message. + * + * @param msg the detail message + */ + public RemoteTimeoutException(String msg) { + super(msg); + } + + /** + * Constructor for RemoteTimeoutException with the specified detail message + * and nested exception. + * + * @param msg the detail message + * @param cause the root cause from the remoting API in use + */ + public RemoteTimeoutException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java new file mode 100644 index 0000000..2452309 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.interceptor; + +import org.apache.rocketmq.remoting.api.RemotingEndPoint; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; + +public class ExceptionContext extends RequestContext { + private Throwable exception; + private String remark; + + public ExceptionContext(RemotingEndPoint remotingEndPoint, String remoteAddr, RemotingCommand request, + Throwable exception, String remark) { + super(remotingEndPoint, remoteAddr, request); + this.remotingEndPoint = remotingEndPoint; + this.remoteAddr = remoteAddr; + this.request = request; + this.exception = exception; + this.remark = remark; + } + + public RemotingEndPoint getRemotingEndPoint() { + return remotingEndPoint; + } + + public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) { + this.remotingEndPoint = remotingEndPoint; + } + + public String getRemoteAddr() { + return remoteAddr; + } + + public void setRemoteAddr(String remoteAddr) { + this.remoteAddr = remoteAddr; + } + + public RemotingCommand getRequest() { + return request; + } + + public void setRequest(RemotingCommand request) { + this.request = request; + } + + public Throwable getException() { + return exception; + } + + public void setException(Throwable exception) { + this.exception = exception; + } + + public String getRemark() { + return remark; + } + + public void setRemark(String remark) { + this.remark = remark; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java new file mode 100644 index 0000000..62257ef --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.interceptor; + +public interface Interceptor { + void beforeRequest(final RequestContext context); + + void afterResponseReceived(final ResponseContext context); + + void onException(final ExceptionContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java new file mode 100644 index 0000000..9ffc696 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.interceptor; + +import java.util.ArrayList; +import java.util.List; + +public class InterceptorGroup { + private final List<Interceptor> interceptors = new ArrayList<Interceptor>(); + + public void registerInterceptor(final Interceptor interceptor) { + if (interceptor != null) { + this.interceptors.add(interceptor); + } + } + + public void beforeRequest(final RequestContext context) { + for (Interceptor interceptor : interceptors) { + interceptor.beforeRequest(context); + } + } + + public void afterResponseReceived(final ResponseContext context) { + for (Interceptor interceptor : interceptors) { + interceptor.afterResponseReceived(context); + } + } + + public void onException(final ExceptionContext context) { + for (Interceptor interceptor : interceptors) { + interceptor.onException(context); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java new file mode 100644 index 0000000..d961556 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.interceptor; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.rocketmq.remoting.api.RemotingEndPoint; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; + +public class RequestContext { + protected RemotingEndPoint remotingEndPoint; + protected String remoteAddr; + protected RemotingCommand request; + + public RequestContext(RemotingEndPoint remotingEndPoint, String remoteAddr, RemotingCommand request) { + super(); + this.remotingEndPoint = remotingEndPoint; + this.remoteAddr = remoteAddr; + this.request = request; + } + + public RemotingEndPoint getRemotingEndPoint() { + return remotingEndPoint; + } + + public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) { + this.remotingEndPoint = remotingEndPoint; + } + + public String getRemoteAddr() { + return remoteAddr; + } + + public void setRemoteAddr(String remoteAddr) { + this.remoteAddr = remoteAddr; + } + + public RemotingCommand getRequest() { + return request; + } + + public void setRequest(RemotingCommand request) { + this.request = request; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java new file mode 100644 index 0000000..97ec2e6 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.interceptor; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.rocketmq.remoting.api.RemotingEndPoint; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; + +public class ResponseContext extends RequestContext { + private RemotingCommand response; + + public ResponseContext(RemotingEndPoint remotingEndPoint, String remoteAddr, RemotingCommand request, + RemotingCommand response) { + super(remotingEndPoint, remoteAddr, request); + this.remotingEndPoint = remotingEndPoint; + this.remoteAddr = remoteAddr; + this.request = request; + this.response = response; + } + + public RemotingEndPoint getRemotingEndPoint() { + return remotingEndPoint; + } + + public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) { + this.remotingEndPoint = remotingEndPoint; + } + + public String getRemoteAddr() { + return remoteAddr; + } + + public void setRemoteAddr(String remoteAddr) { + this.remoteAddr = remoteAddr; + } + + public RemotingCommand getRequest() { + return request; + } + + public void setRequest(RemotingCommand request) { + this.request = request; + } + + public RemotingCommand getResponse() { + return response; + } + + public void setResponse(RemotingCommand response) { + this.response = response; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java new file mode 100644 index 0000000..5caf167 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/Protocol.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.protocol; + +import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper; + +public interface Protocol { + /** + * Minimum Viable Protocol + */ + String MVP = "mvp"; + String HTTP2 = "http2"; + String WEBSOCKET = "websocket"; + + byte MVP_MAGIC = 0x14; + byte WEBSOCKET_MAGIC = 0x15; + byte HTTP_2_MAGIC = 0x16; + + String name(); + + byte type(); + + void assembleHandler(ChannelHandlerContextWrapper ctx); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java new file mode 100644 index 0000000..cf016f9 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/protocol/ProtocolFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.protocol; + +public interface ProtocolFactory { + void register(Protocol protocol); + + void resetAll(Protocol protocol); + + byte type(String protocolName); + + Protocol get(byte type); + + void clearAll(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java new file mode 100644 index 0000000..8ef8dcd --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/Serializer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.serializable; + +import java.lang.reflect.Type; +import java.nio.ByteBuffer; +import org.apache.rocketmq.remoting.common.TypePresentation; + +public interface Serializer { + String name(); + + byte type(); + + <T> T decode(final byte[] content, final Class<T> c); + + <T> T decode(final byte[] content, final TypePresentation<T> typePresentation); + + <T> T decode(final byte[] content, final Type type); + + ByteBuffer encode(final Object object); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java new file mode 100644 index 0000000..b47bf99 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/serializable/SerializerFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.serializable; + +public interface SerializerFactory { + void register(Serializer serialization); + + byte type(String serializationName); + + Serializer get(byte type); + + void clearAll(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/Pair.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/Pair.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/Pair.java new file mode 100644 index 0000000..505e104 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/Pair.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.common; + +public class Pair<L, R> { + private L left; + private R right; + + public Pair(L left, R right) { + this.left = left; + this.right = right; + } + + public L getLeft() { + return left; + } + + public void setLeft(L left) { + this.left = left; + } + + public R getRight() { + return right; + } + + public void setRight(R right) { + this.right = right; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java new file mode 100644 index 0000000..ef3d5f8 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.common; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Represents a generic type {@code T}. Java doesn't yet provide a way to + * represent generic types, so this class does. Forces clients to create a + * subclass of this class which enables retrieval the type information even at + * runtime. + * + * <p>For example, to create a type literal for {@code List<String>}, you can + * create an empty anonymous inner class: + * + * <pre> + * TypePresentation<List<String>> list = new TypePresentation<List<String>>() {}; + * </pre> + * + * To create a type literal for {@code Map<String, Integer>}: + * + * <pre> + * TypePresentation<Map<String, Integer>> map = new TypePresentation<Map<String, Integer>>() {}; + * </pre> + * + * This syntax cannot be used to create type literals that have wildcard + * parameters, such as {@code Class<?>} or {@code List<? extends CharSequence>}. + * + * @since 1.0.0 + */ +public class TypePresentation<T> { + static ConcurrentMap<Class<?>, ConcurrentMap<Type, ConcurrentMap<Type, Type>>> classTypeCache + = new ConcurrentHashMap<Class<?>, ConcurrentMap<Type, ConcurrentMap<Type, Type>>>(16, 0.75f, 1); + protected final Type type; + + /** + * Constructs a new type literal. Derives represented class from type + * parameter. + * + * <p>Clients create an empty anonymous subclass. Doing so embeds the type + * parameter in the anonymous class's type hierarchy so we can reconstitute it + * at runtime despite erasure. + */ + protected TypePresentation() { + Type superClass = getClass().getGenericSuperclass(); + type = ((ParameterizedType) superClass).getActualTypeArguments()[0]; + } + + /** + * @return underlying {@code Type} instance. + */ + public Type getType() { + return type; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/pom.xml ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/pom.xml b/remoting-core/remoting-impl/pom.xml new file mode 100644 index 0000000..53d1854 --- /dev/null +++ b/remoting-core/remoting-impl/pom.xml @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.rocketmq</groupId> + <artifactId>remoting-core</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>remoting-impl</artifactId> + + <dependencies> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </dependency> + <dependency> + <groupId>com.alibaba</groupId> + <artifactId>fastjson</artifactId> + </dependency> + <dependency> + <groupId>org.msgpack</groupId> + <artifactId>msgpack</artifactId> + </dependency> + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-tcnative-boringssl-static</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>remoting-api</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java new file mode 100644 index 0000000..8af61f7 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.common; + +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.remoting.api.channel.ChannelEventListener; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; + +public class ChannelEventListenerGroup { + private final List<ChannelEventListener> listenerList = new ArrayList<ChannelEventListener>(); + + public int size() { + return this.listenerList.size(); + } + + public void registerChannelEventListener(final ChannelEventListener listener) { + if (listener != null) { + this.listenerList.add(listener); + } + } + + public void onChannelConnect(final RemotingChannel channel) { + for (ChannelEventListener listener : listenerList) { + listener.onChannelConnect(channel); + } + } + + public void onChannelClose(final RemotingChannel channel) { + for (ChannelEventListener listener : listenerList) { + listener.onChannelClose(channel); + } + } + + public void onChannelException(final RemotingChannel channel) { + for (ChannelEventListener listener : listenerList) { + listener.onChannelException(channel); + } + } + + public void onChannelIdle(final RemotingChannel channel) { + for (ChannelEventListener listener : listenerList) { + listener.onChannelIdle(channel); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java new file mode 100644 index 0000000..d5c0aaa --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/RemotingCommandFactoryMeta.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.common; + +import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory; +import org.apache.rocketmq.remoting.api.serializable.SerializerFactory; +import org.apache.rocketmq.remoting.impl.protocol.Httpv2Protocol; +import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl; +import org.apache.rocketmq.remoting.impl.protocol.serializer.MsgPackSerializer; +import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl; + +public class RemotingCommandFactoryMeta { + private final ProtocolFactory protocolFactory = new ProtocolFactoryImpl(); + private final SerializerFactory serializerFactory = new SerializerFactoryImpl(); + private byte protocolType = Httpv2Protocol.MVP_MAGIC; + private byte serializeType = MsgPackSerializer.SERIALIZER_TYPE; + + public RemotingCommandFactoryMeta() { + } + + public RemotingCommandFactoryMeta(String protocolName, String serializeName) { + this.protocolType = protocolFactory.type(protocolName); + this.serializeType = serializerFactory.type(serializeName); + } + + public byte getSerializeType() { + return serializeType; + } + + public byte getProtocolType() { + return protocolType; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java new file mode 100644 index 0000000..2557cdf --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.common; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.RemotingEndPoint; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext; +import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup; +import org.apache.rocketmq.remoting.api.interceptor.ResponseContext; + +public class ResponseResult { + private final long beginTimestamp = System.currentTimeMillis(); + private final CountDownLatch countDownLatch = new CountDownLatch(1); + private final AtomicBoolean interceptorExecuted = new AtomicBoolean(false); + + private int requestId; + private long timeoutMillis; + private AsyncHandler asyncHandler; + + private volatile RemotingCommand responseCommand; + private volatile boolean sendRequestOK = true; + private volatile Throwable cause; + private SemaphoreReleaseOnlyOnce once; + + private RemotingCommand requestCommand; + private InterceptorGroup interceptorGroup; + private String remoteAddr; + + public ResponseResult(int requestId, long timeoutMillis, AsyncHandler asyncHandler, SemaphoreReleaseOnlyOnce once) { + this.requestId = requestId; + this.timeoutMillis = timeoutMillis; + this.asyncHandler = asyncHandler; + this.once = once; + } + + public ResponseResult(int requestId, long timeoutMillis) { + this.requestId = requestId; + this.timeoutMillis = timeoutMillis; + } + + public void executeRequestSendFailed() { + if (this.interceptorExecuted.compareAndSet(false, true)) { + try { + interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand, + cause, "REQUEST_SEND_FAILED")); + } catch (Throwable e) { + } + //Sync call + if (null != asyncHandler) { + asyncHandler.onFailure(requestCommand); + } + } + } + + public void executeCallbackArrived(final RemotingCommand response) { + if (this.interceptorExecuted.compareAndSet(false, true)) { + try { + interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, this.remoteAddr, + this.requestCommand, response)); + } catch (Throwable e) { + } + if (null != asyncHandler) { + asyncHandler.onSuccess(response); + } + } + } + + public void onTimeout(long costTimeMillis, long timoutMillis) { + if (this.interceptorExecuted.compareAndSet(false, true)) { + try { + interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand, + null, "CALLBACK_TIMEOUT")); + } catch (Throwable e) { + } + if (null != asyncHandler) { + asyncHandler.onTimeout(costTimeMillis, timoutMillis); + } + } + } + + public void release() { + if (this.once != null) { + this.once.release(); + } + } + + public RemotingCommand waitResponse(final long timeoutMillis) { + try { + this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return this.responseCommand; + } + + public void putResponse(final RemotingCommand responseCommand) { + this.responseCommand = responseCommand; + this.countDownLatch.countDown(); + } + + public long getBeginTimestamp() { + return beginTimestamp; + } + + public boolean isSendRequestOK() { + return sendRequestOK; + } + + public void setSendRequestOK(boolean sendRequestOK) { + this.sendRequestOK = sendRequestOK; + } + + public long getTimeoutMillis() { + return timeoutMillis; + } + + public AsyncHandler getAsyncHandler() { + return asyncHandler; + } + + public Throwable getCause() { + return cause; + } + + public void setCause(Throwable cause) { + this.cause = cause; + } + + public RemotingCommand getResponseCommand() { + return responseCommand; + } + + public void setResponseCommand(RemotingCommand responseCommand) { + this.responseCommand = responseCommand; + } + + public int getRequestId() { + return requestId; + } + + public RemotingCommand getRequestCommand() { + return requestCommand; + } + + public void setRequestCommand(RemotingCommand requestCommand) { + this.requestCommand = requestCommand; + } + + public InterceptorGroup getInterceptorGroup() { + return interceptorGroup; + } + + public void setInterceptorGroup(InterceptorGroup interceptorGroup) { + this.interceptorGroup = interceptorGroup; + } + + public String getRemoteAddr() { + return remoteAddr; + } + + public void setRemoteAddr(String remoteAddr) { + this.remoteAddr = remoteAddr; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java new file mode 100644 index 0000000..1c5849b --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.common; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SemaphoreReleaseOnlyOnce { + private final AtomicBoolean released = new AtomicBoolean(false); + private final Semaphore semaphore; + + public SemaphoreReleaseOnlyOnce(Semaphore semaphore) { + this.semaphore = semaphore; + } + + public void release() { + if (this.released.compareAndSet(false, true)) { + this.semaphore.release(); + } + } + + public Semaphore getSemaphore() { + return semaphore; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java new file mode 100755 index 0000000..db959b7 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.common.metrics; + +import io.netty.channel.group.ChannelGroup; + +public interface ChannelMetrics { + + Integer getChannelCount(); + + ChannelGroup getChannels(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java new file mode 100644 index 0000000..b330041 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.config; + +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.rocketmq.remoting.api.protocol.Protocol; +import org.apache.rocketmq.remoting.impl.protocol.compression.GZipCompressor; +import org.apache.rocketmq.remoting.impl.protocol.serializer.MsgPackSerializer; + +public class RemotingConfig extends TcpSocketConfig { + private int connectionMaxRetries = 3; + private int connectionChannelReaderIdleSeconds = 0; + private int connectionChannelWriterIdleSeconds = 0; + /** + * IdleStateEvent will be triggered when neither read nor write was + * performed for the specified period of this time. Specify {@code 0} to + * disable + */ + private int connectionChannelIdleSeconds = 120; + private int writeBufLowWaterMark = 32 * 10240; + private int writeBufHighWaterMark = 64 * 10240; + private int threadTaskLowWaterMark = 30000; + private int threadTaskHighWaterMark = 50000; + private int connectionRetryBackoffMillis = 3000; + private String protocolName = Protocol.MVP; + private String serializerName = MsgPackSerializer.SERIALIZER_NAME; + private String compressorName = GZipCompressor.COMPRESSOR_NAME; + private int serviceThreadBlockQueueSize = 50000; + private boolean clientNativeEpollEnable = false; + private int clientWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2; + private int clientConnectionFutureAwaitTimeoutMillis = 30000; + private int clientAsyncCallbackExecutorThreads = 16 + Runtime.getRuntime().availableProcessors() * 2; + private int clientOnewayInvokeSemaphore = 20480; + + //=============Server configuration================== + private int clientAsyncInvokeSemaphore = 20480; + private boolean clientPooledBytebufAllocatorEnable = false; + private boolean clientCloseSocketIfTimeout = true; + private boolean clientShortConnectionEnable = false; + private long clientPublishServiceTimeout = 10000; + private long clientConsumerServiceTimeout = 10000; + private long clientInvokeServiceTimeout = 10000; + private int clientMaxRetryCount = 10; + private int clientSleepBeforeRetry = 100; + private int serverListenPort = 8888; + /** + * If server only listened 1 port,recommend to set the value to 1 + */ + private int serverAcceptorThreads = 1; + private int serverIoThreads = 16 + Runtime.getRuntime().availableProcessors() * 2; + private int serverWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2; + private int serverOnewayInvokeSemaphore = 256; + private int serverAsyncInvokeSemaphore = 6400; + private boolean serverNativeEpollEnable = false; + private int serverAsyncCallbackExecutorThreads = Runtime.getRuntime().availableProcessors() * 2; + private boolean serverPooledBytebufAllocatorEnable = true; + private boolean serverAuthOpenEnable = true; + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } + + public int getConnectionMaxRetries() { + return connectionMaxRetries; + } + + public void setConnectionMaxRetries(final int connectionMaxRetries) { + this.connectionMaxRetries = connectionMaxRetries; + } + + public int getConnectionChannelReaderIdleSeconds() { + return connectionChannelReaderIdleSeconds; + } + + public void setConnectionChannelReaderIdleSeconds(final int connectionChannelReaderIdleSeconds) { + this.connectionChannelReaderIdleSeconds = connectionChannelReaderIdleSeconds; + } + + public int getConnectionChannelWriterIdleSeconds() { + return connectionChannelWriterIdleSeconds; + } + + public void setConnectionChannelWriterIdleSeconds(final int connectionChannelWriterIdleSeconds) { + this.connectionChannelWriterIdleSeconds = connectionChannelWriterIdleSeconds; + } + + public int getConnectionChannelIdleSeconds() { + return connectionChannelIdleSeconds; + } + + public void setConnectionChannelIdleSeconds(final int connectionChannelIdleSeconds) { + this.connectionChannelIdleSeconds = connectionChannelIdleSeconds; + } + + public int getWriteBufLowWaterMark() { + return writeBufLowWaterMark; + } + + public void setWriteBufLowWaterMark(final int writeBufLowWaterMark) { + this.writeBufLowWaterMark = writeBufLowWaterMark; + } + + public int getWriteBufHighWaterMark() { + return writeBufHighWaterMark; + } + + public void setWriteBufHighWaterMark(final int writeBufHighWaterMark) { + this.writeBufHighWaterMark = writeBufHighWaterMark; + } + + public int getThreadTaskLowWaterMark() { + return threadTaskLowWaterMark; + } + + public void setThreadTaskLowWaterMark(final int threadTaskLowWaterMark) { + this.threadTaskLowWaterMark = threadTaskLowWaterMark; + } + + public int getThreadTaskHighWaterMark() { + return threadTaskHighWaterMark; + } + + public void setThreadTaskHighWaterMark(final int threadTaskHighWaterMark) { + this.threadTaskHighWaterMark = threadTaskHighWaterMark; + } + + public int getConnectionRetryBackoffMillis() { + return connectionRetryBackoffMillis; + } + + public void setConnectionRetryBackoffMillis(final int connectionRetryBackoffMillis) { + this.connectionRetryBackoffMillis = connectionRetryBackoffMillis; + } + + public String getProtocolName() { + return protocolName; + } + + public void setProtocolName(final String protocolName) { + this.protocolName = protocolName; + } + + public String getSerializerName() { + return serializerName; + } + + public void setSerializerName(final String serializerName) { + this.serializerName = serializerName; + } + + public String getCompressorName() { + return compressorName; + } + + public void setCompressorName(final String compressorName) { + this.compressorName = compressorName; + } + + public int getServiceThreadBlockQueueSize() { + return serviceThreadBlockQueueSize; + } + + public void setServiceThreadBlockQueueSize(final int serviceThreadBlockQueueSize) { + this.serviceThreadBlockQueueSize = serviceThreadBlockQueueSize; + } + + public boolean isClientNativeEpollEnable() { + return clientNativeEpollEnable; + } + + public void setClientNativeEpollEnable(final boolean clientNativeEpollEnable) { + this.clientNativeEpollEnable = clientNativeEpollEnable; + } + + public int getClientWorkerThreads() { + return clientWorkerThreads; + } + + public void setClientWorkerThreads(final int clientWorkerThreads) { + this.clientWorkerThreads = clientWorkerThreads; + } + + public int getClientConnectionFutureAwaitTimeoutMillis() { + return clientConnectionFutureAwaitTimeoutMillis; + } + + public void setClientConnectionFutureAwaitTimeoutMillis(final int clientConnectionFutureAwaitTimeoutMillis) { + this.clientConnectionFutureAwaitTimeoutMillis = clientConnectionFutureAwaitTimeoutMillis; + } + + public int getClientAsyncCallbackExecutorThreads() { + return clientAsyncCallbackExecutorThreads; + } + + public void setClientAsyncCallbackExecutorThreads(final int clientAsyncCallbackExecutorThreads) { + this.clientAsyncCallbackExecutorThreads = clientAsyncCallbackExecutorThreads; + } + + public int getClientOnewayInvokeSemaphore() { + return clientOnewayInvokeSemaphore; + } + + public void setClientOnewayInvokeSemaphore(final int clientOnewayInvokeSemaphore) { + this.clientOnewayInvokeSemaphore = clientOnewayInvokeSemaphore; + } + + public int getClientAsyncInvokeSemaphore() { + return clientAsyncInvokeSemaphore; + } + + public void setClientAsyncInvokeSemaphore(final int clientAsyncInvokeSemaphore) { + this.clientAsyncInvokeSemaphore = clientAsyncInvokeSemaphore; + } + + public boolean isClientPooledBytebufAllocatorEnable() { + return clientPooledBytebufAllocatorEnable; + } + + public void setClientPooledBytebufAllocatorEnable(final boolean clientPooledBytebufAllocatorEnable) { + this.clientPooledBytebufAllocatorEnable = clientPooledBytebufAllocatorEnable; + } + + public boolean isClientCloseSocketIfTimeout() { + return clientCloseSocketIfTimeout; + } + + public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) { + this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout; + } + + public boolean isClientShortConnectionEnable() { + return clientShortConnectionEnable; + } + + public void setClientShortConnectionEnable(final boolean clientShortConnectionEnable) { + this.clientShortConnectionEnable = clientShortConnectionEnable; + } + + public long getClientPublishServiceTimeout() { + return clientPublishServiceTimeout; + } + + public void setClientPublishServiceTimeout(final long clientPublishServiceTimeout) { + this.clientPublishServiceTimeout = clientPublishServiceTimeout; + } + + public long getClientConsumerServiceTimeout() { + return clientConsumerServiceTimeout; + } + + public void setClientConsumerServiceTimeout(final long clientConsumerServiceTimeout) { + this.clientConsumerServiceTimeout = clientConsumerServiceTimeout; + } + + public long getClientInvokeServiceTimeout() { + return clientInvokeServiceTimeout; + } + + public void setClientInvokeServiceTimeout(final long clientInvokeServiceTimeout) { + this.clientInvokeServiceTimeout = clientInvokeServiceTimeout; + } + + public int getClientMaxRetryCount() { + return clientMaxRetryCount; + } + + public void setClientMaxRetryCount(final int clientMaxRetryCount) { + this.clientMaxRetryCount = clientMaxRetryCount; + } + + public int getClientSleepBeforeRetry() { + return clientSleepBeforeRetry; + } + + public void setClientSleepBeforeRetry(final int clientSleepBeforeRetry) { + this.clientSleepBeforeRetry = clientSleepBeforeRetry; + } + + public int getServerListenPort() { + return serverListenPort; + } + + public void setServerListenPort(final int serverListenPort) { + this.serverListenPort = serverListenPort; + } + + public int getServerAcceptorThreads() { + return serverAcceptorThreads; + } + + public void setServerAcceptorThreads(final int serverAcceptorThreads) { + this.serverAcceptorThreads = serverAcceptorThreads; + } + + public int getServerIoThreads() { + return serverIoThreads; + } + + public void setServerIoThreads(final int serverIoThreads) { + this.serverIoThreads = serverIoThreads; + } + + public int getServerWorkerThreads() { + return serverWorkerThreads; + } + + public void setServerWorkerThreads(final int serverWorkerThreads) { + this.serverWorkerThreads = serverWorkerThreads; + } + + public int getServerOnewayInvokeSemaphore() { + return serverOnewayInvokeSemaphore; + } + + public void setServerOnewayInvokeSemaphore(final int serverOnewayInvokeSemaphore) { + this.serverOnewayInvokeSemaphore = serverOnewayInvokeSemaphore; + } + + public int getServerAsyncInvokeSemaphore() { + return serverAsyncInvokeSemaphore; + } + + public void setServerAsyncInvokeSemaphore(final int serverAsyncInvokeSemaphore) { + this.serverAsyncInvokeSemaphore = serverAsyncInvokeSemaphore; + } + + public boolean isServerNativeEpollEnable() { + return serverNativeEpollEnable; + } + + public void setServerNativeEpollEnable(final boolean serverNativeEpollEnable) { + this.serverNativeEpollEnable = serverNativeEpollEnable; + } + + public int getServerAsyncCallbackExecutorThreads() { + return serverAsyncCallbackExecutorThreads; + } + + public void setServerAsyncCallbackExecutorThreads(final int serverAsyncCallbackExecutorThreads) { + this.serverAsyncCallbackExecutorThreads = serverAsyncCallbackExecutorThreads; + } + + public boolean isServerPooledBytebufAllocatorEnable() { + return serverPooledBytebufAllocatorEnable; + } + + public void setServerPooledBytebufAllocatorEnable(final boolean serverPooledBytebufAllocatorEnable) { + this.serverPooledBytebufAllocatorEnable = serverPooledBytebufAllocatorEnable; + } + + public boolean isServerAuthOpenEnable() { + return serverAuthOpenEnable; + } + + public void setServerAuthOpenEnable(final boolean serverAuthOpenEnable) { + this.serverAuthOpenEnable = serverAuthOpenEnable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java new file mode 100755 index 0000000..4dfcde7 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/TcpSocketConfig.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.config; + +/** + * TCP socket configuration + * + * @see java.net.SocketOptions + */ +public class TcpSocketConfig { + private boolean tcpSoReuseAddress; + private boolean tcpSoKeepAlive; + private boolean tcpSoNoDelay; + private int tcpSoSndBufSize; // see /proc/sys/net/ipv4/tcp_rmem + private int tcpSoRcvBufSize; // see /proc/sys/net/ipv4/tcp_wmem + private int tcpSoBacklogSize; + private int tcpSoLinger; + private int tcpSoTimeout; + + public boolean isTcpSoReuseAddress() { + return tcpSoReuseAddress; + } + + public void setTcpSoReuseAddress(final boolean tcpSoReuseAddress) { + this.tcpSoReuseAddress = tcpSoReuseAddress; + } + + public boolean isTcpSoKeepAlive() { + return tcpSoKeepAlive; + } + + public void setTcpSoKeepAlive(final boolean tcpSoKeepAlive) { + this.tcpSoKeepAlive = tcpSoKeepAlive; + } + + public boolean isTcpSoNoDelay() { + return tcpSoNoDelay; + } + + public void setTcpSoNoDelay(final boolean tcpSoNoDelay) { + this.tcpSoNoDelay = tcpSoNoDelay; + } + + public int getTcpSoSndBufSize() { + return tcpSoSndBufSize; + } + + public void setTcpSoSndBufSize(final int tcpSoSndBufSize) { + this.tcpSoSndBufSize = tcpSoSndBufSize; + } + + public int getTcpSoRcvBufSize() { + return tcpSoRcvBufSize; + } + + public void setTcpSoRcvBufSize(final int tcpSoRcvBufSize) { + this.tcpSoRcvBufSize = tcpSoRcvBufSize; + } + + public int getTcpSoBacklogSize() { + return tcpSoBacklogSize; + } + + public void setTcpSoBacklogSize(final int tcpSoBacklogSize) { + this.tcpSoBacklogSize = tcpSoBacklogSize; + } + + public int getTcpSoLinger() { + return tcpSoLinger; + } + + public void setTcpSoLinger(final int tcpSoLinger) { + this.tcpSoLinger = tcpSoLinger; + } + + public int getTcpSoTimeout() { + return tcpSoTimeout; + } + + public void setTcpSoTimeout(final int tcpSoTimeout) { + this.tcpSoTimeout = tcpSoTimeout; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java new file mode 100644 index 0000000..1a80d20 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.external; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class ThreadUtils { + private static final Logger LOG = LoggerFactory.getLogger(ThreadUtils.class); + + /** + * A constructor to stop this class being constructed. + */ + private ThreadUtils() { + // Unused + + } + + public static ExecutorService newThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) { + return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon)); + } + + public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) { + return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon)); + } + + public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) { + return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon)); + } + + public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName, + boolean isDaemon) { + return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon)); + } + + public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) { + return newGenericThreadFactory("Remoting-" + processName, isDaemon); + } + + public static ThreadFactory newGenericThreadFactory(String processName) { + return newGenericThreadFactory(processName, false); + } + + public static ThreadFactory newGenericThreadFactory(String processName, int threads) { + return newGenericThreadFactory(processName, threads, false); + } + + public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) { + return new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet())); + thread.setDaemon(isDaemon); + return thread; + } + }; + } + + public static ThreadFactory newGenericThreadFactory(final String processName, final int threads, + final boolean isDaemon) { + return new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet())); + thread.setDaemon(isDaemon); + return thread; + } + }; + } + + /** + * Create a new thread + * + * @param name The name of the thread + * @param runnable The work for the thread to do + * @param daemon Should the thread block JVM stop? + * @return The unstarted thread + */ + public static Thread newThread(String name, Runnable runnable, boolean daemon) { + Thread thread = new Thread(runnable, name); + thread.setDaemon(daemon); + thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Uncaught exception in thread '" + t.getName() + "':", e); + } + }); + return thread; + } + + /** + * Shutdown passed thread using isAlive and join. + * + * @param t Thread to stop + */ + public static void shutdownGracefully(final Thread t) { + shutdownGracefully(t, 0); + } + + /** + * Shutdown passed thread using isAlive and join. + * + * @param millis Pass 0 if we're to wait forever. + * @param t Thread to stop + */ + public static void shutdownGracefully(final Thread t, final long millis) { + if (t == null) + return; + while (t.isAlive()) { + try { + t.interrupt(); + t.join(millis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * An implementation of the graceful stop sequence recommended by + * {@link ExecutorService}. + * + * @param executor executor + * @param timeout timeout + * @param timeUnit timeUnit + */ + public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) { + // Disable new tasks from being submitted. + executor.shutdown(); + try { + // Wait a while for existing tasks to terminate. + if (!executor + .awaitTermination(timeout, timeUnit)) { + executor.shutdownNow(); + // Wait a while for tasks to respond to being cancelled. + if (!executor.awaitTermination(timeout, timeUnit)) { + LOG.warn(String.format("%s didn't terminate!", executor)); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted. + executor.shutdownNow(); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java new file mode 100644 index 0000000..e17bcfd --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.buffer; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import java.nio.ByteBuffer; +import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper; + +public class NettyByteBufferWrapper implements ByteBufferWrapper { + private final ByteBuf buffer; + private final Channel channel; + + public NettyByteBufferWrapper(ByteBuf buffer) { + this(buffer, null); + } + + public NettyByteBufferWrapper(ByteBuf buffer, Channel channel) { + this.channel = channel; + this.buffer = buffer; + } + + public void writeByte(int index, byte data) { + buffer.writeByte(data); + } + + public void writeByte(byte data) { + buffer.writeByte(data); + } + + public byte readByte() { + return buffer.readByte(); + } + + public void writeInt(int data) { + buffer.writeInt(data); + } + + public void writeBytes(byte[] data) { + buffer.writeBytes(data); + } + + @Override + public void writeBytes(final ByteBuffer data) { + buffer.writeBytes(data); + } + + public int readableBytes() { + return buffer.readableBytes(); + } + + public int readInt() { + return buffer.readInt(); + } + + public void readBytes(byte[] dst) { + buffer.readBytes(dst); + } + + @Override + public void readBytes(final ByteBuffer dst) { + buffer.readBytes(dst); + } + + public int readerIndex() { + return buffer.readerIndex(); + } + + public void setReaderIndex(int index) { + buffer.setIndex(index, buffer.writerIndex()); + } + + @Override + public void writeLong(long value) { + buffer.writeLong(value); + } + + @Override + public long readLong() { + return buffer.readLong(); + } + + @Override + public void ensureCapacity(int capacity) { + buffer.capacity(capacity); + } + + @Override + public short readShort() { + return buffer.readShort(); + } + + @Override + public void writeShort(final short value) { + buffer.writeShort(value); + } +} + +