This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2798fb2dd3 [core] Fix the serialization of JindoMultiPartUploadCommitter (#6613)
2798fb2dd3 is described below
commit 2798fb2dd363dc4b17ee07384994a4ab379342a7
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Nov 17 12:59:55 2025 +0800
[core] Fix the serialization of JindoMultiPartUploadCommitter (#6613)
---
.../paimon/fs/BaseMultiPartUploadCommitter.java | 8 +-
.../apache/paimon/jindo/JindoMultiPartUpload.java | 15 ++--
.../jindo/JindoMultiPartUploadCommitter.java | 8 +-
.../paimon/jindo/JindoTwoPhaseOutputStream.java | 6 +-
...tStream.java => SerializableJdoObjectPart.java} | 49 +++++++-----
.../jindo/TestJindoMultiPartUploadCommitter.java | 88 ++++++++++++++++++++++
6 files changed, 142 insertions(+), 32 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
index a8545bf0b4..ffe7214de4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.fs;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.rest.RESTTokenFileIO;
import org.slf4j.Logger;
@@ -32,7 +33,7 @@ public abstract class BaseMultiPartUploadCommitter<T, C>
implements TwoPhaseOutp
private static final Logger LOG =
LoggerFactory.getLogger(BaseMultiPartUploadCommitter.class);
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final String uploadId;
private final String objectName;
@@ -82,6 +83,11 @@ public abstract class BaseMultiPartUploadCommitter<T, C>
implements TwoPhaseOutp
return this.targetPath;
}
+ @VisibleForTesting
+ public List<T> uploadedParts() {
+ return uploadedParts;
+ }
+
@Override
public void clean(FileIO fileIO) throws IOException {}
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
index 6051d315ec..ff6ba49f75 100644
--- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
@@ -35,7 +35,8 @@ import java.nio.channels.FileChannel;
import java.util.List;
/** Provides the multipart upload by Jindo. */
-public class JindoMultiPartUpload implements
MultiPartUploadStore<JdoObjectPart, String> {
+public class JindoMultiPartUpload
+ implements MultiPartUploadStore<SerializableJdoObjectPart, String> {
private final JindoMpuStore mpuStore;
private final Path workingDirectory;
@@ -64,12 +65,16 @@ public class JindoMultiPartUpload implements
MultiPartUploadStore<JdoObjectPart,
public String completeMultipartUpload(
String objectName,
String uploadId,
- List<JdoObjectPart> partETags,
+ List<SerializableJdoObjectPart> partETags,
long numBytesInParts) {
try {
JdoObjectPartList partList =
new
com.aliyun.jindodata.api.spec.protos.JdoObjectPartList();
- partList.setParts(partETags.toArray(new JdoObjectPart[0]));
+ JdoObjectPart[] jdoObjectParts = new
JdoObjectPart[partETags.size()];
+ for (int i = 0; i < partETags.size(); i++) {
+ jdoObjectParts[i] = partETags.get(i).toJdoObjectPart();
+ }
+ partList.setParts(jdoObjectParts);
mpuStore.commitMultiPartUpload(new Path(objectName), uploadId,
partList);
return uploadId;
} catch (Exception e) {
@@ -78,7 +83,7 @@ public class JindoMultiPartUpload implements
MultiPartUploadStore<JdoObjectPart,
}
@Override
- public JdoObjectPart uploadPart(
+ public SerializableJdoObjectPart uploadPart(
String objectName, String uploadId, int partNumber, File file, int
byteLength)
throws IOException {
try {
@@ -92,7 +97,7 @@ public class JindoMultiPartUpload implements
MultiPartUploadStore<JdoObjectPart,
JdoMpuUploadPartReply result =
mpuStore.uploadPart(new Path(objectName), uploadId,
partNumber, buffer);
- return result.getPartInfo();
+ return new SerializableJdoObjectPart(result.getPartInfo());
} catch (Exception e) {
throw new IOException("Failed to upload part " + partNumber + "
for: " + objectName, e);
}
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
index 64205e82b0..435a0bc49a 100644
--- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
@@ -24,7 +24,6 @@ import org.apache.paimon.fs.MultiPartUploadStore;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.Pair;
-import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
import com.aliyun.jindodata.common.JindoHadoopSystem;
import java.io.IOException;
@@ -32,10 +31,11 @@ import java.util.List;
/** Jindo implementation of MultiPartUploadCommitter. */
public class JindoMultiPartUploadCommitter
- extends BaseMultiPartUploadCommitter<JdoObjectPart, String> {
+ extends BaseMultiPartUploadCommitter<SerializableJdoObjectPart,
String> {
+
public JindoMultiPartUploadCommitter(
String uploadId,
- List<JdoObjectPart> uploadedParts,
+ List<SerializableJdoObjectPart> uploadedParts,
String objectName,
long position,
Path targetPath) {
@@ -43,7 +43,7 @@ public class JindoMultiPartUploadCommitter
}
@Override
- protected MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore(
+ protected MultiPartUploadStore<SerializableJdoObjectPart, String>
multiPartUploadStore(
FileIO fileIO, Path targetPath) throws IOException {
JindoFileIO jindoFileIO = (JindoFileIO) fileIO;
org.apache.hadoop.fs.Path hadoopPath = jindoFileIO.path(targetPath);
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
index 9c6fb5df10..3bf5cde03c 100644
--- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
@@ -22,16 +22,14 @@ import org.apache.paimon.fs.MultiPartUploadStore;
import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
import org.apache.paimon.fs.Path;
-import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
-
import java.io.IOException;
/** Jindo implementation of TwoPhaseOutputStream using multipart upload. */
public class JindoTwoPhaseOutputStream
- extends MultiPartUploadTwoPhaseOutputStream<JdoObjectPart, String> {
+ extends MultiPartUploadTwoPhaseOutputStream<SerializableJdoObjectPart,
String> {
public JindoTwoPhaseOutputStream(
- MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore,
+ MultiPartUploadStore<SerializableJdoObjectPart, String>
multiPartUploadStore,
org.apache.hadoop.fs.Path hadoopPath,
Path targetPath)
throws IOException {
diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/SerializableJdoObjectPart.java
similarity index 51%
copy from paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
copy to paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/SerializableJdoObjectPart.java
index 9c6fb5df10..6f000ac7a4 100644
--- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
+++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/SerializableJdoObjectPart.java
@@ -18,29 +18,42 @@
package org.apache.paimon.jindo;
-import org.apache.paimon.fs.MultiPartUploadStore;
-import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
-import org.apache.paimon.fs.Path;
-
import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
-import java.io.IOException;
+import java.io.Serializable;
+
+/** Serializable wrapper for JdoObjectPart. */
+public class SerializableJdoObjectPart implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final int partNum;
+ private final long size;
+ private final String eTag;
+
+ public SerializableJdoObjectPart(JdoObjectPart part) {
+ this.partNum = part.getPartNum();
+ this.size = part.getSize();
+ this.eTag = part.getETag();
+ }
+
+ public JdoObjectPart toJdoObjectPart() {
+ JdoObjectPart part = new JdoObjectPart();
+ part.setPartNum(this.partNum);
+ part.setSize(this.size);
+ part.setETag(this.eTag);
+ return part;
+ }
-/** Jindo implementation of TwoPhaseOutputStream using multipart upload. */
-public class JindoTwoPhaseOutputStream
- extends MultiPartUploadTwoPhaseOutputStream<JdoObjectPart, String> {
+ // Getters
+ public int getPartNum() {
+ return partNum;
+ }
- public JindoTwoPhaseOutputStream(
- MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore,
- org.apache.hadoop.fs.Path hadoopPath,
- Path targetPath)
- throws IOException {
- super(multiPartUploadStore, hadoopPath, targetPath);
+ public long getSize() {
+ return size;
}
- @Override
- public Committer committer() {
- return new JindoMultiPartUploadCommitter(
- uploadId, uploadedParts, objectName, position, targetPath);
+ public String getETag() {
+ return eTag;
}
}
diff --git a/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoMultiPartUploadCommitter.java b/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoMultiPartUploadCommitter.java
new file mode 100644
index 0000000000..9d9696cd7a
--- /dev/null
+++ b/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoMultiPartUploadCommitter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for JindoMultiPartUploadCommitter. */
+public class TestJindoMultiPartUploadCommitter {
+
+ @Test
+ public void testSerializableJdoObjectPart() throws Exception {
+ // Test the SerializableJdoObjectPart wrapper directly
+ JdoObjectPart originalPart = new JdoObjectPart();
+ originalPart.setPartNum(5);
+ originalPart.setSize(2048L);
+ originalPart.setETag("test-etag-2");
+
+ SerializableJdoObjectPart wrapper = new SerializableJdoObjectPart(originalPart);
+
+ byte[] serialized = InstantiationUtil.serializeObject(wrapper);
+ SerializableJdoObjectPart deserialized =
+ InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+
+ assertThat(deserialized).isNotNull();
+ assertThat(deserialized.getPartNum()).isEqualTo(5);
+ assertThat(deserialized.getSize()).isEqualTo(2048L);
+ assertThat(deserialized.getETag()).isEqualTo("test-etag-2");
+
+ JdoObjectPart reconstructedPart = deserialized.toJdoObjectPart();
+ assertThat(reconstructedPart.getPartNum()).isEqualTo(5);
+ assertThat(reconstructedPart.getSize()).isEqualTo(2048L);
+ assertThat(reconstructedPart.getETag()).isEqualTo("test-etag-2");
+ }
+
+ @Test
+ public void testJindoMultiPartUploadCommitterSerialization() throws Exception {
+ String uploadId = "test-upload-id";
+ String objectName = "test-object-name";
+ long position = 1024L;
+ Path targetPath = new Path("/test/path/file.txt");
+ List<SerializableJdoObjectPart> uploadedParts = new ArrayList<>();
+ JdoObjectPart originalPart = new JdoObjectPart();
+ originalPart.setPartNum(5);
+ originalPart.setSize(2048L);
+ originalPart.setETag("test-etag-2");
+ uploadedParts.add(new SerializableJdoObjectPart(originalPart));
+
+ JindoMultiPartUploadCommitter committer =
+ new JindoMultiPartUploadCommitter(
+ uploadId, uploadedParts, objectName, position, targetPath);
+
+ byte[] serialized = InstantiationUtil.serializeObject(committer);
+ JindoMultiPartUploadCommitter deserialized =
+ InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+
+ // Verify the deserialized object
+ assertThat(deserialized).isNotNull();
+ SerializableJdoObjectPart deserializedPart = deserialized.uploadedParts().get(0);
+ assertThat(deserializedPart.getPartNum()).isEqualTo(5);
+ assertThat(deserializedPart.getSize()).isEqualTo(2048L);
+ assertThat(deserializedPart.getETag()).isEqualTo("test-etag-2");
+ }
+}