This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7052251b22d [FLINK-32977] Deprecate read and write method in Path
7052251b22d is described below

commit 7052251b22d3fcb7b34f29b13d19cd49eaff4aaf
Author: Wencong Liu <liuwencle...@163.com>
AuthorDate: Mon Jul 24 14:15:47 2023 +0800

    [FLINK-32977] Deprecate read and write method in Path
    
    This closes #23072
---
 .../file/src/FileSourceSplitSerializer.java        |   6 +-
 .../main/java/org/apache/flink/core/fs/Path.java   | 114 ++++++++++++++++-----
 .../sink/TestManagedSinkCommittableSerializer.java |   7 +-
 .../TestManagedFileSourceSplitSerializer.java      |   6 +-
 4 files changed, 96 insertions(+), 37 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
index d37b00950c7..6b7bcb58669 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
@@ -64,7 +64,7 @@ public final class FileSourceSplitSerializer implements 
SimpleVersionedSerialize
         final DataOutputSerializer out = SERIALIZER_CACHE.get();
 
         out.writeUTF(split.splitId());
-        split.path().write(out);
+        Path.serializeToDataOutputView(split.path(), out);
         out.writeLong(split.offset());
         out.writeLong(split.length());
         out.writeLong(split.fileModificationTime());
@@ -100,8 +100,8 @@ public final class FileSourceSplitSerializer implements 
SimpleVersionedSerialize
         final DataInputDeserializer in = new DataInputDeserializer(serialized);
 
         final String id = in.readUTF();
-        final Path path = new Path();
-        path.read(in);
+        Path result = Path.deserializeFromDataInputView(in);
+        Path path = result == null ? new Path() : result;
         final long offset = in.readLong();
         final long len = in.readLong();
         final long modificationTime = in.readLong();
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 912938d63a6..4f1069f3e6b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -28,6 +28,8 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -40,6 +42,9 @@ import java.util.regex.Pattern;
  * separator. A path string is absolute if it begins with a slash.
  *
  * <p>Tailing slashes are removed from the path.
