This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8834c6de1239e44ff6670bea8d7acdc217f0f54f
Author: yunfengzhou-hub <yuri.zhouyunf...@outlook.com>
AuthorDate: Tue Sep 24 21:14:22 2024 +0800

    [FLINK-36355][runtime] Remove deprecated DataStream#partitionCustom
---
 .../flink/streaming/api/datastream/DataStream.java | 46 ----------------------
 .../apache/flink/streaming/api/DataStreamTest.java |  9 +++--
 .../test/streaming/runtime/PartitionerITCase.java  |  2 +-
 3 files changed, 6 insertions(+), 51 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 36344ae952f..489c9c6ef91 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -299,42 +299,6 @@ public class DataStream<T> {
                 clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), 
getExecutionConfig())));
     }
 
-    /**
-     * Partitions a tuple DataStream on the specified key fields using a 
custom partitioner. This
-     * method takes the key position to partition on, and a partitioner that 
accepts the key type.
-     *
-     * <p>Note: This method works only on single field keys.
-     *
-     * @deprecated use {@link DataStream#partitionCustom(Partitioner, 
KeySelector)}.
-     * @param partitioner The partitioner to assign partitions to keys.
-     * @param field The field index on which the DataStream is partitioned.
-     * @return The partitioned DataStream.
-     */
-    @Deprecated
-    public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, int 
field) {
-        Keys.ExpressionKeys<T> outExpressionKeys =
-                new Keys.ExpressionKeys<>(new int[] {field}, getType());
-        return partitionCustom(partitioner, outExpressionKeys);
-    }
-
-    /**
-     * Partitions a POJO DataStream on the specified key fields using a custom 
partitioner. This
-     * method takes the key expression to partition on, and a partitioner that 
accepts the key type.
-     *
-     * <p>Note: This method works only on single field keys.
-     *
-     * @deprecated use {@link DataStream#partitionCustom(Partitioner, 
KeySelector)}.
-     * @param partitioner The partitioner to assign partitions to keys.
-     * @param field The expression for the field on which the DataStream is 
partitioned.
-     * @return The partitioned DataStream.
-     */
-    @Deprecated
-    public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, 
String field) {
-        Keys.ExpressionKeys<T> outExpressionKeys =
-                new Keys.ExpressionKeys<>(new String[] {field}, getType());
-        return partitionCustom(partitioner, outExpressionKeys);
-    }
-
     /**
      * Partitions a DataStream on the key returned by the selector, using a 
custom partitioner. This
      * method takes the key selector to get the key to partition on, and a 
partitioner that accepts
@@ -354,16 +318,6 @@ public class DataStream<T> {
                 new CustomPartitionerWrapper<>(clean(partitioner), 
clean(keySelector)));
     }
 
-    // private helper method for custom partitioning
-    private <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, 
Keys<T> keys) {
-        KeySelector<T, K> keySelector =
-                KeySelectorUtil.getSelectorForOneKey(
-                        keys, partitioner, getType(), getExecutionConfig());
-
-        return setConnectionType(
-                new CustomPartitionerWrapper<>(clean(partitioner), 
clean(keySelector)));
-    }
-
     /**
      * Sets the partitioning of the {@link DataStream} so that the output 
elements are broadcasted
      * to every parallel instance of the next operation.
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 7e57151c644..4c600d621e0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -360,8 +360,8 @@ class DataStreamTest {
 
     /**
      * Tests that {@link DataStream#keyBy(KeySelector)} and {@link
-     * DataStream#partitionCustom(Partitioner, int)} result in different and 
correct topologies.
-     * Does the some for the {@link ConnectedStreams}.
+     * DataStream#partitionCustom(Partitioner, KeySelector)} result in 
different and correct
+     * topologies. Does the some for the {@link ConnectedStreams}.
      */
     @Test
     void testPartitioning() {
@@ -422,9 +422,10 @@ class DataStreamTest {
                     }
                 };
 
-        DataStream<Tuple2<Long, Long>> customPartition1 = 
src1.partitionCustom(longPartitioner, 0);
+        DataStream<Tuple2<Long, Long>> customPartition1 =
+                src1.partitionCustom(longPartitioner, x -> x.f0);
         DataStream<Tuple2<Long, Long>> customPartition3 =
-                src1.partitionCustom(longPartitioner, "f0");
+                src1.partitionCustom(longPartitioner, x -> x.f0);
         DataStream<Tuple2<Long, Long>> customPartition4 =
                 src1.partitionCustom(longPartitioner, new FirstSelector());
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
index a3a78d6ec1c..6c60f12b705 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
@@ -117,7 +117,7 @@ public class PartitionerITCase extends 
AbstractTestBaseJUnit4 {
                                         }
                                     }
                                 },
-                                0)
+                                x -> x.f0)
                         .map(new SubtaskIndexAssigner());
 
         partitionCustom.addSink(customPartitionResultSink);

Reply via email to