[ https://issues.apache.org/jira/browse/CASSANDRA-16808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Caleb Rackliffe updated CASSANDRA-16808:
----------------------------------------
    Reviewers: Caleb Rackliffe

> Pre-4.0 FWD_FRM message parameter serialization and message-id forwarding is incorrect
> --------------------------------------------------------------------------------------
>
> Key: CASSANDRA-16808
> URL: https://issues.apache.org/jira/browse/CASSANDRA-16808
> Project: Cassandra
> Issue Type: Bug
> Components: Messaging/Internode
> Reporter: Jon Meredith
> Assignee: Jon Meredith
> Priority: Normal
> Fix For: 4.0-rc
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Fixing CASSANDRA-16797 has exposed an issue with the way {{FWD_FRM}} is serialized.
> In the code cleanup during the internode messaging refactor, the serialization for {{FWD_FRM}} (the endpoint to respond to for forwarded messages) was implemented using the same serialization format as CompactEndpointSerializationHelper, which prefixes the address bytes with their length. However, pre-4.0 nodes expect the FWD_FRM parameter value to carry no length prefix and simply convert the whole parameter value to an InetAddress.
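The mismatch between the two encodings can be reproduced outside Cassandra. The following is a minimal sketch, not Cassandra code: the class and method names are hypothetical, and it relies only on the documented behaviour of {{java.net.InetAddress.getByAddress}}, which rejects any byte array that is not 4 or 16 bytes long.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical demo class; none of these names exist in Cassandra.
class FwdFrmEncodingDemo
{
    // The format described above: one length byte followed by the address bytes.
    static byte[] lengthPrefixed(byte[] rawAddress)
    {
        byte[] out = new byte[rawAddress.length + 1];
        out[0] = (byte) rawAddress.length;
        System.arraycopy(rawAddress, 0, out, 1, rawAddress.length);
        return out;
    }

    // Mimics a receiver that treats the whole parameter value as an address.
    static boolean decodesAsAddress(byte[] parameterValue)
    {
        try
        {
            InetAddress.getByAddress(parameterValue);
            return true;
        }
        catch (UnknownHostException e)
        {
            // Thrown when the array is neither 4 (IPv4) nor 16 (IPv6) bytes long.
            return false;
        }
    }

    public static void main(String[] args)
    {
        byte[] raw = { 10, 0, 0, 1 };
        System.out.println("raw bytes decode: " + decodesAsAddress(raw));
        System.out.println("length-prefixed bytes decode: " + decodesAsAddress(lengthPrefixed(raw)));
    }
}
```

A length-prefixed IPv4 address is 5 bytes and a length-prefixed IPv6 address is 17 bytes, so neither is accepted by a receiver that expects only the raw address.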
> In a mixed-version cluster this causes the pre-4.0 nodes to fail when deserializing the mutation:
> {code:java}
> java.lang.RuntimeException: java.net.UnknownHostException: addr is of illegal length
>         at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:72) ~[dtest-3.0.25.jar:na]
>         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
>         at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$FutureTask.run(AbstractLocalAwareExecutorService.java:162) ~[dtest-3.0.25.jar:na]
>         at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$LocalSessionFutureTask.run(AbstractLocalAwareExecutorService.java:134) ~[dtest-3.0.25.jar:na]
>         at org.apache.cassandra.concurrent.SEPWorker.run(SEPWorker.java:109) ~[dtest-3.0.25.jar:na]
>         at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
> Caused by: java.net.UnknownHostException: addr is of illegal length
>         at java.base/java.net.InetAddress.getByAddress(InetAddress.java:1208) ~[na:na]
>         at java.base/java.net.InetAddress.getByAddress(InetAddress.java:1571) ~[na:na]
>         at org.apache.cassandra.db.MutationVerbHandler.doVerb(MutationVerbHandler.java:57) ~[dtest-3.0.25.jar:na]
>         at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:67) ~[dtest-3.0.25.jar:na]
>         ... 5 common frames omitted
> {code}
> Unfortunately there isn't a clean fix that I can see: {{org.apache.cassandra.io.IVersionedAsymmetricSerializer#deserialize}}, which is used to deserialize the FWD_FRM address, does not take a maximum length to deserialize, and it's impossible to know definitively whether it's an IPv4 or IPv6 address from the first four bytes.
> The patch I'm submitting special-cases the deserialization of pre-4.0 {{FWD_FRM}} parameters in the {{Message}} deserializer. That seems preferable to extending the deserialization interface or creating a new {{DataInputBuffer}} limited by the parameter value length.
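To illustrate why the parameter length resolves the IPv4/IPv6 ambiguity, here is a minimal sketch of a length-driven read. It is not the code from the patch: the class and method names are hypothetical, and it assumes the pre-4.0 wire layout quoted in this report, where each parameter value is preceded by a 4-byte int length.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;

// Hypothetical sketch; these names do not exist in Cassandra.
class Pre40ForwardFromSketch
{
    // Reads exactly `length` raw address bytes. The length, already read by the
    // caller, is what distinguishes IPv4 (4 bytes) from IPv6 (16 bytes).
    static InetAddress readForwardFrom(DataInput in, int length) throws IOException
    {
        if (length != 4 && length != 16)
            throw new IOException("Unexpected FWD_FRM length: " + length);
        byte[] raw = new byte[length];
        in.readFully(raw);
        return InetAddress.getByAddress(raw);
    }

    // Convenience for experiments: parses [int length][value bytes].
    static InetAddress parseParameterValue(byte[] wire)
    {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire)))
        {
            int length = in.readInt();
            return readForwardFrom(in, length);
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        byte[] wire = { 0, 0, 0, 4, 10, 0, 0, 1 };
        System.out.println(parseParameterValue(wire).getHostAddress());
    }
}
```

Because the caller passes the length in, the serializer interface does not need a maximum-length argument and no bounded input buffer has to be created.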
> Once that was fixed, the INSERT statements were still failing, which I tracked down to the 4.0 optimization of serializing the forwarded message once if the message id is the same:
> [https://github.com/apache/cassandra/blob/cassandra-4.0/src/java/org/apache/cassandra/db/MutationVerbHandler.java#L76]
> In the test case I wrote, only one message was being forwarded and that had a different id to the original forwarded message. The {{useSameMessageID}} method only checked message IDs within the forwarded messages.
>
> Code Details:
> When MutationVerbHandler.forwardToLocalNodes is constructing the forwarding message, it just stores the byte array representing the IPv4 or IPv6 address in the parameter array.
> (link [https://github.com/apache/cassandra/blob/44604b7316fcbfd7d0d7425e75cd7ebe267e3247/src/java/org/apache/cassandra/db/MutationVerbHandler.java#L90])
> {code:java}
> private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
> {
>     try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)))
>     {
>         int size = in.readInt();
>         // tell the recipients who to send their ack to
>         MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
> {code}
> When the message is serialized in 3.0 MessageOut.serialize, that raw entry of bytes is written with the length.
> (link [https://github.com/apache/cassandra/blob/44604b7316fcbfd7d0d7425e75cd7ebe267e3247/src/java/org/apache/cassandra/net/MessageOut.java#L119])
> {code:java}
> public void serialize(DataOutputPlus out, int version) throws IOException
> {
>     CompactEndpointSerializationHelper.serialize(from, out);
>
>     out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(verb, version).getId());
>     out.writeInt(parameters.size());
>     for (Map.Entry<String, byte[]> entry : parameters.entrySet())
>     {
>         out.writeUTF(entry.getKey());
>         out.writeInt(entry.getValue().length);
>         out.write(entry.getValue());
>     }
>     ....
> }
> {code}
> And we do the same on 4.0; however, in 4.0 the parameter is serialized using the ParamType enum.
> (link [https://github.com/apache/cassandra/blob/fcd30b6e0db3622a8e78e9aa35221f630c77f6de/src/java/org/apache/cassandra/net/Message.java#L1154])
> {code:java}
> for (int i = 0; i < count; i++)
> {
>     ParamType type = version >= VERSION_40
>         ? ParamType.lookUpById(Ints.checkedCast(in.readUnsignedVInt()))
>         : ParamType.lookUpByAlias(in.readUTF());
>     int length = version >= VERSION_40
>         ? Ints.checkedCast(in.readUnsignedVInt())
>         : in.readInt();
>     if (null != type)
>         params.put(type, type.serializer.deserialize(in, version));
>     else
>         in.skipBytesFully(length); // forward compatibility with minor version changes
> }
> {code}
> (link [https://github.com/apache/cassandra/blob/fcd30b6e0db3622a8e78e9aa35221f630c77f6de/src/java/org/apache/cassandra/net/ParamType.java#L45])
> {code:java}
> public enum ParamType
> {
>     FORWARD_TO (0, "FWD_TO", ForwardingInfo.serializer),
>     RESPOND_TO (1, "FWD_FRM", inetAddressAndPortSerializer),
>     ...
> }
> {code}
> The {{InetAddressAndPortSerializer}} has been based on the 3.0 {{CompactEndpointSerializationHelper}} encoding used in the message header. However, that format includes a single byte with the length of the address, whereas pre-4.0 nodes expect the parameter value to contain just the raw address bytes.
> (link [https://github.com/apache/cassandra/blob/fcd30b6e0db3622a8e78e9aa35221f630c77f6de/src/java/org/apache/cassandra/locator/InetAddressAndPort.java#L308])
> {code:java}
> if (version >= MessagingService.VERSION_40)
> {
>     out.writeByte(buf.length + 2);
>     out.write(buf);
>     out.writeShort(endpoint.port);
> }
> else
> {
>     out.writeByte(buf.length); //// Surprise! Bonus byte!
>     out.write(buf);
> }
> {code}
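The message-id problem described earlier in this report can be pictured with a small sketch. It is an illustration only, with hypothetical names and signatures rather than the actual {{useSameMessageID}} implementation: comparing the forwarded ids only with each other reports "same id" for a single forwarded message, even when that id differs from the id of the original message.

```java
// Hypothetical sketch; not the Cassandra implementation.
class ForwardingIdsSketch
{
    // The behaviour described in the report: ids are only compared with each other.
    static boolean allForwardedIdsEqual(long[] forwardedIds)
    {
        for (int i = 1; i < forwardedIds.length; i++)
            if (forwardedIds[i] != forwardedIds[0])
                return false;
        return true;
    }

    // Stricter check: every forwarded id must also match the original message id.
    static boolean sameAsOriginal(long originalId, long[] forwardedIds)
    {
        for (long id : forwardedIds)
            if (id != originalId)
                return false;
        return true;
    }

    public static void main(String[] args)
    {
        long[] forwarded = { 7L };
        System.out.println(allForwardedIdsEqual(forwarded));
        System.out.println(sameAsOriginal(3L, forwarded));
    }
}
```

With one forwarded message whose id is 7 and an original id of 3, the first check passes while the second does not, which matches the single-message test case in the report.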