dedocibula commented on code in PR #35408: URL: https://github.com/apache/beam/pull/35408#discussion_r2165239223
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java: ########## @@ -223,12 +228,224 @@ public List<ChangeStreamRecord> toChangeStreamRecords( return Collections.singletonList( toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata)); } - // In GoogleSQL, change stream records are returned as an array of structs. + + // In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos. + if (isProtoChangeRecord(resultSet)) { + return Arrays.asList( + fromProtoChangeStreamRecord( + partition, resultSetMetadata, resultSet.getProtoChangeStreamRecord(0))); + } + + // In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as an array + // of structs. return resultSet.getCurrentRowAsStruct().getStructList(0).stream() .flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata)) .collect(Collectors.toList()); } + boolean isProtoChangeRecord(ChangeStreamResultSet currentRow) { Review Comment: Can we move this method to the result set instead? And also add precondition check for getProtoChangeStreamRecord there -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org