This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2798fb2dd3 [core] Fix the serialization of 
JindoMultiPartUploadCommitter (#6613)
2798fb2dd3 is described below

commit 2798fb2dd363dc4b17ee07384994a4ab379342a7
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Nov 17 12:59:55 2025 +0800

    [core] Fix the serialization of JindoMultiPartUploadCommitter (#6613)
---
 .../paimon/fs/BaseMultiPartUploadCommitter.java    |  8 +-
 .../apache/paimon/jindo/JindoMultiPartUpload.java  | 15 ++--
 .../jindo/JindoMultiPartUploadCommitter.java       |  8 +-
 .../paimon/jindo/JindoTwoPhaseOutputStream.java    |  6 +-
 ...tStream.java => SerializableJdoObjectPart.java} | 49 +++++++-----
 .../jindo/TestJindoMultiPartUploadCommitter.java   | 88 ++++++++++++++++++++++
 6 files changed, 142 insertions(+), 32 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
 
b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
index a8545bf0b4..ffe7214de4 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/BaseMultiPartUploadCommitter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.fs;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.rest.RESTTokenFileIO;
 
 import org.slf4j.Logger;
@@ -32,7 +33,7 @@ public abstract class BaseMultiPartUploadCommitter<T, C> 
implements TwoPhaseOutp
 
     private static final Logger LOG = 
LoggerFactory.getLogger(BaseMultiPartUploadCommitter.class);
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private final String uploadId;
     private final String objectName;
@@ -82,6 +83,11 @@ public abstract class BaseMultiPartUploadCommitter<T, C> 
implements TwoPhaseOutp
         return this.targetPath;
     }
 
+    @VisibleForTesting
+    public List<T> uploadedParts() {
+        return uploadedParts;
+    }
+
     @Override
     public void clean(FileIO fileIO) throws IOException {}
 
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
index 6051d315ec..ff6ba49f75 100644
--- 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java
@@ -35,7 +35,8 @@ import java.nio.channels.FileChannel;
 import java.util.List;
 
 /** Provides the multipart upload by Jindo. */
