This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f973906 fix: fix bug when retrieving either string or json new a354889 Merge pull request #16890 from [BEAM-12164]: fix bug when retrieving either string or json f973906 is described below commit f9739064deedda4fd5ff2b283404d15e87a911fd Author: Thiago Nunes <thiagotnu...@google.com> AuthorDate: Fri Feb 18 19:03:17 2022 +1100 fix: fix bug when retrieving either string or json. struct.getValue() throws an error when reading from a struct that contains a JSON value. We circumvent this by checking the column type and calling either struct.getString() or struct.getJson(). --- .../mapper/ChangeStreamRecordMapper.java | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java index 4368ba7..e2bae67 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper; import com.google.cloud.Timestamp; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.Type; -import com.google.cloud.spanner.Value; import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; @@ -280,7 +279,7 @@ public class ChangeStreamRecordMapper { private ColumnType columnTypeFrom(Struct struct) { // TODO: Move to type struct.getJson when backend is fully migrated - final String type = getJsonString(struct.getValue(TYPE_COLUMN)); + final String type = getJsonString(struct, TYPE_COLUMN); return new 
ColumnType( struct.getString(NAME_COLUMN), new TypeCode(type), @@ -290,13 +289,13 @@ public class ChangeStreamRecordMapper { private Mod modFrom(Struct struct) { // TODO: Move to keys struct.getJson when backend is fully migrated - final String keys = getJsonString(struct.getValue(KEYS_COLUMN)); + final String keys = getJsonString(struct, KEYS_COLUMN); // TODO: Move to oldValues struct.getJson when backend is fully migrated final String oldValues = - struct.isNull(OLD_VALUES_COLUMN) ? null : getJsonString(struct.getValue(OLD_VALUES_COLUMN)); + struct.isNull(OLD_VALUES_COLUMN) ? null : getJsonString(struct, OLD_VALUES_COLUMN); // TODO: Move to newValues struct.getJson when backend is fully migrated final String newValues = - struct.isNull(NEW_VALUES_COLUMN) ? null : getJsonString(struct.getValue(NEW_VALUES_COLUMN)); + struct.isNull(NEW_VALUES_COLUMN) ? null : getJsonString(struct, NEW_VALUES_COLUMN); return new Mod(keys, oldValues, newValues); } @@ -331,13 +330,13 @@ public class ChangeStreamRecordMapper { } // TODO: Remove when backend is fully migrated to JSON - private String getJsonString(Value value) { - if (value.getType().equals(Type.json())) { - return value.getJson(); - } else if (value.getType().equals(Type.string())) { - return value.getString(); + private String getJsonString(Struct struct, String columnName) { + if (struct.getColumnType(columnName).equals(Type.json())) { + return struct.getJson(columnName); + } else if (struct.getColumnType(columnName).equals(Type.string())) { + return struct.getString(columnName); } else { - throw new IllegalArgumentException("Can not extract string from value " + value); + throw new IllegalArgumentException("Can not extract string from value " + columnName); } } }