This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 7052251b22d [FLINK-32977] Deprecate read and write method in Path 7052251b22d is described below commit 7052251b22d3fcb7b34f29b13d19cd49eaff4aaf Author: Wencong Liu <liuwencle...@163.com> AuthorDate: Mon Jul 24 14:15:47 2023 +0800 [FLINK-32977] Deprecate read and write method in Path This closes #23072 --- .../file/src/FileSourceSplitSerializer.java | 6 +- .../main/java/org/apache/flink/core/fs/Path.java | 114 ++++++++++++++++----- .../sink/TestManagedSinkCommittableSerializer.java | 7 +- .../TestManagedFileSourceSplitSerializer.java | 6 +- 4 files changed, 96 insertions(+), 37 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java index d37b00950c7..6b7bcb58669 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java @@ -64,7 +64,7 @@ public final class FileSourceSplitSerializer implements SimpleVersionedSerialize final DataOutputSerializer out = SERIALIZER_CACHE.get(); out.writeUTF(split.splitId()); - split.path().write(out); + Path.serializeToDataOutputView(split.path(), out); out.writeLong(split.offset()); out.writeLong(split.length()); out.writeLong(split.fileModificationTime()); @@ -100,8 +100,8 @@ public final class FileSourceSplitSerializer implements SimpleVersionedSerialize final DataInputDeserializer in = new DataInputDeserializer(serialized); final String id = in.readUTF(); - final Path path = new Path(); - path.read(in); + Path result = Path.deserializeFromDataInputView(in); + Path path = result == null ? new Path() : result; final long offset = in.readLong(); final long len = in.readLong(); final long modificationTime = in.readLong(); diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java index 912938d63a6..4f1069f3e6b 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java @@ -28,6 +28,8 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.StringUtils; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.io.Serializable; @@ -40,6 +42,9 @@ import java.util.regex.Pattern; * separator. A path string is absolute if it begins with a slash. * * <p>Tailing slashes are removed from the path. + * + * <p>Note: Path will no longer implement {@link IOReadableWritable} in future versions. Please use + * {@code serializeToDataOutputView} and {@code deserializeFromDataInputView} instead. */ @Public public class Path implements IOReadableWritable, Serializable { @@ -443,40 +448,43 @@ public class Path implements IOReadableWritable, Serializable { // Legacy Serialization // ------------------------------------------------------------------------ + /** + * Read uri from {@link DataInputView}. + * + * @param in the input view to read the uri. + * @throws IOException if an error happened. + * @deprecated the method is deprecated since Flink 1.19 because Path will no longer implement + * {@link IOReadableWritable} in future versions. Please use {@code + * deserializeFromDataInputView} instead. + * @see <a + * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-347%3A+Remove+IOReadableWritable + * +serialization+in+Path"> FLIP-347: Remove IOReadableWritable serialization in Path </a> + */ + @Deprecated @Override public void read(DataInputView in) throws IOException { - final boolean isNotNull = in.readBoolean(); - if (isNotNull) { - final String scheme = StringUtils.readNullableString(in); - final String userInfo = StringUtils.readNullableString(in); - final String host = StringUtils.readNullableString(in); - final int port = in.readInt(); - final String path = StringUtils.readNullableString(in); - final String query = StringUtils.readNullableString(in); - final String fragment = StringUtils.readNullableString(in); - - try { - uri = new URI(scheme, userInfo, host, port, path, query, fragment); - } catch (URISyntaxException e) { - throw new IOException("Error reconstructing URI", e); - } + Path path = deserializeFromDataInputView(in); + if (path != null) { + uri = path.toUri(); } } + /** + * Write uri to {@link DataOutputView}. + * + * @param out the output view to be written the uri. + * @throws IOException if an error happened. + * @deprecated the method is deprecated since Flink 1.19 because Path will no longer implement + * {@link IOReadableWritable} in future versions. Please use {@code + * serializeToDataOutputView} instead. + * @see <a + * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-347%3A+Remove+IOReadableWritable + * +serialization+in+Path"> FLIP-347: Remove IOReadableWritable serialization in Path </a> + */ + @Deprecated @Override public void write(DataOutputView out) throws IOException { - if (uri == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - StringUtils.writeNullableString(uri.getScheme(), out); - StringUtils.writeNullableString(uri.getUserInfo(), out); - StringUtils.writeNullableString(uri.getHost(), out); - out.writeInt(uri.getPort()); - StringUtils.writeNullableString(uri.getPath(), out); - StringUtils.writeNullableString(uri.getQuery(), out); - StringUtils.writeNullableString(uri.getFragment(), out); - } + serializeToDataOutputView(this, out); } // ------------------------------------------------------------------------ @@ -526,4 +534,56 @@ public class Path implements IOReadableWritable, Serializable { public static Path fromLocalFile(File file) { return new Path(file.toURI()); } + + /** + * Deserialize the Path from {@link DataInputView}. + * + * @param in the data input view. + * @return the path + * @throws IOException if an error happened. + */ + @Nullable + public static Path deserializeFromDataInputView(DataInputView in) throws IOException { + final boolean isNotNull = in.readBoolean(); + Path result = null; + if (isNotNull) { + final String scheme = StringUtils.readNullableString(in); + final String userInfo = StringUtils.readNullableString(in); + final String host = StringUtils.readNullableString(in); + final int port = in.readInt(); + final String path = StringUtils.readNullableString(in); + final String query = StringUtils.readNullableString(in); + final String fragment = StringUtils.readNullableString(in); + + try { + result = new Path(new URI(scheme, userInfo, host, port, path, query, fragment)); + } catch (URISyntaxException e) { + throw new IOException("Error reconstructing URI", e); + } + } + return result; + } + + /** + * Serialize the path to {@link DataInputView}. + * + * @param path the file path. + * @param out the data out put view. + * @throws IOException if an error happened. + */ + public static void serializeToDataOutputView(Path path, DataOutputView out) throws IOException { + URI uri = path.toUri(); + if (uri == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + StringUtils.writeNullableString(uri.getScheme(), out); + StringUtils.writeNullableString(uri.getUserInfo(), out); + StringUtils.writeNullableString(uri.getHost(), out); + out.writeInt(uri.getPort()); + StringUtils.writeNullableString(uri.getPath(), out); + StringUtils.writeNullableString(uri.getQuery(), out); + StringUtils.writeNullableString(uri.getFragment(), out); + } + } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/sink/TestManagedSinkCommittableSerializer.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/sink/TestManagedSinkCommittableSerializer.java index d3ed071b5c0..9c983eaa417 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/sink/TestManagedSinkCommittableSerializer.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/sink/TestManagedSinkCommittableSerializer.java @@ -113,7 +113,7 @@ public class TestManagedSinkCommittableSerializer private void serializePaths(DataOutputSerializer out, Set<Path> paths) throws IOException { out.writeInt(paths.size()); for (Path path : paths) { - path.write(out); + Path.serializeToDataOutputView(path, out); } } @@ -143,9 +143,8 @@ public class TestManagedSinkCommittableSerializer int size = in.readInt(); Set<Path> paths = new HashSet<>(size); for (int i = 0; i < size; i++) { - Path path = new Path(); - path.read(in); - paths.add(path); + Path result = Path.deserializeFromDataInputView(in); + paths.add(result == null ? new Path() : result); } return paths; } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedFileSourceSplitSerializer.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedFileSourceSplitSerializer.java index 644a6f21855..b5aae10d41e 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedFileSourceSplitSerializer.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/connector/source/TestManagedFileSourceSplitSerializer.java @@ -40,7 +40,7 @@ public class TestManagedFileSourceSplitSerializer public byte[] serialize(TestManagedIterableSourceSplit split) throws IOException { final DataOutputSerializer out = new DataOutputSerializer(64); out.writeUTF(split.splitId()); - split.getFilePath().write(out); + Path.serializeToDataOutputView(split.getFilePath(), out); final byte[] result = out.getCopyOfBuffer(); out.clear(); return result; @@ -52,8 +52,8 @@ public class TestManagedFileSourceSplitSerializer if (version == VERSION) { final DataInputDeserializer in = new DataInputDeserializer(serialized); final String id = in.readUTF(); - final Path path = new Path(); - path.read(in); + Path result = Path.deserializeFromDataInputView(in); + final Path path = result == null ? new Path() : result; return new TestManagedIterableSourceSplit(id, path); } throw new IOException(String.format("Unknown version %d", version));