-public class JindoMultiPartUpload implements 
MultiPartUploadStore<JdoObjectPart, String> {
+public class JindoMultiPartUpload
+        implements MultiPartUploadStore<SerializableJdoObjectPart, String> {
 
     private final JindoMpuStore mpuStore;
     private final Path workingDirectory;
@@ -64,12 +65,16 @@ public class JindoMultiPartUpload implements 
MultiPartUploadStore<JdoObjectPart,
     public String completeMultipartUpload(
             String objectName,
             String uploadId,
-            List<JdoObjectPart> partETags,
+            List<SerializableJdoObjectPart> partETags,
             long numBytesInParts) {
         try {
             JdoObjectPartList partList =
                     new 
com.aliyun.jindodata.api.spec.protos.JdoObjectPartList();
-            partList.setParts(partETags.toArray(new JdoObjectPart[0]));
+            JdoObjectPart[] jdoObjectParts = new 
JdoObjectPart[partETags.size()];
+            for (int i = 0; i < partETags.size(); i++) {
+                jdoObjectParts[i] = partETags.get(i).toJdoObjectPart();
+            }
+            partList.setParts(jdoObjectParts);
             mpuStore.commitMultiPartUpload(new Path(objectName), uploadId, 
partList);
             return uploadId;
         } catch (Exception e) {
@@ -78,7 +83,7 @@ public class JindoMultiPartUpload implements 
MultiPartUploadStore<JdoObjectPart,
     }
 
     @Override
-    public JdoObjectPart uploadPart(
+    public SerializableJdoObjectPart uploadPart(
             String objectName, String uploadId, int partNumber, File file, int 
byteLength)
             throws IOException {
         try {
@@ -92,7 +97,7 @@ public class JindoMultiPartUpload implements 
MultiPartUploadStore<JdoObjectPart,
 
             JdoMpuUploadPartReply result =
                     mpuStore.uploadPart(new Path(objectName), uploadId, 
partNumber, buffer);
-            return result.getPartInfo();
+            return new SerializableJdoObjectPart(result.getPartInfo());
         } catch (Exception e) {
             throw new IOException("Failed to upload part " + partNumber + " 
for: " + objectName, e);
         }
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
index 64205e82b0..435a0bc49a 100644
--- 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java
@@ -24,7 +24,6 @@ import org.apache.paimon.fs.MultiPartUploadStore;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.Pair;
 
-import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
 import com.aliyun.jindodata.common.JindoHadoopSystem;
 
 import java.io.IOException;
@@ -32,10 +31,11 @@ import java.util.List;
 
 /** Jindo implementation of MultiPartUploadCommitter. */
 public class JindoMultiPartUploadCommitter
-        extends BaseMultiPartUploadCommitter<JdoObjectPart, String> {
+        extends BaseMultiPartUploadCommitter<SerializableJdoObjectPart, 
String> {
+
     public JindoMultiPartUploadCommitter(
             String uploadId,
-            List<JdoObjectPart> uploadedParts,
+            List<SerializableJdoObjectPart> uploadedParts,
             String objectName,
             long position,
             Path targetPath) {
@@ -43,7 +43,7 @@ public class JindoMultiPartUploadCommitter
     }
 
     @Override
-    protected MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore(
+    protected MultiPartUploadStore<SerializableJdoObjectPart, String> 
multiPartUploadStore(
             FileIO fileIO, Path targetPath) throws IOException {
         JindoFileIO jindoFileIO = (JindoFileIO) fileIO;
         org.apache.hadoop.fs.Path hadoopPath = jindoFileIO.path(targetPath);
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
index 9c6fb5df10..3bf5cde03c 100644
--- 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
@@ -22,16 +22,14 @@ import org.apache.paimon.fs.MultiPartUploadStore;
 import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
 import org.apache.paimon.fs.Path;
 
-import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
-
 import java.io.IOException;
 
 /** Jindo implementation of TwoPhaseOutputStream using multipart upload. */
 public class JindoTwoPhaseOutputStream
-        extends MultiPartUploadTwoPhaseOutputStream<JdoObjectPart, String> {
+        extends MultiPartUploadTwoPhaseOutputStream<SerializableJdoObjectPart, 
String> {
 
     public JindoTwoPhaseOutputStream(
-            MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore,
+            MultiPartUploadStore<SerializableJdoObjectPart, String> 
multiPartUploadStore,
             org.apache.hadoop.fs.Path hadoopPath,
             Path targetPath)
             throws IOException {
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/SerializableJdoObjectPart.java
similarity index 51%
copy from 
paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
copy to 
paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/SerializableJdoObjectPart.java
index 9c6fb5df10..6f000ac7a4 100644
--- 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/SerializableJdoObjectPart.java
@@ -18,29 +18,42 @@
 
 package org.apache.paimon.jindo;
 
-import org.apache.paimon.fs.MultiPartUploadStore;
-import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
-import org.apache.paimon.fs.Path;
-
 import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
 
-import java.io.IOException;
+import java.io.Serializable;
+
+/** Serializable wrapper for JdoObjectPart. */
+public class SerializableJdoObjectPart implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final int partNum;
+    private final long size;
+    private final String eTag;
+
+    public SerializableJdoObjectPart(JdoObjectPart part) {
+        this.partNum = part.getPartNum();
+        this.size = part.getSize();
+        this.eTag = part.getETag();
+    }
+
+    public JdoObjectPart toJdoObjectPart() {
+        JdoObjectPart part = new JdoObjectPart();
+        part.setPartNum(this.partNum);
+        part.setSize(this.size);
+        part.setETag(this.eTag);
+        return part;
+    }
 
-/** Jindo implementation of TwoPhaseOutputStream using multipart upload. */
-public class JindoTwoPhaseOutputStream
-        extends MultiPartUploadTwoPhaseOutputStream<JdoObjectPart, String> {
+    // Getters
+    public int getPartNum() {
+        return partNum;
+    }
 
-    public JindoTwoPhaseOutputStream(
-            MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore,
-            org.apache.hadoop.fs.Path hadoopPath,
-            Path targetPath)
-            throws IOException {
-        super(multiPartUploadStore, hadoopPath, targetPath);
+    public long getSize() {
+        return size;
     }
 
-    @Override
-    public Committer committer() {
-        return new JindoMultiPartUploadCommitter(
-                uploadId, uploadedParts, objectName, position, targetPath);
+    public String getETag() {
+        return eTag;
     }
 }
diff --git 
a/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoMultiPartUploadCommitter.java
 
b/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoMultiPartUploadCommitter.java
new file mode 100644
index 0000000000..9d9696cd7a
--- /dev/null
+++ 
b/paimon-filesystems/paimon-jindo/src/test/java/org/apache/paimon/jindo/TestJindoMultiPartUploadCommitter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for JindoMultiPartUploadCommitter. */
+public class TestJindoMultiPartUploadCommitter {
+
+    @Test
+    public void testSerializableJdoObjectPart() throws Exception {
+        // Test the SerializableJdoObjectPart wrapper directly
+        JdoObjectPart originalPart = new JdoObjectPart();
+        originalPart.setPartNum(5);
+        originalPart.setSize(2048L);
+        originalPart.setETag("test-etag-2");
+
+        SerializableJdoObjectPart wrapper = new 
SerializableJdoObjectPart(originalPart);
+
+        byte[] serialized = InstantiationUtil.serializeObject(wrapper);
+        SerializableJdoObjectPart deserialized =
+                InstantiationUtil.deserializeObject(serialized, 
getClass().getClassLoader());
+
+        assertThat(deserialized).isNotNull();
+        assertThat(deserialized.getPartNum()).isEqualTo(5);
+        assertThat(deserialized.getSize()).isEqualTo(2048L);
+        assertThat(deserialized.getETag()).isEqualTo("test-etag-2");
+
+        JdoObjectPart reconstructedPart = deserialized.toJdoObjectPart();
+        assertThat(reconstructedPart.getPartNum()).isEqualTo(5);
+        assertThat(reconstructedPart.getSize()).isEqualTo(2048L);
+        assertThat(reconstructedPart.getETag()).isEqualTo("test-etag-2");
+    }
+
+    @Test
+    public void testJindoMultiPartUploadCommitterSerialization() throws 
Exception {
+        String uploadId = "test-upload-id";
+        String objectName = "test-object-name";
+        long position = 1024L;
+        Path targetPath = new Path("/test/path/file.txt");
+        List<SerializableJdoObjectPart> uploadedParts = new ArrayList<>();
+        JdoObjectPart originalPart = new JdoObjectPart();
+        originalPart.setPartNum(5);
+        originalPart.setSize(2048L);
+        originalPart.setETag("test-etag-2");
+        uploadedParts.add(new SerializableJdoObjectPart(originalPart));
+
+        JindoMultiPartUploadCommitter committer =
+                new JindoMultiPartUploadCommitter(
+                        uploadId, uploadedParts, objectName, position, 
targetPath);
+
+        byte[] serialized = InstantiationUtil.serializeObject(committer);
+        JindoMultiPartUploadCommitter deserialized =
+                InstantiationUtil.deserializeObject(serialized, 
getClass().getClassLoader());
+
+        // Verify the deserialized object
+        assertThat(deserialized).isNotNull();
+        SerializableJdoObjectPart deserializedPart = 
deserialized.uploadedParts().get(0);
+        assertThat(deserializedPart.getPartNum()).isEqualTo(5);
+        assertThat(deserializedPart.getSize()).isEqualTo(2048L);
+        assertThat(deserializedPart.getETag()).isEqualTo("test-etag-2");
+    }
+}

Reply via email to