yzeng1618 commented on code in PR #10222:
URL: https://github.com/apache/seatunnel/pull/10222#discussion_r2638468892


##########
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeOutputFormat.java:
##########
@@ -55,22 +60,15 @@ public MaxcomputeOutputFormat(SeaTunnelRowType rowType, 
ReadonlyConfig readonlyC
     }
 
     public void write(SeaTunnelRow seaTunnelRow) throws IOException, 
TunnelException {
-        ensureUpsertSessionAndWriter();
-        Record newRecord =
-                MaxcomputeTypeMapper.getMaxcomputeRowData(
-                        upsertSession.newRecord(),
-                        seaTunnelRow,
-                        this.tableSchema,
-                        this.rowType,
-                        formatterContext);
-
         switch (seaTunnelRow.getRowKind()) {
             case INSERT:
+                insertRecord(seaTunnelRow);
+                break;
             case UPDATE_AFTER:

Review Comment:
   For UPDATE/DELETE operations on tables without a primary key, will an error 
still be thrown because the UpsertSession requires a primary key? It would be 
better to either clarify this in the documentation or throw an explicit 
exception.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-maxcompute-e2e/src/test/java/org/apache/seatunnel/e2e/connector/maxcompute/MaxComputeIT.java:
##########
@@ -216,6 +227,34 @@ public void testMaxCompute(TestContainer container)
     }
 
     @TestTemplate
+    @Disabled(
+            "maxcompute-emulator does not support upload session for now. 
MaxcomputeWriter uses upload session to insert data.")

Review Comment:
   The key new capability (UploadSession insertion) is marked with @Disabled in 
the CI environment. Could we add a unit test to verify that the basic 
invocation process runs without errors?



##########
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeOutputFormat.java:
##########
@@ -55,22 +60,15 @@ public MaxcomputeOutputFormat(SeaTunnelRowType rowType, 
ReadonlyConfig readonlyC
     }
 
     public void write(SeaTunnelRow seaTunnelRow) throws IOException, 
TunnelException {
-        ensureUpsertSessionAndWriter();
-        Record newRecord =
-                MaxcomputeTypeMapper.getMaxcomputeRowData(
-                        upsertSession.newRecord(),
-                        seaTunnelRow,
-                        this.tableSchema,
-                        this.rowType,
-                        formatterContext);
-
         switch (seaTunnelRow.getRowKind()) {
             case INSERT:
+                insertRecord(seaTunnelRow);
+                break;
             case UPDATE_AFTER:
-                upsertStream.upsert(newRecord);
+                upsertRecord(seaTunnelRow);
                 break;
             case DELETE:

Review Comment:
   ditto
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to