changliiu commented on code in PR #35408:
URL: https://github.com/apache/beam/pull/35408#discussion_r2167341335


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -223,12 +228,224 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
       return Collections.singletonList(
           toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), 
resultSetMetadata));
     }
-    // In GoogleSQL, change stream records are returned as an array of structs.
+
+    // In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records 
are returned as Protos.
+    if (isProtoChangeRecord(resultSet)) {
+      return Arrays.asList(
+          fromProtoChangeStreamRecord(
+              partition, resultSetMetadata, 
resultSet.getProtoChangeStreamRecord(0)));
+    }
+
+    // In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are 
returned as an array
+    // of structs.
     return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
         .flatMap(struct -> toChangeStreamRecord(partition, struct, 
resultSetMetadata))
         .collect(Collectors.toList());
   }
 
+  boolean isProtoChangeRecord(ChangeStreamResultSet currentRow) {

Review Comment:
   Done.
   I previously tend to keep the changeStreamResultSet a clean Decorator class 
but I agree that we can move this function isProtoChangeRecord there. So that 
we can simplify the code by only exposing isProtoChangeRecord.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to