viirya commented on code in PR #55620:
URL: https://github.com/apache/spark/pull/55620#discussion_r3246746736
##########
common/network-common/src/main/java/org/apache/spark/network/shuffle/streaming/DataMessage.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.streaming;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+
+public final class DataMessage extends StreamingShuffleMessage {
+
+  public ByteBuf data;
+  public int shuffleWriterId;
+  public int shuffleReaderId;
+  public int dataSize;
+  public long checksum;
+
+  public DataMessage(int shuffleWriterId, int shuffleReaderId, int dataSize, ByteBuf data,
+      long checksum) {
+    this.shuffleWriterId = shuffleWriterId;
+    this.shuffleReaderId = shuffleReaderId;
+    this.dataSize = dataSize;
+    this.data = data;
+    this.ownedBuf = data.retain();
+    this.checksum = checksum;
+  }
+
+  @Override
+  public StreamingShuffleMessageType messageType() {
+    return StreamingShuffleMessageType.DATA_MESSAGE_UNSAFE_ROW;
+  }
+
+  @Override
+  public int headerLength() {
+    // 4 bytes EACH for shuffle writer ID, shuffle reader ID, data size
+    // 8 bytes for checksum
+    return super.headerLength() + 20;
+  }
+
+  @Override
+  public void encode(CompositeByteBuf buf) {
+    super.encode(buf);
+    buf.writeInt(shuffleWriterId);
+    buf.writeInt(shuffleReaderId);
+    buf.writeInt(dataSize);
+    buf.writeLong(checksum);
+
+    // Adding data as a component to buf transfers ownership of data to buf. However,
+    // this DataMessage still has a reference to data, so we need to retain it here.
+    buf.addComponent(true, data.retain());

Review Comment:
Line 58 writes `dataSize` into the header, but line 63 adds all readable bytes of `data` to the encoded message, not just `dataSize` bytes. The constructor also doesn't check that `dataSize == data.readableBytes()`. So if the caller passes `dataSize < data.readableBytes()`, the message carries more bytes than the header claims. The receiver side only checks `dataSize <= message.readableBytes()`, so the trailing bytes would be accepted.

This is too loose for a wire protocol and could easily cause framing errors, data leaks, or semantic inconsistency. I suggest the encode side either check that `dataSize == data.readableBytes()` or add only `data.retainedSlice(data.readerIndex(), dataSize)`. On the decode side, if the frame contains only one `DataMessage`, it should also check that `dataSize == message.readableBytes()`.

We should add tests covering `dataSize` both smaller and larger than the readable bytes.
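A minimal sketch of the strict size checks suggested above, not the PR's code. To stay runnable without dependencies it uses `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf`, where `remaining()` plays the role of `readableBytes()`. The class `DataFrameSketch`, its `encode`/`decode` methods, and the 20-byte header without the superclass header are all hypothetical names and simplifications for illustration.

```java
import java.nio.ByteBuffer;

/** Illustration only: java.nio.ByteBuffer stands in for Netty's ByteBuf. */
final class DataFrameSketch {

  // Assumed header for this sketch: writer ID (4), reader ID (4), data size (4), checksum (8).
  static final int HEADER_LENGTH = 4 + 4 + 4 + 8;

  /** Encodes one frame and rejects a dataSize that disagrees with the payload length. */
  public static ByteBuffer encode(int writerId, int readerId, int dataSize, ByteBuffer data,
      long checksum) {
    if (dataSize < 0 || dataSize != data.remaining()) {
      throw new IllegalArgumentException(
          "dataSize " + dataSize + " does not match readable bytes " + data.remaining());
    }
    ByteBuffer out = ByteBuffer.allocate(HEADER_LENGTH + dataSize);
    out.putInt(writerId).putInt(readerId).putInt(dataSize).putLong(checksum);
    // duplicate() shares the content but leaves the caller's position untouched.
    out.put(data.duplicate());
    out.flip();
    return out;
  }

  /** Decodes a frame that holds exactly one message and returns its payload. */
  public static byte[] decode(ByteBuffer frame) {
    if (frame.remaining() < HEADER_LENGTH) {
      throw new IllegalArgumentException("frame is shorter than the header");
    }
    frame.getInt();  // writer ID, unused in this sketch
    frame.getInt();  // reader ID, unused in this sketch
    int dataSize = frame.getInt();
    frame.getLong(); // checksum, not verified in this sketch
    // Strict equality rejects both truncated payloads and trailing bytes.
    if (dataSize < 0 || dataSize != frame.remaining()) {
      throw new IllegalArgumentException(
          "dataSize " + dataSize + " does not match remaining bytes " + frame.remaining());
    }
    byte[] payload = new byte[dataSize];
    frame.get(payload);
    return payload;
  }

  public static void main(String[] args) {
    ByteBuffer data = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    ByteBuffer frame = encode(7, 9, 4, data, 0L);
    System.out.println("decoded payload bytes: " + decode(frame).length);
    try {
      encode(7, 9, 2, data, 0L); // header would claim fewer bytes than are sent
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With the equality check on both sides, a mismatched `dataSize` fails fast at the sender, and a frame with trailing bytes is rejected by the receiver instead of being silently accepted. The alternative from the comment, slicing to exactly `dataSize` bytes on encode, would truncate rather than reject, so it still needs a check that `dataSize` does not exceed the readable bytes.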
