dedocibula commented on code in PR #35408: URL: https://github.com/apache/beam/pull/35408#discussion_r2165241824
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java: ########## @@ -223,12 +228,224 @@ public List<ChangeStreamRecord> toChangeStreamRecords( return Collections.singletonList( toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata)); } - // In GoogleSQL, change stream records are returned as an array of structs. + + // In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos. + if (isProtoChangeRecord(resultSet)) { + return Arrays.asList( + fromProtoChangeStreamRecord( Review Comment: I think we should try and follow the flatMap.collect declarative mode of parsing here. Also let's keep the method name as toChangeStreamRecord just accepting proto instead of struct as the second argument -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org