This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fbd2e4f93f0f32ed86df34784566c4dcb3a9f1ac Author: Rui Fan <[email protected]> AuthorDate: Thu Jan 15 17:28:49 2026 +0100 [hotfix] Extract VirtualChannel as the public class --- .../recovery/DemultiplexingRecordDeserializer.java | 55 +-------- .../runtime/io/recovery/VirtualChannel.java | 130 +++++++++++++++++++++ 2 files changed, 133 insertions(+), 52 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/recovery/DemultiplexingRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/recovery/DemultiplexingRecordDeserializer.java index c63b762435a..6f1f3bda8b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/recovery/DemultiplexingRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/recovery/DemultiplexingRecordDeserializer.java @@ -57,56 +57,6 @@ class DemultiplexingRecordDeserializer<T> private VirtualChannel<T> currentVirtualChannel; - static class VirtualChannel<T> { - private final RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer; - private final RecordFilter<T> recordFilter; - Watermark lastWatermark = Watermark.UNINITIALIZED; - WatermarkStatus watermarkStatus = WatermarkStatus.ACTIVE; - private DeserializationResult lastResult; - - VirtualChannel( - RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer, - RecordFilter<T> recordFilter) { - this.deserializer = deserializer; - this.recordFilter = recordFilter; - } - - public DeserializationResult getNextRecord(DeserializationDelegate<StreamElement> delegate) - throws IOException { - do { - lastResult = deserializer.getNextRecord(delegate); - - if (lastResult.isFullRecord()) { - final StreamElement element = delegate.getInstance(); - // test if record belongs to this subtask if it comes from ambiguous channel - if (element.isRecord() && recordFilter.filter(element.asRecord())) { - return lastResult; - } else if 
(element.isWatermark()) { - lastWatermark = element.asWatermark(); - return lastResult; - } else if (element.isWatermarkStatus()) { - watermarkStatus = element.asWatermarkStatus(); - return lastResult; - } - } - // loop is only re-executed for filtered full records - } while (!lastResult.isBufferConsumed()); - return DeserializationResult.PARTIAL_RECORD; - } - - public void setNextBuffer(Buffer buffer) throws IOException { - deserializer.setNextBuffer(buffer); - } - - public void clear() { - deserializer.clear(); - } - - public boolean hasPartialData() { - return lastResult != null && !lastResult.isBufferConsumed(); - } - } - public DemultiplexingRecordDeserializer( Map<SubtaskConnectionDescriptor, VirtualChannel<T>> channels) { this.channels = checkNotNull(channels); @@ -159,7 +109,7 @@ class DemultiplexingRecordDeserializer<T> // basically, do not emit a watermark if not all virtual channel are past it final Watermark minWatermark = channels.values().stream() - .map(virtualChannel -> virtualChannel.lastWatermark) + .map(VirtualChannel::getLastWatermark) .min(Comparator.comparing(Watermark::getTimestamp)) .orElseThrow( () -> @@ -174,7 +124,8 @@ class DemultiplexingRecordDeserializer<T> } else if (element.isWatermarkStatus()) { // summarize statuses across all virtual channels // duplicate statuses are filtered in StatusWatermarkValve - if (channels.values().stream().anyMatch(d -> d.watermarkStatus.isActive())) { + if (channels.values().stream() + .anyMatch(vc -> vc.getWatermarkStatus().isActive())) { delegate.setInstance(WatermarkStatus.ACTIVE); } return result; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/recovery/VirtualChannel.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/recovery/VirtualChannel.java new file mode 100644 index 00000000000..ddcbba79f25 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/recovery/VirtualChannel.java @@ -0,0 +1,130 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.recovery; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; + +import java.io.IOException; + +/** + * Represents a virtual channel for demultiplexing records during recovery. + * + * <p>A virtual channel wraps a {@link RecordDeserializer} and adds record filtering capability, + * along with tracking watermark and watermark status state. + * + * @param <T> The type of record values. 
+ */ +@Internal +public class VirtualChannel<T> { + private final RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer; + private final RecordFilter<T> recordFilter; + + private Watermark lastWatermark = Watermark.UNINITIALIZED; + private WatermarkStatus watermarkStatus = WatermarkStatus.ACTIVE; + private DeserializationResult lastResult; + + public VirtualChannel( + RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer, + RecordFilter<T> recordFilter) { + this.deserializer = deserializer; + this.recordFilter = recordFilter; + } + + /** + * Deserializes the next accepted element, skipping records rejected by the record filter. + * + * <p>Watermarks and watermark statuses are always accepted and update this channel's state. + * Records rejected by the filter and any other element kinds are dropped; reading continues. + * + * @param delegate The deserialization delegate to populate with the element. + * @return The result of the accepted element, or {@link DeserializationResult#PARTIAL_RECORD} if + * the buffer was consumed without an accepted element. + * @throws IOException If an I/O error occurs during deserialization. + */ + public DeserializationResult getNextRecord(DeserializationDelegate<StreamElement> delegate) + throws IOException { + do { + lastResult = deserializer.getNextRecord(delegate); + + if (lastResult.isFullRecord()) { + final StreamElement element = delegate.getInstance(); + // test if record belongs to this subtask if it comes from ambiguous channel + if (element.isRecord() && recordFilter.filter(element.asRecord())) { + return lastResult; + } else if (element.isWatermark()) { + lastWatermark = element.asWatermark(); + return lastResult; + } else if (element.isWatermarkStatus()) { + watermarkStatus = element.asWatermarkStatus(); + return lastResult; + } + } + // re-executed after a full element was dropped (filtered record or other element kind) + } while (!lastResult.isBufferConsumed()); + return DeserializationResult.PARTIAL_RECORD; + } + + /** + * Sets the next buffer to be deserialized.
+ * + * @param buffer The buffer containing serialized records. + * @throws IOException If an I/O error occurs. + */ + public void setNextBuffer(Buffer buffer) throws IOException { + deserializer.setNextBuffer(buffer); + } + + /** Clears the deserializer state. */ + public void clear() { + deserializer.clear(); + } + + /** + * Checks whether the most recent read left the deserializer's buffer not fully consumed. + * + * @return Whether the last result left the buffer unconsumed ({@code false} before any read). + */ + public boolean hasPartialData() { + return lastResult != null && !lastResult.isBufferConsumed(); + } + + /** + * Gets the last watermark received on this virtual channel. + * + * @return The last watermark, or {@link Watermark#UNINITIALIZED} if none received yet. + */ + public Watermark getLastWatermark() { + return lastWatermark; + } + + /** + * Gets the current watermark status of this virtual channel. + * + * @return The last received watermark status; {@link WatermarkStatus#ACTIVE} initially. + */ + public WatermarkStatus getWatermarkStatus() { + return watermarkStatus; + } +}
