This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d0b133814838e3614a41e70b5e6a27b80afa0967 Author: Weijie Guo <res...@163.com> AuthorDate: Thu Jul 28 20:14:52 2022 +0800 [FLINK-27908] Introduce HsSubpartitionView based on HsDataView and its implementations. --- .../io/network/partition/hybrid/HsDataView.java | 60 ++++ .../partition/hybrid/HsFileDataManager.java | 16 +- .../partition/hybrid/HsMemoryDataManager.java | 23 +- .../partition/hybrid/HsSubpartitionFileReader.java | 6 +- .../hybrid/HsSubpartitionFileReaderImpl.java | 86 +++++- .../hybrid/HsSubpartitionMemoryDataManager.java | 29 +- .../partition/hybrid/HsSubpartitionView.java | 262 +++++++++++++++++ .../partition/hybrid/HsFileDataManagerTest.java | 27 +- .../hybrid/HsSubpartitionFileReaderImplTest.java | 88 +++++- .../HsSubpartitionMemoryDataManagerTest.java | 20 +- .../partition/hybrid/HsSubpartitionViewTest.java | 313 +++++++++++++++++++++ .../partition/hybrid/TestingHsDataView.java | 127 +++++++++ 12 files changed, 1011 insertions(+), 46 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java new file mode 100644 index 00000000000..c3d4cb9ed83 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.runtime.io.network.buffer.Buffer.DataType; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; + +import java.util.Optional; + +/** + * A view for {@link HsSubpartitionView} to find out what data exists in memory or disk and poll + * the data. + */ +public interface HsDataView { + + /** + * Try to consume the next buffer. + * + * <p>Only invoked by the consumer thread. + * + * @param nextBufferToConsume index of the next buffer to consume. + * @return If the target buffer exists, return the buffer and its backlog; otherwise + * return {@link Optional#empty()}. + */ + Optional<BufferAndBacklog> consumeBuffer(int nextBufferToConsume) throws Throwable; + + /** + * Get the dataType of the next buffer to consume. + * + * @param nextBufferToConsume index of the next buffer to consume + * @return the next buffer's dataType; if not found in memory, return {@link DataType#NONE}. + */ + DataType peekNextToConsumeDataType(int nextBufferToConsume); + + /** + * Get the number of backlog buffers. + * + * @return the backlog of this view's corresponding subpartition. + */ + int getBacklog(); + + /** Release this {@link HsDataView} when the related subpartition view is being released. */ + void releaseDataView(); +}
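HsDataView is a pull-style contract: the caller tracks the index of the next buffer it expects, and the view either serves exactly that buffer or returns Optional.empty() when it is not (or not yet) present in this tier. A minimal consumer-side sketch of the contract follows; the drainAvailableBuffers helper and the use of recycleBuffer() as stand-in consumption are illustrative assumptions, not part of this commit:

    // Polls a view until the expected buffer is not present in this tier.
    static void drainAvailableBuffers(HsDataView view) throws Throwable {
        int nextBufferIndex = 0;
        Optional<BufferAndBacklog> next;
        while ((next = view.consumeBuffer(nextBufferIndex)).isPresent()) {
            // Each BufferAndBacklog carries the payload plus the view's remaining backlog.
            next.get().buffer().recycleBuffer();
            nextBufferIndex++;
        }
    }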
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java index f37a1ff5164..6070a838110 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java @@ -147,7 +147,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler { } /** This method is only called by the result partition to create a subpartitionFileReader. */ - public HsSubpartitionFileReader registerNewSubpartition( + public HsDataView registerNewSubpartition( int subpartitionId, HsSubpartitionViewInternalOperations operation) throws IOException { synchronized (lock) { checkState(!isReleased, "HsFileDataManager is already released."); @@ -159,7 +159,8 @@ public class HsFileDataManager implements Runnable, BufferRecycler { dataFileChannel, operation, dataIndex, - hybridShuffleConfiguration.getMaxBuffersReadAhead()); + hybridShuffleConfiguration.getMaxBuffersReadAhead(), + this::releaseSubpartitionReader); allReaders.add(subpartitionReader); @@ -172,6 +173,17 @@ public class HsFileDataManager implements Runnable, BufferRecycler { IOUtils.deleteFileQuietly(dataFilePath); } + /** + * Release a specific {@link HsSubpartitionFileReader} from the {@link HsFileDataManager}. + * + * @param subpartitionFileReader the file reader to release. + */ + public void releaseSubpartitionReader(HsSubpartitionFileReader subpartitionFileReader) { + synchronized (lock) { + removeSubpartitionReaders(Collections.singleton(subpartitionFileReader)); + } + } + /** Releases this file data manager and deletes the shuffle data after all readers are removed. */ public void release() { synchronized (lock) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java index 9bf57b51c4b..11228fe7206 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java @@ -121,7 +121,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData * #subpartitionViewOperationsMap}. It is used to obtain the consumption progress of the * subpartition. */ - public void registerSubpartitionView( + public HsDataView registerSubpartitionView( int subpartitionId, HsSubpartitionViewInternalOperations viewOperations) { HsSubpartitionViewInternalOperations oldView = subpartitionViewOperationsMap.put(subpartitionId, viewOperations); @@ -130,6 +130,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData "subpartition : {} register subpartition view will replace old view. ", subpartitionId); } + return getSubpartitionMemoryDataManager(subpartitionId); } /** Close this {@link HsMemoryDataManager}; afterwards, no data can be appended to memory. */ @@ -318,24 +319,4 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData lock.unlock(); } } - - /** Integrate the buffer and dataType of next buffer. */ - public static class BufferAndNextDataType { - private final Buffer buffer; - - private final Buffer.DataType nextDataType; - - public BufferAndNextDataType(Buffer buffer, Buffer.DataType nextDataType) { - this.buffer = buffer; - this.nextDataType = nextDataType; - } - - public Buffer getBuffer() { - return buffer; - } - - public Buffer.DataType getNextDataType() { - return nextDataType; - } - } }
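Both register methods now hand back an HsDataView, so creating a consumable view amounts to wiring one HsSubpartitionView (which implements HsSubpartitionViewInternalOperations) to both tiers. A hedged sketch of that call site, which would live in the result partition and is not part of this commit; the manager variables are illustrative:

    // Hypothetical wiring of one subpartition view to the memory and disk tiers.
    HsSubpartitionView subpartitionView = new HsSubpartitionView(availabilityListener);
    subpartitionView.setMemoryDataView(
            memoryDataManager.registerSubpartitionView(subpartitionId, subpartitionView));
    subpartitionView.setDiskDataView(
            fileDataManager.registerNewSubpartition(subpartitionId, subpartitionView));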
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java index e582be2a620..fbc044d6da0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import java.io.IOException; import java.nio.channels.FileChannel; import java.util.Queue; +import java.util.function.Consumer; /** * This component is responsible for reading data from disk for a specific subpartition. @@ -31,7 +32,7 @@ import java.util.Queue; * <p>In order to access the disk as sequentially as possible, {@link HsSubpartitionFileReader} needs * to be able to compare priorities. */ -public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileReader> { +public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileReader>, HsDataView { /** Do prep work before this {@link HsSubpartitionFileReader} is scheduled to read data. */ void prepareForScheduling(); @@ -58,6 +59,7 @@ public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileR FileChannel dataFileChannel, HsSubpartitionViewInternalOperations operation, HsFileDataIndex dataIndex, - int maxBuffersReadAhead); + int maxBuffersReadAhead, + Consumer<HsSubpartitionFileReader> fileReaderReleaser); } }
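The new Consumer<HsSubpartitionFileReader> parameter closes the release loop: HsFileDataManager passes this::releaseSubpartitionReader when creating a reader (see registerNewSubpartition above), and the reader's releaseDataView() implementation below simply hands itself back via fileReaderReleaser.accept(this). Spelled out as an explicit, hypothetical call (variable names are illustrative):

    // Equivalent of what registerNewSubpartition passes to the factory.
    HsSubpartitionFileReader reader =
            readerFactory.createFileReader(
                    subpartitionId,
                    dataFileChannel,
                    viewOperations,
                    dataIndex,
                    maxBuffersReadAhead,
                    fileDataManager::releaseSubpartitionReader);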
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java index ecf02260dd1..a355765ac35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java @@ -23,6 +23,8 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition; +import org.apache.flink.util.ExceptionUtils; import javax.annotation.Nullable; @@ -34,6 +36,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.Consumer; import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer; import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel; @@ -62,6 +65,8 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { private final Deque<BufferIndexOrError> loadedBuffers = new LinkedBlockingDeque<>(); + private final Consumer<HsSubpartitionFileReader> fileReaderReleaser; + private volatile boolean isFailed; public HsSubpartitionFileReaderImpl( @@ -69,12 +74,14 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { FileChannel dataFileChannel, HsSubpartitionViewInternalOperations operations, HsFileDataIndex dataIndex, - int maxBufferReadAhead) { + int maxBufferReadAhead, + Consumer<HsSubpartitionFileReader> fileReaderReleaser) { this.subpartitionId = subpartitionId; this.dataFileChannel = dataFileChannel; this.operations = operations; this.bufferIndexManager = new BufferIndexManager(maxBufferReadAhead); this.cachedRegionManager = new CachedRegionManager(subpartitionId, dataIndex); + this.fileReaderReleaser = fileReaderReleaser; } @Override @@ -192,10 +199,77 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { return loadedBuffers; } + @Override + public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int nextBufferToConsume) + throws Throwable { + if (!checkAndGetFirstBufferIndexOrError(nextBufferToConsume).isPresent()) { + return Optional.empty(); + } + + // we have already ensured that the peeked element is neither null nor a throwable. + BufferIndexOrError current = checkNotNull(loadedBuffers.poll()); + + BufferIndexOrError next = loadedBuffers.peek(); + + Buffer.DataType nextDataType = next == null ? Buffer.DataType.NONE : next.getDataType(); + int backlog = loadedBuffers.size(); + int bufferIndex = current.getIndex(); + Buffer buffer = + current.getBuffer() + .orElseThrow( + () -> + new NullPointerException( + "Get a non-throwable and non-buffer bufferIndexOrError, which is not allowed")); + return Optional.of( + ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead( + buffer, nextDataType, backlog, bufferIndex)); + } + + @Override + public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) { + Buffer.DataType dataType = Buffer.DataType.NONE; + try { + dataType = + checkAndGetFirstBufferIndexOrError(nextBufferToConsume) + .map(BufferIndexOrError::getDataType) + .orElse(Buffer.DataType.NONE); + } catch (Throwable throwable) { + ExceptionUtils.rethrow(throwable); + } + return dataType; + } + + @Override + public void releaseDataView() { + fileReaderReleaser.accept(this); + } + + @Override + public int getBacklog() { + return loadedBuffers.size(); + } + // ------------------------------------------------------------------------ // Internal Methods // ------------------------------------------------------------------------ + private Optional<BufferIndexOrError> checkAndGetFirstBufferIndexOrError(int expectedBufferIndex) + throws Throwable { + if (loadedBuffers.isEmpty()) { + return Optional.empty(); + } + + BufferIndexOrError peek = loadedBuffers.peek(); + + if (peek.getThrowable().isPresent()) { + throw peek.getThrowable().get(); + } else if (peek.getIndex() != expectedBufferIndex) { + return Optional.empty(); + } + + return Optional.of(peek); + } + private void moveFileOffsetToBuffer(int bufferIndex) throws IOException { Tuple2<Integer, Long> indexAndOffset = cachedRegionManager.getNumSkipAndFileOffset(bufferIndex); @@ -403,9 +477,15 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { FileChannel dataFileChannel, HsSubpartitionViewInternalOperations operation, HsFileDataIndex dataIndex, - int maxBuffersReadAhead) { + int maxBuffersReadAhead, + Consumer<HsSubpartitionFileReader> fileReaderReleaser) { return new HsSubpartitionFileReaderImpl( - subpartitionId, dataFileChannel, operation, dataIndex, maxBuffersReadAhead); + subpartitionId, + dataFileChannel, + operation, + dataIndex, + maxBuffersReadAhead, + fileReaderReleaser); } } }
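A failure set on the reader is not thrown immediately; it is enqueued into loadedBuffers and resurfaces on the consumer thread through checkAndGetFirstBufferIndexOrError on the next peek or consume. A sketch of that flow (the exception text is illustrative; fail(...) is the reader's existing failure hook, exercised by the tests further below):

    // The reading side records the failure ...
    reader.fail(new RuntimeException("reading from shuffle file failed"));
    // ... and the consuming side observes it on its next poll:
    try {
        reader.consumeBuffer(0); // rethrows the queued failure
    } catch (Throwable t) {
        // HsSubpartitionView turns this into releaseInternal(t).
    }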
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java index b5079e7df21..17b7e833e29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus; import org.apache.flink.util.function.SupplierWithException; @@ -54,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * This class is responsible for managing the data in a single subpartition. One {@link * HsMemoryDataManager} will hold multiple {@link HsSubpartitionMemoryDataManager}. */ -public class HsSubpartitionMemoryDataManager { +public class HsSubpartitionMemoryDataManager implements HsDataView { private final int targetChannel; private final int bufferSize; @@ -108,22 +109,24 @@ public class HsSubpartitionMemoryDataManager { @SuppressWarnings("FieldAccessNotGuarded") // Note that: callWithLock ensures that the code block is guarded by resultPartitionReadLock and // subpartitionLock. + @Override public DataType peekNextToConsumeDataType(int nextToConsumeIndex) { return callWithLock(() -> peekNextToConsumeDataTypeInternal(nextToConsumeIndex)); } /** * Check whether the head of {@link #unConsumedBuffers} is the buffer to be consumed. If so, - * return the buffer and next data type. + * return the buffer and backlog. * * @param toConsumeIndex index of buffer to be consumed. * @return If the head of {@link #unConsumedBuffers} is the target, return optional of the buffer - * and next data type. Otherwise, return {@link Optional#empty()}. + * and backlog. Otherwise, return {@link Optional#empty()}. */ @SuppressWarnings("FieldAccessNotGuarded") // Note that: callWithLock ensures that the code block is guarded by resultPartitionReadLock and // subpartitionLock.
- public Optional<HsMemoryDataManager.BufferAndNextDataType> consumeBuffer(int toConsumeIndex) { + @Override + public Optional<BufferAndBacklog> consumeBuffer(int toConsumeIndex) { Optional<Tuple2<HsBufferContext, DataType>> bufferAndNextDataType = callWithLock( () -> { @@ -145,8 +148,22 @@ public class HsSubpartitionMemoryDataManager { tuple.f0.getBufferIndexAndChannel())); return bufferAndNextDataType.map( tuple -> - new HsMemoryDataManager.BufferAndNextDataType( - tuple.f0.getBuffer(), tuple.f1)); + new BufferAndBacklog( + tuple.f0.getBuffer(), getBacklog(), tuple.f1, toConsumeIndex)); + } + + @SuppressWarnings("FieldAccessNotGuarded") + // Un-synchronized read of the unConsumedBuffers size to provide the memory data backlog; this + // may make the result greater than or equal to the actual backlog, but obtaining an accurate + // backlog would bring too much extra overhead. + @Override + public int getBacklog() { + return unConsumedBuffers.size(); + } + + @Override + public void releaseDataView() { + // nothing to do for memory data. + } // ------------------------------------------------------------------------
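Compared to the removed BufferAndNextDataType, the reused BufferAndBacklog carries two extra fields that HsSubpartitionView relies on. Decomposed for a successful consumeBuffer(5) on this memory tier (a sketch; the variable names are illustrative):

    BufferAndBacklog result = subpartitionMemoryDataManager.consumeBuffer(5).get();
    result.buffer();             // payload of buffer index 5
    result.buffersInBacklog();   // getBacklog(), i.e. unConsumedBuffers.size(), may overestimate
    result.getNextDataType();    // dataType of buffer 6 if already in memory, else DataType.NONE
    result.getSequenceNumber();  // echoes the consumed index, 5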
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java new file mode 100644 index 00000000000..5aa7e8bac0b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; +import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** The read view of HsResultPartition; data can be read from memory or disk. */ +public class HsSubpartitionView + implements ResultSubpartitionView, HsSubpartitionViewInternalOperations { + private final BufferAvailabilityListener availabilityListener; + private final Object lock = new Object(); + + /** Index of the last consumed buffer. */ + @GuardedBy("lock") + private int lastConsumedBufferIndex = -1; + + @GuardedBy("lock") + private boolean needNotify = false; + + @Nullable + @GuardedBy("lock") + private Buffer.DataType cachedNextDataType = null; + + @Nullable + @GuardedBy("lock") + private Throwable failureCause = null; + + @GuardedBy("lock") + private boolean isReleased = false; + + @Nullable + @GuardedBy("lock") + // diskDataView can be null only before initialization. + private HsDataView diskDataView; + + @Nullable + @GuardedBy("lock") + // memoryDataView can be null only before initialization. + private HsDataView memoryDataView; + + public HsSubpartitionView(BufferAvailabilityListener availabilityListener) { + this.availabilityListener = availabilityListener; + } + + @Nullable + @Override + public BufferAndBacklog getNextBuffer() { + synchronized (lock) { + try { + checkNotNull(diskDataView, "disk data view must not be null."); + checkNotNull(memoryDataView, "memory data view must not be null."); + + Optional<BufferAndBacklog> bufferToConsume = tryReadFromDisk(); + if (!bufferToConsume.isPresent()) { + bufferToConsume = memoryDataView.consumeBuffer(lastConsumedBufferIndex + 1); + } + updateConsumingStatus(bufferToConsume); + return bufferToConsume.orElse(null); + } catch (Throwable cause) { + releaseInternal(cause); + return null; + } + } + } + + @Override + public void notifyDataAvailable() { + boolean notifyDownStream = false; + synchronized (lock) { + if (isReleased) { + return; + } + if (needNotify) { + notifyDownStream = true; + needNotify = false; + } + } + // notify outside of lock to avoid deadlock + if (notifyDownStream) { + availabilityListener.notifyDataAvailable(); + } + } + + @Override + public AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable) { + synchronized (lock) { + boolean availability = numCreditsAvailable > 0; + if (numCreditsAvailable <= 0 + && cachedNextDataType != null + && cachedNextDataType == Buffer.DataType.EVENT_BUFFER) { + availability = true; + } + return new AvailabilityWithBacklog(availability, diskDataView.getBacklog()); + } + } + + @Override + public void releaseAllResources() throws IOException { + releaseInternal(null); + } + + @Override + public boolean isReleased() { + synchronized (lock) { + return isReleased; + } + } + + @Override + public int getConsumingOffset() { + synchronized (lock) { + return lastConsumedBufferIndex; + } + } + + @Override + public Throwable getFailureCause() { + synchronized (lock) { + return failureCause; + } + } + + /** + * Set the disk {@link HsDataView} for this subpartition; this method is only called when the + * {@link HsSubpartitionFileReader} is being created. + */ + void setDiskDataView(HsDataView diskDataView) { + synchronized (lock) { + checkState(this.diskDataView == null, "repeatedly set disk data view is not allowed."); + this.diskDataView = diskDataView; + } + } + + /** + * Set the memory {@link HsDataView} for this subpartition; this method is only called when + * the subpartition view is being registered to the {@link HsMemoryDataManager}.
+ */ + void setMemoryDataView(HsDataView memoryDataView) { + synchronized (lock) { + checkState( + this.memoryDataView == null, "repeatedly set memory data view is not allowed."); + this.memoryDataView = memoryDataView; + } + } + + @Override + public void resumeConsumption() { + throw new UnsupportedOperationException("resumeConsumption should never be called."); + } + + @Override + public void acknowledgeAllDataProcessed() { + // in case of bounded partitions there is no upstream to acknowledge, we simply ignore + // the ack, as there are no checkpoints + } + + @SuppressWarnings("FieldAccessNotGuarded") + @Override + public int unsynchronizedGetNumberOfQueuedBuffers() { + return diskDataView.getBacklog(); + } + + @SuppressWarnings("FieldAccessNotGuarded") + @Override + public int getNumberOfQueuedBuffers() { + return diskDataView.getBacklog(); + } + + @Override + public void notifyNewBufferSize(int newBufferSize) { + throw new UnsupportedOperationException("Method should never be called."); + } + + // ------------------------------- + // Internal Methods + // ------------------------------- + + @GuardedBy("lock") + private Optional<BufferAndBacklog> tryReadFromDisk() throws Throwable { + final int nextBufferIndexToConsume = lastConsumedBufferIndex + 1; + return checkNotNull(diskDataView) + .consumeBuffer(nextBufferIndexToConsume) + .map( + bufferAndBacklog -> { + if (bufferAndBacklog.getNextDataType() == Buffer.DataType.NONE) { + return new BufferAndBacklog( + bufferAndBacklog.buffer(), + bufferAndBacklog.buffersInBacklog(), + checkNotNull(memoryDataView) + .peekNextToConsumeDataType( + nextBufferIndexToConsume + 1), + bufferAndBacklog.getSequenceNumber()); + } + return bufferAndBacklog; + }); + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + @GuardedBy("lock") + private void updateConsumingStatus(Optional<BufferAndBacklog> bufferAndBacklog) { + assert Thread.holdsLock(lock); + // if consumed, update and check consume offset + if (bufferAndBacklog.isPresent()) { + ++lastConsumedBufferIndex; + checkState(bufferAndBacklog.get().getSequenceNumber() == lastConsumedBufferIndex); + } + + // update need-notify + boolean dataAvailable = + bufferAndBacklog.map(BufferAndBacklog::isDataAvailable).orElse(false); + needNotify = !dataAvailable; + // update cached next data type + cachedNextDataType = bufferAndBacklog.map(BufferAndBacklog::getNextDataType).orElse(null); + } + + private void releaseInternal(@Nullable Throwable throwable) { + boolean releaseSubpartitionReader = false; + synchronized (lock) { + if (isReleased) { + return; + } + isReleased = true; + failureCause = throwable; + if (diskDataView != null) { + releaseSubpartitionReader = true; + } + } + // release subpartition reader outside of lock to avoid deadlock. 
+ if (releaseSubpartitionReader) { + //noinspection FieldAccessNotGuarded + diskDataView.releaseDataView(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java index 1a39e14ca02..1a2e28959a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java @@ -21,7 +21,9 @@ package org.apache.flink.runtime.io.network.partition.hybrid; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; import org.apache.flink.util.function.BiConsumerWithException; @@ -40,6 +42,7 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.time.Duration; import java.util.ArrayDeque; +import java.util.Optional; import java.util.Queue; import java.util.Random; import java.util.concurrent.CompletableFuture; @@ -396,6 +399,27 @@ class HsFileDataManagerTest { this.failConsumer = failConsumer; } + @Override + public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer( + int nextBufferToConsume) { + return Optional.empty(); + } + + @Override + public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) { + return Buffer.DataType.NONE; + } + + @Override + public int getBacklog() { + return 0; + } + + @Override + public void releaseDataView() { + // do nothing. + } + /** Factory for {@link TestingHsSubpartitionFileReader}. 
*/ private static class Factory implements HsSubpartitionFileReader.Factory { private final Queue<HsSubpartitionFileReader> allReaders = new ArrayDeque<>(); @@ -406,7 +430,8 @@ class HsFileDataManagerTest { FileChannel dataFileChannel, HsSubpartitionViewInternalOperations operation, HsFileDataIndex dataIndex, - int maxBuffersReadAhead) { + int maxBuffersReadAhead, + Consumer<HsSubpartitionFileReader> fileReaderReleaser) { return checkNotNull(allReaders.poll()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java index 36a0833163d..d25a5a34fb3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java @@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.Buffer.DataType; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; @@ -360,6 +361,86 @@ class HsSubpartitionFileReaderImplTest { assertThat(fileReader1).isGreaterThan(fileReader2); } + @Test + void testConsumeBuffer() throws Throwable { + TestingSubpartitionViewInternalOperation viewNotifier = + new TestingSubpartitionViewInternalOperation(); + HsSubpartitionFileReaderImpl subpartitionFileReader = + createSubpartitionFileReader(0, viewNotifier); + + // if there is no preloaded data in the file reader, return Optional.empty. + assertThat(subpartitionFileReader.consumeBuffer(0)).isNotPresent(); + + // buffers in file: (0-0, 0-1) + writeDataToFile(0, 0, 0, 2); + + Queue<MemorySegment> memorySegments = createsMemorySegments(2); + // trigger reading and add buffers to the queue. + subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {}); + + // if nextBufferToConsume is not equal to the peek element's index, return Optional.empty. + assertThat(subpartitionFileReader.consumeBuffer(10)).isNotPresent(); + + assertThat(subpartitionFileReader.consumeBuffer(0)) + .hasValueSatisfying( + (bufferAndBacklog -> { + assertThat(bufferAndBacklog.getNextDataType()) + .isEqualTo(DataType.EVENT_BUFFER); + assertThat(bufferAndBacklog.getSequenceNumber()).isEqualTo(0); + // the first buffer's data is 0.
+ assertThat( + bufferAndBacklog + .buffer() + .getNioBufferReadable() + .order(ByteOrder.nativeOrder()) + .getInt()) + .isEqualTo(0); + })); + } + + @Test + void testPeekNextToConsumeDataTypeOrConsumeBufferThrowException() { + TestingSubpartitionViewInternalOperation viewNotifier = + new TestingSubpartitionViewInternalOperation(); + HsSubpartitionFileReaderImpl subpartitionFileReader = + createSubpartitionFileReader(0, viewNotifier); + + subpartitionFileReader.fail(new RuntimeException("expected exception.")); + + assertThatThrownBy(() -> subpartitionFileReader.peekNextToConsumeDataType(0)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("expected exception."); + + assertThatThrownBy(() -> subpartitionFileReader.consumeBuffer(0)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("expected exception."); + } + + @Test + void testPeekNextToConsumeDataType() throws Throwable { + TestingSubpartitionViewInternalOperation viewNotifier = + new TestingSubpartitionViewInternalOperation(); + HsSubpartitionFileReaderImpl subpartitionFileReader = + createSubpartitionFileReader(0, viewNotifier); + + // if there is no preloaded data in the file reader, return DataType.NONE. + assertThat(subpartitionFileReader.peekNextToConsumeDataType(0)).isEqualTo(DataType.NONE); + + // buffers in file: (0-0, 0-1) + writeDataToFile(0, 0, 2); + + Queue<MemorySegment> memorySegments = createsMemorySegments(2); + // trigger reading and add buffers to the queue. + subpartitionFileReader.readBuffers(memorySegments, (ignore) -> {}); + + // if nextBufferToConsume is not equal to the peek element's index, return DataType.NONE. + assertThat(subpartitionFileReader.peekNextToConsumeDataType(10)).isEqualTo(DataType.NONE); + + // if nextBufferToConsume is equal to the peek element's index, return the real DataType. + assertThat(subpartitionFileReader.peekNextToConsumeDataType(0)) + .isEqualTo(DataType.DATA_BUFFER); + } + private static void checkData(HsSubpartitionFileReaderImpl fileReader, int...
expectedData) { assertThat(fileReader.getLoadedBuffers()).hasSameSizeAs(expectedData); for (int data : expectedData) { @@ -383,7 +464,12 @@ class HsSubpartitionFileReaderImplTest { private HsSubpartitionFileReaderImpl createSubpartitionFileReader( int targetChannel, HsSubpartitionViewInternalOperations operations) { return new HsSubpartitionFileReaderImpl( - targetChannel, dataFileChannel, operations, diskIndex, MAX_BUFFERS_READ_AHEAD); + targetChannel, + dataFileChannel, + operations, + diskIndex, + MAX_BUFFERS_READ_AHEAD, + (ignore) -> {}); } private static FileChannel openFileChannel(Path path) throws IOException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java index a15dfddc9f2..f1b16d0d231 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManagerTest.java @@ -24,7 +24,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.Buffer.DataType; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.partition.hybrid.HsMemoryDataManager.BufferAndNextDataType; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.ConsumeStatus; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider.SpillStatus; @@ -372,14 +372,14 @@ class HsSubpartitionMemoryDataManagerTest { private static void checkConsumedBufferAndNextDataType( List<Tuple2<Integer, Buffer.DataType>> expectedRecords, - List<Optional<BufferAndNextDataType>> bufferAndNextDataTypesOpt) { - checkArgument(expectedRecords.size() == bufferAndNextDataTypesOpt.size()); - for (int i = 0; i < bufferAndNextDataTypesOpt.size(); i++) { + List<Optional<BufferAndBacklog>> bufferAndBacklogOpt) { + checkArgument(expectedRecords.size() == bufferAndBacklogOpt.size()); + for (int i = 0; i < bufferAndBacklogOpt.size(); i++) { final int index = i; - assertThat(bufferAndNextDataTypesOpt.get(index)) + assertThat(bufferAndBacklogOpt.get(index)) .hasValueSatisfying( - (bufferAndNextDataType -> { - Buffer buffer = bufferAndNextDataType.getBuffer(); + (bufferAndBacklog -> { + Buffer buffer = bufferAndBacklog.buffer(); int value = buffer.getNioBufferReadable() .order(ByteOrder.LITTLE_ENDIAN) @@ -387,11 +387,11 @@ class HsSubpartitionMemoryDataManagerTest { Buffer.DataType dataType = buffer.getDataType(); assertThat(value).isEqualTo(expectedRecords.get(index).f0); assertThat(dataType).isEqualTo(expectedRecords.get(index).f1); - if (index != bufferAndNextDataTypesOpt.size() - 1) { - assertThat(bufferAndNextDataType.getNextDataType()) + if (index != bufferAndBacklogOpt.size() - 1) { + assertThat(bufferAndBacklog.getNextDataType()) .isEqualTo(expectedRecords.get(index + 1).f1); } else { - assertThat(bufferAndNextDataType.getNextDataType()) + assertThat(bufferAndBacklog.getNextDataType()) .isEqualTo(Buffer.DataType.NONE); } })); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java new file mode 100644 index 00000000000..77a797a80f1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.Buffer.DataType; +import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; +import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView.AvailabilityWithBacklog; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link HsSubpartitionView}. 
*/ +class HsSubpartitionViewTest { + @Test + void testGetNextBufferFromDisk() { + HsSubpartitionView subpartitionView = createSubpartitionView(); + + BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0); + CompletableFuture<Void> consumeBufferFromMemoryFuture = new CompletableFuture<>(); + TestingHsDataView diskDataView = + TestingHsDataView.builder() + .setConsumeBufferFunction( + (bufferToConsume) -> Optional.of(bufferAndBacklog)) + .build(); + TestingHsDataView memoryDataView = + TestingHsDataView.builder() + .setConsumeBufferFunction( + (ignore) -> { + consumeBufferFromMemoryFuture.complete(null); + return Optional.empty(); + }) + .build(); + subpartitionView.setDiskDataView(diskDataView); + subpartitionView.setMemoryDataView(memoryDataView); + + BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer(); + assertThat(consumeBufferFromMemoryFuture).isNotCompleted(); + assertThat(nextBuffer).isSameAs(bufferAndBacklog); + } + + @Test + void testGetNextBufferFromDiskNextDataTypeIsNone() { + HsSubpartitionView subpartitionView = createSubpartitionView(); + BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.NONE, 0); + + TestingHsDataView diskDataView = + TestingHsDataView.builder() + .setConsumeBufferFunction( + (bufferToConsume) -> Optional.of(bufferAndBacklog)) + .build(); + + TestingHsDataView memoryDataView = + TestingHsDataView.builder() + .setPeekNextToConsumeDataTypeFunction( + (bufferToConsume) -> { + assertThat(bufferToConsume).isEqualTo(1); + return DataType.EVENT_BUFFER; + }) + .build(); + subpartitionView.setDiskDataView(diskDataView); + subpartitionView.setMemoryDataView(memoryDataView); + + BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer(); + assertThat(nextBuffer).isNotNull(); + assertThat(nextBuffer.buffer()).isSameAs(bufferAndBacklog.buffer()); + assertThat(nextBuffer.buffersInBacklog()).isEqualTo(bufferAndBacklog.buffersInBacklog()); + assertThat(nextBuffer.getSequenceNumber()).isEqualTo(bufferAndBacklog.getSequenceNumber()); + assertThat(nextBuffer.getNextDataType()).isEqualTo(DataType.EVENT_BUFFER); + } + + @Test + void testGetNextBufferFromMemory() { + HsSubpartitionView subpartitionView = createSubpartitionView(); + + BufferAndBacklog bufferAndBacklog = createBufferAndBacklog(0, DataType.DATA_BUFFER, 0); + TestingHsDataView memoryDataView = + TestingHsDataView.builder() + .setConsumeBufferFunction( + (bufferToConsume) -> Optional.of(bufferAndBacklog)) + .build(); + TestingHsDataView diskDataView = + TestingHsDataView.builder() + .setConsumeBufferFunction((bufferToConsume) -> Optional.empty()) + .build(); + subpartitionView.setDiskDataView(diskDataView); + subpartitionView.setMemoryDataView(memoryDataView); + + BufferAndBacklog nextBuffer = subpartitionView.getNextBuffer(); + assertThat(nextBuffer).isSameAs(bufferAndBacklog); + } + + @Test + void testGetNextBufferThrowException() { + HsSubpartitionView subpartitionView = createSubpartitionView(); + + TestingHsDataView diskDataView = + TestingHsDataView.builder() + .setConsumeBufferFunction( + (nextToConsume) -> { + throw new RuntimeException("expected exception."); + }) + .build(); + subpartitionView.setDiskDataView(diskDataView); + subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP); + + subpartitionView.getNextBuffer(); + assertThat(subpartitionView.getFailureCause()) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("expected exception."); + assertThat(subpartitionView.isReleased()).isTrue(); + } + + @Test + void 
testNotifyDataAvailableNeedNotify() { + CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>(); + HsSubpartitionView subpartitionView = + createSubpartitionView(() -> notifyAvailableFuture.complete(null)); + + TestingHsDataView memoryDataView = + TestingHsDataView.builder() + .setConsumeBufferFunction( + (bufferToConsume) -> + Optional.of(createBufferAndBacklog(0, DataType.NONE, 0))) + .build(); + subpartitionView.setMemoryDataView(memoryDataView); + subpartitionView.setDiskDataView(TestingHsDataView.NO_OP); + + subpartitionView.getNextBuffer(); + subpartitionView.notifyDataAvailable(); + assertThat(notifyAvailableFuture).isCompleted(); + } + + @Test + void testNotifyDataAvailableNotNeedNotify() { + CompletableFuture<Void> notifyAvailableFuture = new CompletableFuture<>(); + HsSubpartitionView subpartitionView = + createSubpartitionView(() -> notifyAvailableFuture.complete(null)); + + TestingHsDataView memoryDataView = + TestingHsDataView.builder() + .setConsumeBufferFunction( + (bufferToConsume) -> + Optional.of( + createBufferAndBacklog(0, DataType.DATA_BUFFER, 0))) + .build(); + subpartitionView.setMemoryDataView(memoryDataView); + subpartitionView.setDiskDataView(TestingHsDataView.NO_OP); + + subpartitionView.getNextBuffer(); + subpartitionView.notifyDataAvailable(); + assertThat(notifyAvailableFuture).isNotCompleted(); + } + + @Test + void testGetAvailabilityAndBacklogPositiveCredit() { + HsSubpartitionView subpartitionView = createSubpartitionView(); + subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP); + + final int backlog = 2; + subpartitionView.setDiskDataView( + TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build()); + AvailabilityWithBacklog availabilityAndBacklog = + subpartitionView.getAvailabilityAndBacklog(1); + assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog); + // positive credit always available. + assertThat(availabilityAndBacklog.isAvailable()).isTrue(); + } + + @Test + void testGetAvailabilityAndBacklogNonPositiveCreditNextIsData() { + final int backlog = 2; + + HsSubpartitionView subpartitionView = createSubpartitionView(); + subpartitionView.setMemoryDataView( + TestingHsDataView.builder() + .setConsumeBufferFunction( + (nextToConsume) -> + Optional.of( + createBufferAndBacklog( + backlog, DataType.DATA_BUFFER, 0))) + .build()); + subpartitionView.setDiskDataView( + TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build()); + + subpartitionView.getNextBuffer(); + + AvailabilityWithBacklog availabilityAndBacklog = + subpartitionView.getAvailabilityAndBacklog(0); + assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog); + // if credit is non-positive, only event can be available. 
+ assertThat(availabilityAndBacklog.isAvailable()).isFalse(); + } + + @Test + void testGetAvailabilityAndBacklogNonPositiveCreditNextIsEvent() { + final int backlog = 2; + + HsSubpartitionView subpartitionView = createSubpartitionView(); + subpartitionView.setMemoryDataView( + TestingHsDataView.builder() + .setConsumeBufferFunction( + (nextToConsume) -> + Optional.of( + createBufferAndBacklog( + backlog, DataType.EVENT_BUFFER, 0))) + .build()); + subpartitionView.setDiskDataView( + TestingHsDataView.builder().setGetBacklogSupplier(() -> backlog).build()); + + subpartitionView.getNextBuffer(); + + AvailabilityWithBacklog availabilityAndBacklog = + subpartitionView.getAvailabilityAndBacklog(0); + assertThat(availabilityAndBacklog.getBacklog()).isEqualTo(backlog); + // if credit is non-positive, only event can be available. + assertThat(availabilityAndBacklog.isAvailable()).isTrue(); + } + + @Test + void testRelease() throws Exception { + HsSubpartitionView subpartitionView = createSubpartitionView(); + CompletableFuture<Void> releaseDataViewFuture = new CompletableFuture<>(); + TestingHsDataView diskDataView = + TestingHsDataView.builder() + .setReleaseDataViewRunnable(() -> releaseDataViewFuture.complete(null)) + .build(); + subpartitionView.setDiskDataView(diskDataView); + subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP); + subpartitionView.releaseAllResources(); + assertThat(subpartitionView.isReleased()).isTrue(); + assertThat(releaseDataViewFuture).isCompleted(); + } + + @Test + void testGetConsumingOffset() { + AtomicInteger nextBufferIndex = new AtomicInteger(0); + HsSubpartitionView subpartitionView = createSubpartitionView(); + TestingHsDataView diskDataView = + TestingHsDataView.builder() + .setConsumeBufferFunction( + (toConsumeBuffer) -> + Optional.of( + createBufferAndBacklog( + 0, + DataType.DATA_BUFFER, + nextBufferIndex.getAndIncrement()))) + .build(); + subpartitionView.setDiskDataView(diskDataView); + subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP); + + assertThat(subpartitionView.getConsumingOffset()).isEqualTo(-1); + subpartitionView.getNextBuffer(); + assertThat(subpartitionView.getConsumingOffset()).isEqualTo(0); + subpartitionView.getNextBuffer(); + assertThat(subpartitionView.getConsumingOffset()).isEqualTo(1); + } + + @Test + void testSetDataViewRepeatedly() { + HsSubpartitionView subpartitionView = createSubpartitionView(); + + subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP); + assertThatThrownBy(() -> subpartitionView.setMemoryDataView(TestingHsDataView.NO_OP)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("repeatedly set memory data view is not allowed."); + + subpartitionView.setDiskDataView(TestingHsDataView.NO_OP); + assertThatThrownBy(() -> subpartitionView.setDiskDataView(TestingHsDataView.NO_OP)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("repeatedly set disk data view is not allowed."); + } + + private static HsSubpartitionView createSubpartitionView() { + return new HsSubpartitionView(new NoOpBufferAvailablityListener()); + } + + private static HsSubpartitionView createSubpartitionView( + BufferAvailabilityListener bufferAvailabilityListener) { + return new HsSubpartitionView(bufferAvailabilityListener); + } + + private static BufferAndBacklog createBufferAndBacklog( + int buffersInBacklog, DataType nextDataType, int sequenceNumber) { + final int bufferSize = 8; + Buffer buffer = HybridShuffleTestUtils.createBuffer(bufferSize, true); + return new BufferAndBacklog(buffer, 
buffersInBacklog, nextDataType, sequenceNumber); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java new file mode 100644 index 00000000000..343bd9586df --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingHsDataView.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition; +import org.apache.flink.util.function.FunctionWithException; + +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + +/** Mock {@link HsDataView} for testing. */ +public class TestingHsDataView implements HsDataView { + public static final TestingHsDataView NO_OP = TestingHsDataView.builder().build(); + + private final FunctionWithException< + Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable> + consumeBufferFunction; + + private final Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction; + + private final Supplier<Integer> getBacklogSupplier; + + private final Runnable releaseDataViewRunnable; + + private TestingHsDataView( + FunctionWithException<Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable> + consumeBufferFunction, + Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction, + Supplier<Integer> getBacklogSupplier, + Runnable releaseDataViewRunnable) { + this.consumeBufferFunction = consumeBufferFunction; + this.peekNextToConsumeDataTypeFunction = peekNextToConsumeDataTypeFunction; + this.getBacklogSupplier = getBacklogSupplier; + this.releaseDataViewRunnable = releaseDataViewRunnable; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public Optional<ResultSubpartition.BufferAndBacklog> consumeBuffer(int nextBufferToConsume) + throws Throwable { + return consumeBufferFunction.apply(nextBufferToConsume); + } + + @Override + public Buffer.DataType peekNextToConsumeDataType(int nextBufferToConsume) { + return peekNextToConsumeDataTypeFunction.apply(nextBufferToConsume); + } + + @Override + public int getBacklog() { + return getBacklogSupplier.get(); + } + + @Override + public void releaseDataView() { + releaseDataViewRunnable.run(); + } + + /** Builder for {@link TestingHsDataView}. 
*/ + public static class Builder { + private FunctionWithException< + Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable> + consumeBufferFunction = (ignore) -> Optional.empty(); + + private Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction = + (ignore) -> Buffer.DataType.NONE; + + private Supplier<Integer> getBacklogSupplier = () -> 0; + + private Runnable releaseDataViewRunnable = () -> {}; + + private Builder() {} + + public Builder setConsumeBufferFunction( + FunctionWithException< + Integer, Optional<ResultSubpartition.BufferAndBacklog>, Throwable> + consumeBufferFunction) { + this.consumeBufferFunction = consumeBufferFunction; + return this; + } + + public Builder setPeekNextToConsumeDataTypeFunction( + Function<Integer, Buffer.DataType> peekNextToConsumeDataTypeFunction) { + this.peekNextToConsumeDataTypeFunction = peekNextToConsumeDataTypeFunction; + return this; + } + + public Builder setGetBacklogSupplier(Supplier<Integer> getBacklogSupplier) { + this.getBacklogSupplier = getBacklogSupplier; + return this; + } + + public Builder setReleaseDataViewRunnable(Runnable releaseDataViewRunnable) { + this.releaseDataViewRunnable = releaseDataViewRunnable; + return this; + } + + public TestingHsDataView build() { + return new TestingHsDataView( + consumeBufferFunction, + peekNextToConsumeDataTypeFunction, + getBacklogSupplier, + releaseDataViewRunnable); + } + } +}