This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 32ada2487d Fix heartbeat failed if serialization changed (#11512)
32ada2487d is described below

commit 32ada2487d570e675a1811e7a4bd9332373bfe6e
Author: Albumen Kevin <[email protected]>
AuthorDate: Mon Feb 27 20:31:00 2023 +0800

    Fix heartbeat failed if serialization changed (#11512)
    
    fixes #11268
---
 .../dubbo/remoting/exchange/HeartBeatRequest.java  | 33 ++++++++++++++++++++++
 .../dubbo/remoting/exchange/HeartBeatResponse.java | 33 ++++++++++++++++++++++
 .../remoting/exchange/codec/ExchangeCodec.java     | 18 +++++++-----
 .../exchange/support/header/HeartbeatHandler.java  | 10 +++++--
 .../dubbo/rpc/protocol/dubbo/DubboCodec.java       | 22 ++++++++++-----
 5 files changed, 99 insertions(+), 17 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatRequest.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatRequest.java
new file mode 100644
index 0000000000..3cacbb9577
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatRequest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.exchange;
+
+public class HeartBeatRequest extends Request { // Request subtype that additionally carries a serialization id
+    private byte proto; // the `proto` byte the codec decoded this heartbeat frame with
+
+    public HeartBeatRequest(long id) { // id is handed unchanged to Request(long)
+        super(id);
+    }
+
+    public byte getProto() { // HeartbeatHandler copies this onto the HeartBeatResponse it sends back
+        return proto;
+    }
+
+    public void setProto(byte proto) { // the codecs call this right after constructing the request
+        this.proto = proto;
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatResponse.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatResponse.java
new file mode 100644
index 0000000000..35d4477c49
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/HeartBeatResponse.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.exchange;
+
+public class HeartBeatResponse extends Response{ // Response subtype that additionally carries a serialization id
+    private byte proto; // DubboCodec.getSerialization resolves it via CodecSupport.getSerializationById
+
+    public HeartBeatResponse(long id, String version) { // both arguments go unchanged to Response(long, String)
+        super(id, version);
+    }
+
+    public byte getProto() { // read when the response is encoded, to pick the serialization
+        return proto;
+    }
+
+    public void setProto(byte proto) { // HeartbeatHandler sets this from the HeartBeatRequest being answered
+        this.proto = proto;
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
index d631030637..9937c573fb 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/codec/ExchangeCodec.java
@@ -32,6 +32,7 @@ import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.buffer.ChannelBuffer;
 import org.apache.dubbo.remoting.buffer.ChannelBufferInputStream;
 import org.apache.dubbo.remoting.buffer.ChannelBufferOutputStream;
+import org.apache.dubbo.remoting.exchange.HeartBeatRequest;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
@@ -184,31 +185,34 @@ public class ExchangeCodec extends TelnetCodec {
             return res;
         } else {
             // decode request.
-            Request req = new Request(id);
-            req.setVersion(Version.getProtocolVersion());
-            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
-            if ((flag & FLAG_EVENT) != 0) {
-                req.setEvent(true);
-            }
+            Request req;
             try {
                 Object data;
-                if (req.isEvent()) {
+                if ((flag & FLAG_EVENT) != 0) {
                     byte[] eventPayload = CodecSupport.getPayload(is);
                     if (CodecSupport.isHeartBeat(eventPayload, proto)) {
                         // heart beat response data is always null;
+                        req = new HeartBeatRequest(id);
+                        ((HeartBeatRequest) req).setProto(proto);
                         data = null;
                     } else {
+                        req = new Request(id);
                         data = decodeEventData(channel, 
CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto), eventPayload);
                     }
+                    req.setEvent(true);
                 } else {
+                    req = new Request(id);
                     data = decodeRequestData(channel, 
CodecSupport.deserialize(channel.getUrl(), is, proto));
                 }
                 req.setData(data);
             } catch (Throwable t) {
                 // bad request
+                req = new Request(id);
                 req.setBroken(true);
                 req.setData(t);
             }
+            req.setVersion(Version.getProtocolVersion());
+            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
             return req;
         }
     }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
index 38a10e34cf..f1bb03cbef 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
@@ -23,6 +23,8 @@ import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.exchange.HeartBeatRequest;
+import org.apache.dubbo.remoting.exchange.HeartBeatResponse;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate;
@@ -65,10 +67,12 @@ public class HeartbeatHandler extends 
AbstractChannelHandlerDelegate {
     public void received(Channel channel, Object message) throws 
RemotingException {
         setReadTimestamp(channel);
         if (isHeartbeatRequest(message)) {
-            Request req = (Request) message;
+            HeartBeatRequest req = (HeartBeatRequest) message;
             if (req.isTwoWay()) {
-                Response res = new Response(req.getId(), req.getVersion());
+                HeartBeatResponse res;
+                res = new HeartBeatResponse(req.getId(), req.getVersion());
                 res.setEvent(HEARTBEAT_EVENT);
+                res.setProto(req.getProto());
                 channel.send(res);
                 if (logger.isDebugEnabled()) {
                     int heartbeat = 
channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
@@ -105,7 +109,7 @@ public class HeartbeatHandler extends 
AbstractChannelHandlerDelegate {
     }
 
     private boolean isHeartbeatRequest(Object message) {
-        return message instanceof Request && ((Request) message).isHeartbeat();
+        return message instanceof HeartBeatRequest && ((Request) 
message).isHeartbeat();
     }
 
     private boolean isHeartbeatResponse(Object message) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
index 28b1453588..88a0f5622d 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboCodec.java
@@ -27,6 +27,8 @@ import org.apache.dubbo.common.serialize.Serialization;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.exchange.HeartBeatRequest;
+import org.apache.dubbo.remoting.exchange.HeartBeatResponse;
 import org.apache.dubbo.remoting.exchange.Request;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.exchange.codec.ExchangeCodec;
@@ -131,24 +133,24 @@ public class DubboCodec extends ExchangeCodec {
             return res;
         } else {
             // decode request.
-            Request req = new Request(id);
-            req.setVersion(Version.getProtocolVersion());
-            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
-            if ((flag & FLAG_EVENT) != 0) {
-                req.setEvent(true);
-            }
+            Request req;
             try {
                 Object data;
-                if (req.isEvent()) {
+                if ((flag & FLAG_EVENT) != 0) {
                     byte[] eventPayload = CodecSupport.getPayload(is);
                     if (CodecSupport.isHeartBeat(eventPayload, proto)) {
                         // heart beat response data is always null;
+                        req = new HeartBeatRequest(id);
+                        ((HeartBeatRequest) req).setProto(proto); // keep the frame's serialization id for the reply
                         data = null;
                     } else {
+                        req = new Request(id);
                         ObjectInput in = 
CodecSupport.deserialize(channel.getUrl(), new 
ByteArrayInputStream(eventPayload), proto);
                         data = decodeEventData(channel, in, eventPayload);
                     }
+                    req.setEvent(true);
                 } else {
+                    req = new Request(id); // ordinary RPC invocation, not a heartbeat: use plain Request as ExchangeCodec does
                     DecodeableRpcInvocation inv;
                     if (isDecodeDataInIoThread(channel)) {
                         inv = new DecodeableRpcInvocation(frameworkModel, 
channel, req, is, proto);
@@ -165,9 +167,12 @@ public class DubboCodec extends ExchangeCodec {
                     log.warn(PROTOCOL_FAILED_DECODE, "", "", "Decode request 
failed: " + t.getMessage(), t);
                 }
                 // bad request
+                req = new Request(id); // broken request carries no proto; plain Request matches ExchangeCodec's bad-request path
                 req.setBroken(true);
                 req.setData(t);
             }
+            req.setVersion(Version.getProtocolVersion()); // applied after the try/catch so every branch's req gets it
+            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
 
             return req;
         }
@@ -267,6 +272,9 @@ public class DubboCodec extends ExchangeCodec {
 
     @Override
     protected Serialization getSerialization(Channel channel, Response res) {
+        if (res instanceof HeartBeatResponse) {
+            return CodecSupport.getSerializationById(((HeartBeatResponse) 
res).getProto());
+        }
         if (!(res.getResult() instanceof AppResponse)) {
             return super.getSerialization(channel, res);
         }

Reply via email to