pnowojski commented on a change in pull request #13501:
URL: https://github.com/apache/flink/pull/13501#discussion_r497371743



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
##########
@@ -86,11 +94,34 @@ public int append(ByteBuffer source) {
                int available = getMaxCapacity() - positionMarker.getCached();
                int toCopy = Math.min(needed, available);
 
+               // Each Data BufferBuilder starts with a 4-byte integer header
+               // Since length can only be 0 or positive numbers, the first bit of the integer
+               // is used to identify whether the first record is partial (1) or not (0)
+               // The remaining 31 bits stands for the length of the record remaining.
+               // The data written is not made visible to reader until {@link #commit()}, so the BufferBuilder
+               // ends either with a complete record or full buffer after append();
+               if (isEmptyBufferBuilder()) {
+                       available = available - BUFFER_BUILDER_HEADER_SIZE;
+                       toCopy = Math.min(needed, available);
+                       int header = Header.headerEncode(isPartialRecord(source), toCopy);
+                       memorySegment.putIntBigEndian(positionMarker.getCached(), header);
+                       positionMarker.move(BUFFER_BUILDER_HEADER_SIZE);
+               }
+

Review comment:
   1. The first 4 bytes of `ByteBuffer source` already contain the length. Why do we need to encode the length a second time?
   
   2. I think it would be easier, cleaner, and more reliable to detect the partial record on a higher layer, maybe somewhere around `BufferWritingResultPartition#emitRecord/broadcastRecord`? After all, if it ever needs to do a second iteration in
   ```
                do {
                        final BufferBuilder bufferBuilder = getSubpartitionBufferBuilder(targetSubpartition);
                        bufferBuilder.appendAndCommit(record);
   
                        if (bufferBuilder.isFull()) {
                                finishSubpartitionBufferBuilder(targetSubpartition);
                        }
                } while (record.hasRemaining());
   ```
   it directly means "hey! that's a partial record".
   
   It would be cleaner, as on this layer we do not know anything about the content of the buffers or how they are being used. Thus `isPartialRecord` is a somewhat strange concept here.
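   
   To illustrate point 2, here is a minimal, self-contained sketch of detecting the partial record in the emit loop rather than inside the buffer. It is not the Flink code: `PartialRecordSketch`, `SketchBuffer`, and `partialFlags` are hypothetical names, and a plain `ByteBuffer` stands in for the real `BufferBuilder`. The only idea it demonstrates is that a buffer requested during a second or later loop iteration necessarily starts in the middle of a record.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical sketch: the writer loop, not the buffer, decides what "partial" means. */
   final class PartialRecordSketch {
   
       /** Hypothetical stand-in for a BufferBuilder: fixed capacity plus one flag set at creation. */
       static final class SketchBuffer {
           final ByteBuffer data;
           final boolean startsWithPartialRecord;
   
           SketchBuffer(int capacity, boolean startsWithPartialRecord) {
               this.data = ByteBuffer.allocate(capacity);
               this.startsWithPartialRecord = startsWithPartialRecord;
           }
   
           /** Copies as many bytes as fit into this buffer. */
           void append(ByteBuffer source) {
               int toCopy = Math.min(source.remaining(), data.remaining());
               for (int i = 0; i < toCopy; i++) {
                   data.put(source.get());
               }
           }
   
           boolean isFull() {
               return !data.hasRemaining();
           }
       }
   
       private final int capacity;
       private final List<SketchBuffer> finished = new ArrayList<>();
       private SketchBuffer current;
   
       PartialRecordSketch(int capacity) {
           this.capacity = capacity;
       }
   
       void emitRecord(ByteBuffer record) {
           // False on the first pass; true as soon as the record spills into another pass.
           boolean continuation = false;
           do {
               if (current == null) {
                   // A buffer opened on a later pass starts in the middle of this record.
                   current = new SketchBuffer(capacity, continuation);
               }
               current.append(record);
               if (current.isFull()) {
                   finished.add(current);
                   current = null;
               }
               continuation = true;
           } while (record.hasRemaining());
       }
   
       /** Flags of all buffers in creation order, including the unfinished one. */
       List<Boolean> partialFlags() {
           List<Boolean> flags = new ArrayList<>();
           for (SketchBuffer b : finished) {
               flags.add(b.startsWithPartialRecord);
           }
           if (current != null) {
               flags.add(current.startsWithPartialRecord);
           }
           return flags;
       }
   }
   ```
   
   With a capacity of 4 bytes, emitting records of 6, 2, and 3 bytes creates three buffers, and only the second one is flagged as starting with a partial record. The buffer itself never has to inspect its content.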

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartition.java
##########
@@ -0,0 +1,138 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class PipelinedApproximateSubpartition extends PipelinedSubpartition {
+
+       private static final Logger LOG = LoggerFactory.getLogger(PipelinedApproximateSubpartition.class);
+
+       private boolean isPartialBuffer = false;
+
+       PipelinedApproximateSubpartition(int index, ResultPartition parent) {
+               super(index, parent);
+       }
+
+       public void releaseView() {
+               readView = null;
+               isPartialBuffer = true;
+               isBlockedByCheckpoint = false;
+               sequenceNumber = 0;
+       }
+
+       @Override
+       public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) {
+               synchronized (buffers) {
+                       checkState(!isReleased);
+
+                       // if the view is not released yet
+                       if (readView != null) {
+                               releaseView();
+                       }
+
+                       LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+                               parent.getOwningTaskName(), getSubPartitionIndex(), parent.getPartitionId());
+
+                       readView = new PipelinedApproximateSubpartitionView(this, availabilityListener);
+               }
+
+               return readView;
+       }
+
+       @Nullable
+       @Override
+       BufferAndBacklog pollBuffer() {

Review comment:
   I would proceed with caution, but if it can be done cleanly, it would definitely be worth deduplicating this code.
   
   As it stands, it is hard for me to even review it and see the difference.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
##########
@@ -96,15 +98,43 @@ public boolean isFinished() {
                return writerPosition.isFinished();
        }
 
+       public boolean startOfDataBuffer() {
+               return buffer.getDataType() == DATA_BUFFER && currentReaderPosition == 0;
+       }
+
        /**
+        * BufferConsumer skips the buffer header before building buffer.
         * @return sliced {@link Buffer} containing the not yet consumed data. Returned {@link Buffer} shares the reference
         * counter with the parent {@link BufferConsumer} - in order to recycle memory both of them must be recycled/closed.
         */
        public Buffer build() {
                writerPosition.update();
                int cachedWriterPosition = writerPosition.getCached();
-               Buffer slice = buffer.readOnlySlice(currentReaderPosition, cachedWriterPosition - currentReaderPosition);
+
+               Buffer slice = null;
+
+               // data buffer && starting from the beginning of the buffer
+               if (startOfDataBuffer()) {
+                       // either do not have any data, or at least have 4 bytes (header + data)
+                       checkState(
+                               (cachedWriterPosition - currentReaderPosition > BUFFER_BUILDER_HEADER_SIZE)
+                                       || (currentReaderPosition == cachedWriterPosition)
+                       );
+
+                       // remove the header
+                       if (cachedWriterPosition - currentReaderPosition > BUFFER_BUILDER_HEADER_SIZE) {
+                               slice = buffer.readOnlySlice(
+                                       currentReaderPosition + BUFFER_BUILDER_HEADER_SIZE,
+                                       cachedWriterPosition - currentReaderPosition - BUFFER_BUILDER_HEADER_SIZE);
+                       }
+               }

Review comment:
   Wouldn't the same point as in my previous comment apply here as well? Handling the partial record length might be more meaningful in `SpillingAdaptiveSpanningRecordDeserializer#setNextBuffer`?
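   
   For reference, the header layout described in the `BufferBuilder` diff (highest bit as the partial flag, lower 31 bits as the length) can be sketched as below, together with a reader-side helper that skips the leftover bytes of a partial record. This is only an illustration under stated assumptions: `HeaderSketch` and all of its methods are hypothetical, the actual `Header.headerEncode` implementation is not shown in this review, and the length is assumed to be the number of bytes of the first record that sit in this buffer (the `toCopy` value in the diff).
   
   ```java
   import java.nio.ByteBuffer;
   
   /** Hypothetical sketch of the 1-bit flag + 31-bit length header from the diff comment. */
   final class HeaderSketch {
   
       static final int HEADER_SIZE = 4;
       static final int PARTIAL_FLAG = 0x80000000; // highest bit of the int
       static final int LENGTH_MASK = 0x7FFFFFFF;  // lower 31 bits
   
       static int encode(boolean partial, int length) {
           if (length < 0) {
               throw new IllegalArgumentException("length must be non-negative");
           }
           return partial ? (length | PARTIAL_FLAG) : length;
       }
   
       static boolean isPartial(int header) {
           return (header & PARTIAL_FLAG) != 0;
       }
   
       static int length(int header) {
           return header & LENGTH_MASK;
       }
   
       /**
        * Reads the header at the buffer's current position and, if the first record
        * is partial, skips its bytes. Returns the position of the first byte that a
        * deserializer starting from scratch could interpret.
        */
       static int skipPartialRecord(ByteBuffer buffer) {
           int header = buffer.getInt(); // ByteBuffer is big-endian by default
           if (isPartial(header)) {
               int toSkip = Math.min(length(header), buffer.remaining());
               buffer.position(buffer.position() + toSkip);
           }
           return buffer.position();
       }
   }
   ```
   
   Whether such skipping belongs in the deserializer or elsewhere is the open question of this comment; the sketch only shows that the decoding does not depend on anything specific to `BufferConsumer`.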

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedApproximateSubpartitionView.java
##########
@@ -0,0 +1,27 @@
+package org.apache.flink.runtime.io.network.partition;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class PipelinedApproximateSubpartitionView extends PipelinedSubpartitionView {
+
+       PipelinedApproximateSubpartitionView(PipelinedApproximateSubpartition parent, BufferAvailabilityListener listener) {
+               super(parent, listener);
+       }
+
+       @Override
+       public void releaseAllResources() {
+               if (isReleased.compareAndSet(false, true)) {
+                       // The view doesn't hold any resources and the parent cannot be restarted. Therefore,
+                       // it's OK to notify about consumption as well.
+                       checkState(parent instanceof PipelinedApproximateSubpartition);
+                       ((PipelinedApproximateSubpartition) parent).releaseView();
+               }
+       }

Review comment:
   Why does this method behave differently from `PipelinedSubpartitionView#releaseAllResources`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

