This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push:
new 837f8e58485 Revert "[FLINK-33532][network] Move the serialization of
ShuffleDescriptorGroup out of the RPC main thread]"
837f8e58485 is described below
commit 837f8e584850bdcbc586a54f58e3fe953a816be8
Author: caodizhou <[email protected]>
AuthorDate: Wed Mar 6 14:11:56 2024 +0800
Revert "[FLINK-33532][network] Move the serialization of
ShuffleDescriptorGroup out of the RPC main thread]"
This reverts commit d18a4bfe596fc580f8280750fa3bfa22007671d9.
(cherry picked from commit 7a709bf323c1cce3440887fe937311bae119aab0)
---
.../org/apache/flink/runtime/blob/BlobWriter.java | 11 ++--
.../deployment/CachedShuffleDescriptors.java | 2 +-
.../deployment/InputGateDeploymentDescriptor.java | 41 ++++++++++-----
.../deployment/TaskDeploymentDescriptor.java | 19 -------
.../TaskDeploymentDescriptorFactory.java | 58 ++++++++--------------
.../deployment/CachedShuffleDescriptorsTest.java | 30 ++++++-----
.../TaskDeploymentDescriptorTestUtils.java | 9 ++--
.../partition/consumer/SingleInputGateTest.java | 6 ++-
8 files changed, 83 insertions(+), 93 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
index 555cccfb7ca..2d5292b42cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Optional;
/** BlobWriter is used to upload data to the BLOB store. */
public interface BlobWriter {
@@ -103,13 +102,11 @@ public interface BlobWriter {
if (serializedValue.getByteArray().length <
blobWriter.getMinOffloadingSize()) {
return Either.Left(serializedValue);
} else {
- return offloadWithException(serializedValue, jobId, blobWriter)
- .map(Either::<SerializedValue<T>, PermanentBlobKey>Right)
- .orElse(Either.Left(serializedValue));
+ return offloadWithException(serializedValue, jobId, blobWriter);
}
}
- static <T> Optional<PermanentBlobKey> offloadWithException(
+ static <T> Either<SerializedValue<T>, PermanentBlobKey>
offloadWithException(
SerializedValue<T> serializedValue, JobID jobId, BlobWriter
blobWriter) {
Preconditions.checkNotNull(serializedValue);
Preconditions.checkNotNull(jobId);
@@ -117,10 +114,10 @@ public interface BlobWriter {
try {
final PermanentBlobKey permanentBlobKey =
blobWriter.putPermanent(jobId,
serializedValue.getByteArray());
- return Optional.of(permanentBlobKey);
+ return Either.Right(permanentBlobKey);
} catch (IOException e) {
LOG.warn("Failed to offload value for job {} to BLOB store.",
jobId, e);
- return Optional.empty();
+ return Either.Left(serializedValue);
}
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
index 4ddacbd671a..b8e0b44006f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptors.java
@@ -87,7 +87,7 @@ public class CachedShuffleDescriptors {
new ShuffleDescriptorGroup(
toBeSerialized.toArray(new
ShuffleDescriptorAndIndex[0]));
MaybeOffloaded<ShuffleDescriptorGroup>
serializedShuffleDescriptorGroup =
-
shuffleDescriptorSerializer.trySerializeAndOffloadShuffleDescriptor(
+
shuffleDescriptorSerializer.serializeAndTryOffloadShuffleDescriptor(
shuffleDescriptorGroup, numConsumers);
toBeSerialized.clear();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index 4e02c699331..333a91e0a73 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
-import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloadedRaw;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.Offloaded;
import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
@@ -98,7 +98,9 @@ public class InputGateDeploymentDescriptor implements
Serializable {
new IndexRange(consumedSubpartitionIndex,
consumedSubpartitionIndex),
inputChannels.length,
Collections.singletonList(
- new NonOffloadedRaw<>(new
ShuffleDescriptorGroup(inputChannels))));
+ new NonOffloaded<>(
+ CompressedSerializedValue.fromObject(
+ new
ShuffleDescriptorGroup(inputChannels)))));
}
public InputGateDeploymentDescriptor(
@@ -145,14 +147,18 @@ public class InputGateDeploymentDescriptor implements
Serializable {
// This is only for testing scenarios, in a production environment
we always call
// tryLoadAndDeserializeShuffleDescriptors to deserialize
ShuffleDescriptors first.
inputChannels = new ShuffleDescriptor[numberOfInputChannels];
- for (MaybeOffloaded<ShuffleDescriptorGroup> rawShuffleDescriptors :
- serializedInputChannels) {
- checkState(
- rawShuffleDescriptors instanceof NonOffloadedRaw,
- "Trying to work with offloaded serialized shuffle
descriptors.");
- NonOffloadedRaw<ShuffleDescriptorGroup> nonOffloadedRawValue =
- (NonOffloadedRaw<ShuffleDescriptorGroup>)
rawShuffleDescriptors;
- putOrReplaceShuffleDescriptors(nonOffloadedRawValue.value);
+ try {
+ for (MaybeOffloaded<ShuffleDescriptorGroup>
serializedShuffleDescriptors :
+ serializedInputChannels) {
+ checkState(
+ serializedShuffleDescriptors instanceof
NonOffloaded,
+ "Trying to work with offloaded serialized shuffle
descriptors.");
+ NonOffloaded<ShuffleDescriptorGroup>
nonOffloadedSerializedValue =
+ (NonOffloaded<ShuffleDescriptorGroup>)
serializedShuffleDescriptors;
+
tryDeserializeShuffleDescriptorGroup(nonOffloadedSerializedValue);
+ }
+ } catch (ClassNotFoundException | IOException e) {
+ throw new RuntimeException("Could not deserialize shuffle
descriptors.", e);
}
}
return inputChannels;
@@ -207,12 +213,21 @@ public class InputGateDeploymentDescriptor implements
Serializable {
}
putOrReplaceShuffleDescriptors(shuffleDescriptorGroup);
} else {
- NonOffloadedRaw<ShuffleDescriptorGroup>
nonOffloadedSerializedValue =
- (NonOffloadedRaw<ShuffleDescriptorGroup>)
serializedShuffleDescriptors;
- putOrReplaceShuffleDescriptors(nonOffloadedSerializedValue.value);
+ NonOffloaded<ShuffleDescriptorGroup> nonOffloadedSerializedValue =
+ (NonOffloaded<ShuffleDescriptorGroup>)
serializedShuffleDescriptors;
+ tryDeserializeShuffleDescriptorGroup(nonOffloadedSerializedValue);
}
}
+ private void tryDeserializeShuffleDescriptorGroup(
+ NonOffloaded<ShuffleDescriptorGroup>
nonOffloadedShuffleDescriptorGroup)
+ throws IOException, ClassNotFoundException {
+ ShuffleDescriptorGroup shuffleDescriptorGroup =
+
nonOffloadedShuffleDescriptorGroup.serializedValue.deserializeValue(
+ getClass().getClassLoader());
+ putOrReplaceShuffleDescriptors(shuffleDescriptorGroup);
+ }
+
private void putOrReplaceShuffleDescriptors(ShuffleDescriptorGroup
shuffleDescriptorGroup) {
for (ShuffleDescriptorAndIndex shuffleDescriptorAndIndex :
shuffleDescriptorGroup.getShuffleDescriptors()) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 016105d9aac..5684066735f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -80,25 +80,6 @@ public final class TaskDeploymentDescriptor implements
Serializable {
}
}
- /**
- * The raw value that is not offloaded to the {@link
org.apache.flink.runtime.blob.BlobServer}.
- *
- * @param <T> type of the raw value
- */
- public static class NonOffloadedRaw<T> extends MaybeOffloaded<T> {
- private static final long serialVersionUID = 1L;
-
- /** The raw value. */
- public T value;
-
- @SuppressWarnings("unused")
- public NonOffloadedRaw() {}
-
- public NonOffloadedRaw(T value) {
- this.value = Preconditions.checkNotNull(value);
- }
- }
-
/**
* Reference to a serialized value that was offloaded to the {@link
* org.apache.flink.runtime.blob.BlobServer}.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index d6a2b16010b..8b0498159a1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -47,14 +47,11 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.types.Either;
import org.apache.flink.util.CompressedSerializedValue;
-import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -452,7 +449,7 @@ public class TaskDeploymentDescriptorFactory {
public static class ShuffleDescriptorGroup implements Serializable {
private static final long serialVersionUID = 1L;
- private ShuffleDescriptorAndIndex[] shuffleDescriptors;
+ private final ShuffleDescriptorAndIndex[] shuffleDescriptors;
public ShuffleDescriptorGroup(ShuffleDescriptorAndIndex[]
shuffleDescriptors) {
this.shuffleDescriptors = checkNotNull(shuffleDescriptors);
@@ -461,31 +458,19 @@ public class TaskDeploymentDescriptorFactory {
public ShuffleDescriptorAndIndex[] getShuffleDescriptors() {
return shuffleDescriptors;
}
-
- private void writeObject(ObjectOutputStream oos) throws IOException {
- byte[] bytes =
InstantiationUtil.serializeObjectAndCompress(shuffleDescriptors);
- oos.writeObject(bytes);
- }
-
- private void readObject(ObjectInputStream ois) throws IOException,
ClassNotFoundException {
- byte[] bytes = (byte[]) ois.readObject();
- shuffleDescriptors =
- InstantiationUtil.decompressAndDeserializeObject(
- bytes, ClassLoader.getSystemClassLoader());
- }
}
- /** Offload shuffle descriptors. */
+ /** Serialize shuffle descriptors. */
interface ShuffleDescriptorSerializer {
/**
- * Try to serialize and offload shuffle descriptors.
+ * Serialize and try offload shuffle descriptors.
*
- * @param shuffleDescriptorGroup to serialize and offload
+ * @param shuffleDescriptorGroup to serialize
* @param numConsumer consumers number of these shuffle descriptors,
it means how many times
* serialized shuffle descriptor should be sent
- * @return offloaded serialized or non-offloaded raw shuffle
descriptors
+ * @return offloaded or non-offloaded serialized shuffle descriptors
*/
- MaybeOffloaded<ShuffleDescriptorGroup>
trySerializeAndOffloadShuffleDescriptor(
+ MaybeOffloaded<ShuffleDescriptorGroup>
serializeAndTryOffloadShuffleDescriptor(
ShuffleDescriptorGroup shuffleDescriptorGroup, int
numConsumer) throws IOException;
}
@@ -502,24 +487,25 @@ public class TaskDeploymentDescriptorFactory {
}
@Override
- public MaybeOffloaded<ShuffleDescriptorGroup>
trySerializeAndOffloadShuffleDescriptor(
+ public MaybeOffloaded<ShuffleDescriptorGroup>
serializeAndTryOffloadShuffleDescriptor(
ShuffleDescriptorGroup shuffleDescriptorGroup, int
numConsumer) throws IOException {
- final Either<ShuffleDescriptorGroup, PermanentBlobKey>
rawValueOrBlobKey =
-
shouldOffload(shuffleDescriptorGroup.getShuffleDescriptors(), numConsumer)
- ? BlobWriter.offloadWithException(
-
CompressedSerializedValue.fromObject(
- shuffleDescriptorGroup),
- jobID,
- blobWriter)
- .map(Either::<ShuffleDescriptorGroup,
PermanentBlobKey>Right)
-
.orElse(Either.Left(shuffleDescriptorGroup))
- : Either.Left(shuffleDescriptorGroup);
-
- if (rawValueOrBlobKey.isLeft()) {
- return new
TaskDeploymentDescriptor.NonOffloadedRaw<>(rawValueOrBlobKey.left());
+ final CompressedSerializedValue<ShuffleDescriptorGroup>
compressedSerializedValue =
+
CompressedSerializedValue.fromObject(shuffleDescriptorGroup);
+
+ final Either<SerializedValue<ShuffleDescriptorGroup>,
PermanentBlobKey>
+ serializedValueOrBlobKey =
+ shouldOffload(
+
shuffleDescriptorGroup.getShuffleDescriptors(),
+ numConsumer)
+ ? BlobWriter.offloadWithException(
+ compressedSerializedValue, jobID,
blobWriter)
+ : Either.Left(compressedSerializedValue);
+
+ if (serializedValueOrBlobKey.isLeft()) {
+ return new
TaskDeploymentDescriptor.NonOffloaded<>(serializedValueOrBlobKey.left());
} else {
- return new
TaskDeploymentDescriptor.Offloaded<>(rawValueOrBlobKey.right());
+ return new
TaskDeploymentDescriptor.Offloaded<>(serializedValueOrBlobKey.right());
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
index 0160d180e23..f9cd00e103b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/CachedShuffleDescriptorsTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.deployment;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
-import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloadedRaw;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -39,10 +39,12 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.CompressedSerializedValue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -88,7 +90,7 @@ class CachedShuffleDescriptorsTest {
assertThat(cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups()).hasSize(1);
MaybeOffloaded<ShuffleDescriptorGroup> maybeOffloadedShuffleDescriptor
=
cachedShuffleDescriptors.getAllSerializedShuffleDescriptorGroups().get(0);
- assertNonOffloadedRawShuffleDescriptorAndIndexEquals(
+ assertNonOffloadedShuffleDescriptorAndIndexEquals(
maybeOffloadedShuffleDescriptor,
Collections.singletonList(shuffleDescriptor),
Collections.singletonList(0));
@@ -142,22 +144,26 @@ class CachedShuffleDescriptorsTest {
intermediateResultPartition2,
TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
false);
- assertNonOffloadedRawShuffleDescriptorAndIndexEquals(
+ assertNonOffloadedShuffleDescriptorAndIndexEquals(
maybeOffloaded,
Arrays.asList(expectedShuffleDescriptor1,
expectedShuffleDescriptor2),
Arrays.asList(0, 1));
}
- private void assertNonOffloadedRawShuffleDescriptorAndIndexEquals(
+ private void assertNonOffloadedShuffleDescriptorAndIndexEquals(
MaybeOffloaded<ShuffleDescriptorGroup> maybeOffloaded,
List<ShuffleDescriptor> expectedDescriptors,
- List<Integer> expectedIndices) {
+ List<Integer> expectedIndices)
+ throws Exception {
assertThat(expectedDescriptors).hasSameSizeAs(expectedIndices);
- assertThat(maybeOffloaded).isInstanceOf(NonOffloadedRaw.class);
- NonOffloadedRaw<ShuffleDescriptorGroup> nonOffloadedRaw =
- (NonOffloadedRaw<ShuffleDescriptorGroup>) maybeOffloaded;
+ assertThat(maybeOffloaded).isInstanceOf(NonOffloaded.class);
+ NonOffloaded<ShuffleDescriptorGroup> nonOffloaded =
+ (NonOffloaded<ShuffleDescriptorGroup>) maybeOffloaded;
ShuffleDescriptorAndIndex[] shuffleDescriptorAndIndices =
- nonOffloadedRaw.value.getShuffleDescriptors();
+ nonOffloaded
+ .serializedValue
+ .deserializeValue(getClass().getClassLoader())
+ .getShuffleDescriptors();
assertThat(shuffleDescriptorAndIndices).hasSameSizeAs(expectedDescriptors);
for (int i = 0; i < shuffleDescriptorAndIndices.length; i++) {
assertThat(shuffleDescriptorAndIndices[i].getIndex()).isEqualTo(expectedIndices.get(i));
@@ -212,9 +218,9 @@ class CachedShuffleDescriptorsTest {
implements
TaskDeploymentDescriptorFactory.ShuffleDescriptorSerializer {
@Override
- public MaybeOffloaded<ShuffleDescriptorGroup>
trySerializeAndOffloadShuffleDescriptor(
- ShuffleDescriptorGroup shuffleDescriptorGroup, int
numConsumer) {
- return new NonOffloadedRaw<>(shuffleDescriptorGroup);
+ public MaybeOffloaded<ShuffleDescriptorGroup>
serializeAndTryOffloadShuffleDescriptor(
+ ShuffleDescriptorGroup shuffleDescriptorGroup, int
numConsumer) throws IOException {
+ return new
NonOffloaded<>(CompressedSerializedValue.fromObject(shuffleDescriptorGroup));
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java
index 04683d4489c..19fbefe2920 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTestUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.deployment;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
-import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloadedRaw;
+import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.NonOffloaded;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.Offloaded;
import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorAndIndex;
import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup;
@@ -47,8 +47,11 @@ public class TaskDeploymentDescriptorTestUtils {
int maxIndex = 0;
for (MaybeOffloaded<ShuffleDescriptorGroup> sd : maybeOffloaded) {
ShuffleDescriptorGroup shuffleDescriptorGroup;
- if (sd instanceof NonOffloadedRaw) {
- shuffleDescriptorGroup =
((NonOffloadedRaw<ShuffleDescriptorGroup>) sd).value;
+ if (sd instanceof NonOffloaded) {
+ shuffleDescriptorGroup =
+ ((NonOffloaded<ShuffleDescriptorGroup>) sd)
+ .serializedValue.deserializeValue(
+ ClassLoader.getSystemClassLoader());
} else {
final CompressedSerializedValue<ShuffleDescriptorGroup>
compressedSerializedValue =
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 071a23ad34b..0ae74209507 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -71,6 +71,7 @@ import
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
+import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.shaded.guava31.com.google.common.io.Closer;
@@ -1312,8 +1313,9 @@ class SingleInputGateTest extends InputGateTestBase {
subpartitionIndexRange,
channelDescs.length,
Collections.singletonList(
- new TaskDeploymentDescriptor.NonOffloadedRaw<>(
- new
ShuffleDescriptorGroup(channelDescs))));
+ new TaskDeploymentDescriptor.NonOffloaded<>(
+ CompressedSerializedValue.fromObject(
+ new
ShuffleDescriptorGroup(channelDescs)))));
final TaskMetricGroup taskMetricGroup =
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();