changliiu commented on code in PR #35408: URL: https://github.com/apache/beam/pull/35408#discussion_r2167345517
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java: ########## @@ -223,12 +228,224 @@ public List<ChangeStreamRecord> toChangeStreamRecords( return Collections.singletonList( toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata)); } - // In GoogleSQL, change stream records are returned as an array of structs. + + // In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos. + if (isProtoChangeRecord(resultSet)) { + return Arrays.asList( + fromProtoChangeStreamRecord( Review Comment: 1. Changed the name to toChangeStreamRecord. SGTM to keep consistent. 2. I lean against using the same pattern - flatMap.collect . Because for V1 change stream, each row is a struct containing arrays of all three type or change records, so the flatMap.collect makes sense. However for v2, each row only contain one single proto. So the single function call is clean and enough. Please re-open if you have different opinion then we can discuss offline. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org