This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8834c6de1239e44ff6670bea8d7acdc217f0f54f
Author: yunfengzhou-hub <yuri.zhouyunf...@outlook.com>
AuthorDate: Tue Sep 24 21:14:22 2024 +0800

    [FLINK-36355][runtime] Remove deprecated DataStream#partitionCustom
---
 .../flink/streaming/api/datastream/DataStream.java | 46 ----------------------
 .../apache/flink/streaming/api/DataStreamTest.java |  9 +++--
 .../test/streaming/runtime/PartitionerITCase.java  |  2 +-
 3 files changed, 6 insertions(+), 51 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 36344ae952f..489c9c6ef91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -299,42 +299,6 @@ public class DataStream<T> {
                 clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())));
     }
 
-    /**
-     * Partitions a tuple DataStream on the specified key fields using a custom partitioner. This
-     * method takes the key position to partition on, and a partitioner that accepts the key type.
-     *
-     * <p>Note: This method works only on single field keys.
-     *
-     * @deprecated use {@link DataStream#partitionCustom(Partitioner, KeySelector)}.
-     * @param partitioner The partitioner to assign partitions to keys.
-     * @param field The field index on which the DataStream is partitioned.
-     * @return The partitioned DataStream.
-     */
-    @Deprecated
-    public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, int field) {
-        Keys.ExpressionKeys<T> outExpressionKeys =
-                new Keys.ExpressionKeys<>(new int[] {field}, getType());
-        return partitionCustom(partitioner, outExpressionKeys);
-    }
-
-    /**
-     * Partitions a POJO DataStream on the specified key fields using a custom partitioner. This
-     * method takes the key expression to partition on, and a partitioner that accepts the key type.
-     *
-     * <p>Note: This method works only on single field keys.
-     *
-     * @deprecated use {@link DataStream#partitionCustom(Partitioner, KeySelector)}.
-     * @param partitioner The partitioner to assign partitions to keys.
-     * @param field The expression for the field on which the DataStream is partitioned.
-     * @return The partitioned DataStream.
-     */
-    @Deprecated
-    public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) {
-        Keys.ExpressionKeys<T> outExpressionKeys =
-                new Keys.ExpressionKeys<>(new String[] {field}, getType());
-        return partitionCustom(partitioner, outExpressionKeys);
-    }
-
     /**
      * Partitions a DataStream on the key returned by the selector, using a custom partitioner. This
      * method takes the key selector to get the key to partition on, and a partitioner that accepts
@@ -354,16 +318,6 @@ public class DataStream<T> {
                 new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
     }
 
-    // private helper method for custom partitioning
-    private <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, Keys<T> keys) {
-        KeySelector<T, K> keySelector =
-                KeySelectorUtil.getSelectorForOneKey(
-                        keys, partitioner, getType(), getExecutionConfig());
-
-        return setConnectionType(
-                new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
-    }
-
     /**
      * Sets the partitioning of the {@link DataStream} so that the output elements are broadcasted
      * to every parallel instance of the next operation.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 7e57151c644..4c600d621e0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -360,8 +360,8 @@ class DataStreamTest {
 
     /**
      * Tests that {@link DataStream#keyBy(KeySelector)} and {@link
-     * DataStream#partitionCustom(Partitioner, int)} result in different and correct topologies.
-     * Does the some for the {@link ConnectedStreams}.
+     * DataStream#partitionCustom(Partitioner, KeySelector)} result in different and correct
+     * topologies. Does the some for the {@link ConnectedStreams}.
      */
     @Test
     void testPartitioning() {
@@ -422,9 +422,10 @@ class DataStreamTest {
                     }
                 };
 
-        DataStream<Tuple2<Long, Long>> customPartition1 = src1.partitionCustom(longPartitioner, 0);
+        DataStream<Tuple2<Long, Long>> customPartition1 =
+                src1.partitionCustom(longPartitioner, x -> x.f0);
         DataStream<Tuple2<Long, Long>> customPartition3 =
-                src1.partitionCustom(longPartitioner, "f0");
+                src1.partitionCustom(longPartitioner, x -> x.f0);
         DataStream<Tuple2<Long, Long>> customPartition4 =
                 src1.partitionCustom(longPartitioner, new FirstSelector());
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
index a3a78d6ec1c..6c60f12b705 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
@@ -117,7 +117,7 @@ public class PartitionerITCase extends AbstractTestBaseJUnit4 {
                                         }
                                     }
                                 },
-                                0)
+                                x -> x.f0)
                         .map(new SubtaskIndexAssigner());
 
         partitionCustom.addSink(customPartitionResultSink);