chenxuesdu commented on code in PR #36667:
URL: https://github.com/apache/beam/pull/36667#discussion_r2632433097
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java:
##########
@@ -138,4 +166,76 @@ public ChangeStreamResultSet changeStreamQuery(
private boolean isPostgres() {
return this.dialect == Dialect.POSTGRESQL;
}
+
+ // Returns the PartitionMode, fetching from Spanner on first call and
caching.
+ protected PartitionMode getPartitionMode() {
+ if (this.partitionMode != PartitionMode.UNKNOWN) {
+ return this.partitionMode;
+ }
+ synchronized (this) {
+ if (this.partitionMode == PartitionMode.UNKNOWN) {
+ String fetchedPartitionMode =
+ fetchPartitionMode(this.databaseClient, this.dialect,
this.changeStreamName);
+ if (fetchedPartitionMode.isEmpty()
+ || fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
+ this.partitionMode = PartitionMode.IMMUTABLE_KEY_RANGE;
+ } else {
+ this.partitionMode = PartitionMode.MUTABLE_KEY_RANGE;
+ }
+ }
+ }
+ return this.partitionMode;
+ }
+
+ // Convenience boolean method kept for compatibility
+ protected boolean isMutableKeyRangeChangeStream() {
+ return getPartitionMode() == PartitionMode.MUTABLE_KEY_RANGE;
+ }
+
+ // Returns the partition_mode option value for the given change stream.
+ private static String fetchPartitionMode(
+ DatabaseClient databaseClient, Dialect dialect, String changeStreamName)
{
+ try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
+ Statement statement;
+ if (dialect == Dialect.POSTGRESQL) {
+ statement =
+ Statement.newBuilder(
+ "select option_name, option_value\n"
+ + "from information_schema.change_stream_options\n"
+ + "where change_stream_name = $1")
Review Comment:
Updated.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java:
##########
@@ -138,4 +166,76 @@ public ChangeStreamResultSet changeStreamQuery(
private boolean isPostgres() {
return this.dialect == Dialect.POSTGRESQL;
}
+
+ // Returns the PartitionMode, fetching from Spanner on first call and
caching.
+ protected PartitionMode getPartitionMode() {
+ if (this.partitionMode != PartitionMode.UNKNOWN) {
+ return this.partitionMode;
+ }
+ synchronized (this) {
+ if (this.partitionMode == PartitionMode.UNKNOWN) {
+ String fetchedPartitionMode =
+ fetchPartitionMode(this.databaseClient, this.dialect,
this.changeStreamName);
+ if (fetchedPartitionMode.isEmpty()
+ || fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
+ this.partitionMode = PartitionMode.IMMUTABLE_KEY_RANGE;
+ } else {
+ this.partitionMode = PartitionMode.MUTABLE_KEY_RANGE;
+ }
+ }
+ }
+ return this.partitionMode;
+ }
+
+ // Convenience boolean method kept for compatibility
+ protected boolean isMutableKeyRangeChangeStream() {
+ return getPartitionMode() == PartitionMode.MUTABLE_KEY_RANGE;
+ }
+
+ // Returns the partition_mode option value for the given change stream.
+ private static String fetchPartitionMode(
+ DatabaseClient databaseClient, Dialect dialect, String changeStreamName)
{
+ try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
+ Statement statement;
+ if (dialect == Dialect.POSTGRESQL) {
+ statement =
+ Statement.newBuilder(
+ "select option_name, option_value\n"
+ + "from information_schema.change_stream_options\n"
+ + "where change_stream_name = $1")
+ .bind("p1")
+ .to(changeStreamName)
+ .build();
+ } else {
+ statement =
+ Statement.newBuilder(
+ "select option_name, option_value\n"
+ + "from information_schema.change_stream_options\n"
+ + "where change_stream_name = @changeStreamName")
Review Comment:
Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]