+ *
+ * <p>Note: Path will no longer implement {@link IOReadableWritable} in future 
versions. Please use
+ * {@code serializeToDataOutputView} and {@code deserializeFromDataInputView} 
instead.
  */
 @Public
 public class Path implements IOReadableWritable, Serializable {
@@ -443,40 +448,43 @@ public class Path implements IOReadableWritable, 
Serializable {
     //  Legacy Serialization
     // ------------------------------------------------------------------------
 
+    /**
+     * Read uri from {@link DataInputView}.
+     *
+     * @param in the input view to read the uri.
+     * @throws IOException if an error happened.
+     * @deprecated the method is deprecated since Flink 1.19 because Path will 
no longer implement
+     *     {@link IOReadableWritable} in future versions. Please use {@code
+     *     deserializeFromDataInputView} instead.
+     * @see <a
+     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-347%3A+Remove+IOReadableWritable
+     *     +serialization+in+Path"> FLIP-347: Remove IOReadableWritable 
serialization in Path </a>
+     */
+    @Deprecated
     @Override
     public void read(DataInputView in) throws IOException {
-        final boolean isNotNull = in.readBoolean();
-        if (isNotNull) {
-            final String scheme = StringUtils.readNullableString(in);
-            final String userInfo = StringUtils.readNullableString(in);
-            final String host = StringUtils.readNullableString(in);
-            final int port = in.readInt();
-            final String path = StringUtils.readNullableString(in);
-            final String query = StringUtils.readNullableString(in);
-            final String fragment = StringUtils.readNullableString(in);
-
-            try {
-                uri = new URI(scheme, userInfo, host, port, path, query, 
fragment);
-            } catch (URISyntaxException e) {
-                throw new IOException("Error reconstructing URI", e);
-            }
+        Path path = deserializeFromDataInputView(in);
+        if (path != null) {
+            uri = path.toUri();
         }
     }
 
+    /**
+     * Write uri to {@link DataOutputView}.
+     *
+     * @param out the output view to be written the uri.
+     * @throws IOException if an error happened.
+     * @deprecated the method is deprecated since Flink 1.19 because Path will 
no longer implement
+     *     {@link IOReadableWritable} in future versions. Please use {@code
+     *     serializeToDataOutputView} instead.
+     * @see <a
+     *     
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-347%3A+Remove+IOReadableWritable
+     *     +serialization+in+Path"> FLIP-347: Remove IOReadableWritable 
serialization in Path </a>
+     */
+    @Deprecated
     @Override
     public void write(DataOutputView out) throws IOException {
-        if (uri == null) {
-            out.writeBoolean(false);
-        } else {
-            out.writeBoolean(true);
-            StringUtils.writeNullableString(uri.getScheme(), out);
-            StringUtils.writeNullableString(uri.getUserInfo(), out);
-            StringUtils.writeNullableString(uri.getHost(), out);
-            out.writeInt(uri.getPort());
-            StringUtils.writeNullableString(uri.getPath(), out);
-            StringUtils.writeNullableString(uri.getQuery(), out);
-            StringUtils.writeNullableString(uri.getFragment(), out);
-        }
+        serializeToDataOutputView(this, out);
     }
 
     // ------------------------------------------------------------------------
@@ -526,4 +534,56 @@ public class Path implements IOReadableWritable, 
Serializable {
     public static Path fromLocalFile(File file) {
         return new Path(file.toURI());
     }
+
+    /**
+     * Deserialize the Path from {@link DataInputView}.
+     *
+     * @param in the data input view.
+     * @return the path
+     * @throws IOException if an error happened.
+     */
+    @Nullable
+    public static Path deserializeFromDataInputView(DataInputView in) throws 
IOException {
+        final boolean isNotNull = in.readBoolean();
+        Path result = null;
+        if (isNotNull) {
+            final String scheme = StringUtils.readNullableString(in);
+            final String userInfo = StringUtils.readNullableString(in);
+            final String host = StringUtils.readNullableString(in);
+            final int port = in.readInt();
+            final String path = StringUtils.readNullableString(in);
+            final String query = StringUtils.readNullableString(in);
+            final String fragment = StringUtils.readNullableString(in);
+
+            try {
+                result = new Path(new URI(scheme, userInfo, host, port, path, 
query, fragment));
+            } catch (URISyntaxException e) {
+                throw new IOException("Error reconstructing URI", e);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Serialize the path to {@link DataInputView}.
+     *
+     * @param path the file path.
+     * @param out the data out put view.
+     * @throws IOException if an error happened.
+     */
+    public static void serializeToDataOutputView(Path path, DataOutputView 
out) throws IOException {
+        URI uri = path.toUri();
+        if (uri == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            StringUtils.writeNullableString(uri.getScheme(), out);
+            StringUtils.writeNullableString(uri.getUserInfo(), out);
+            StringUtils.writeNullableString(uri.getHost(), out);
+            out.writeInt(uri.getPort());
+            StringUtils.writeNullableString(uri.getPath(), out);
+            StringUtils.writeNullableString(uri.getQuery(), out);
+            StringUtils.writeNullableString(uri.getFragment(), out);
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/sink/TestManagedSinkCommittableSerializer.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/sink/TestManagedSinkCommittableSerializer.java
index d3ed071b5c0..9c983eaa417 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/sink/TestManagedSinkCommittableSerializer.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/sink/TestManagedSinkCommittableSerializer.java
@@ -113,7 +113,7 @@ public class TestManagedSinkCommittableSerializer
     private void serializePaths(DataOutputSerializer out, Set<Path> paths) 
throws IOException {
         out.writeInt(paths.size());
         for (Path path : paths) {
-            path.write(out);
+            Path.serializeToDataOutputView(path, out);
         }
     }
 
@@ -143,9 +143,8 @@ public class TestManagedSinkCommittableSerializer
         int size = in.readInt();
         Set<Path> paths = new HashSet<>(size);
         for (int i = 0; i < size; i++) {
-            Path path = new Path();
-            path.read(in);
-            paths.add(path);
+            Path result = Path.deserializeFromDataInputView(in);
+            paths.add(result == null ? new Path() : result);
         }
         return paths;
     }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedFileSourceSplitSerializer.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedFileSourceSplitSerializer.java
index 644a6f21855..b5aae10d41e 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedFileSourceSplitSerializer.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedFileSourceSplitSerializer.java
@@ -40,7 +40,7 @@ public class TestManagedFileSourceSplitSerializer
     public byte[] serialize(TestManagedIterableSourceSplit split) throws 
IOException {
         final DataOutputSerializer out = new DataOutputSerializer(64);
         out.writeUTF(split.splitId());
-        split.getFilePath().write(out);
+        Path.serializeToDataOutputView(split.getFilePath(), out);
         final byte[] result = out.getCopyOfBuffer();
         out.clear();
         return result;
@@ -52,8 +52,8 @@ public class TestManagedFileSourceSplitSerializer
         if (version == VERSION) {
             final DataInputDeserializer in = new 
DataInputDeserializer(serialized);
             final String id = in.readUTF();
-            final Path path = new Path();
-            path.read(in);
+            Path result = Path.deserializeFromDataInputView(in);
+            final Path path = result == null ? new Path() : result;
             return new TestManagedIterableSourceSplit(id, path);
         }
         throw new IOException(String.format("Unknown version %d", version));

Reply via email to