Abacn commented on code in PR #34763:
URL: https://github.com/apache/beam/pull/34763#discussion_r2136710448


##########
CHANGES.md:
##########
@@ -184,6 +184,7 @@ N/A
 * [Python] Reshuffle now correctly respects user-specified type hints, fixing a previous bug where it might use FastPrimitivesCoder wrongly. This change could break pipelines with incorrect type hints in Reshuffle. If you have issues after upgrading, temporarily set update_compatibility_version to a previous Beam version to use the old behavior. The recommended solution is to fix the type hints in your code. ([#33932](https://github.com/apache/beam/pull/33932))
 * [Java] SparkReceiver 2 has been moved to SparkReceiver 3 that supports Spark 3.x. ([#33574](https://github.com/apache/beam/pull/33574))
 * [Python] Correct parsing of `collections.abc.Sequence` type hints was added, which can lead to pipelines failing type hint checks that were previously passing erroneously. These issues will be most commonly seen trying to consume a PCollection with a `Sequence` type hint after a GroupByKey or a CoGroupByKey. ([#33999](https://github.com/apache/beam/pull/33999)).
+* Debezium IO (Java) has been upgraded from depending on version 1.3.1.Final of io.debezium to 3.1.1.Final. This may cause some breaking changes since the libraries do not maintain full compatibility ([#33526](https://github.com/apache/beam/issues/33526)).

Review Comment:
   Remove this. This is the 2.64.0 section.



##########
CHANGES.md:
##########
@@ -31,7 +31,7 @@
 
 ## I/Os
 
-* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Debezium IO upgraded to 3.1.1 requires Java 17 (Java) ([#34747](https://github.com/apache/beam/issues/34747)).

Review Comment:
   Please move it under the 2.66.0 section. This section is the template.



##########
sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java:
##########
@@ -104,6 +105,11 @@ private PTransform<PCollectionRowTuple, PCollectionRowTuple> makePtransform(
                 // is "database.table".
                 .setTable("inventory.customers")
                 .setPort(port)
+                .setDebeziumConnectionProperties(
+                    Lists.newArrayList(
+                        "database.server.id=579676",
+                        "schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory",
+                        "schema.history.internal.file.filename=data/schema_history.dat"))

Review Comment:
   Please check https://github.com/apache/beam/pull/34763/files#r2068902221.
   
   Consider using 'data/schema_history.dat'?



##########
sdks/java/io/debezium/build.gradle:
##########
@@ -90,13 +139,25 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
   }
 }
 
-configurations.all (Configuration it) -> {
-  resolutionStrategy {
-    // Force protobuf 3 because debezium is currently incompatible with protobuf 4.
-    // TODO - remove this and upgrade the version of debezium once a proto-4 compatible version is available
-    // https://github.com/apache/beam/pull/33526 does some of this, but was abandoned because it still doesn't
-    // work with protobuf 4.
-    force "com.google.protobuf:protobuf-java:3.25.5"
-    force "com.google.protobuf:protobuf-java-util:3.25.5"
-  }
+// The order is intended here - Debezium 3 requires Java 17, or later

Review Comment:
   We don't need these now that `requireJavaVersion = JavaVersion.VERSION_17` has been added above.



##########
sdks/java/io/debezium/build.gradle:
##########
@@ -38,40 +50,77 @@ dependencies {
     implementation library.java.joda_time
     provided library.java.jackson_dataformat_csv
     permitUnusedDeclared library.java.jackson_dataformat_csv
-    testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
-    testImplementation project(path: ":sdks:java:io:common")
+
+    // Kafka connect dependencies
+    implementation "org.apache.kafka:connect-api:3.9.0"
+    implementation "org.apache.kafka:connect-json:3.9.0"
+    permitUnusedDeclared "org.apache.kafka:connect-json:3.9.0"
+
+    // Debezium dependencies
+    implementation group: 'io.debezium', name: 'debezium-core', version: '3.1.1.Final'
 
     // Test dependencies
+    testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+    testImplementation project(path: ":sdks:java:io:common")
     testImplementation library.java.junit
     testImplementation project(path: ":sdks:java:io:jdbc")
     testRuntimeOnly library.java.slf4j_jdk14
     testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
     testImplementation project(":runners:google-cloud-dataflow-java")
     testImplementation library.java.hamcrest
     testImplementation library.java.testcontainers_base
-    testImplementation library.java.testcontainers_mysql
-    testImplementation library.java.testcontainers_postgresql
-    // TODO(https://github.com/apache/beam/issues/31678) HikariCP 5.x requires Java11+
-    testImplementation 'com.zaxxer:HikariCP:4.0.3'
+    testImplementation "org.testcontainers:kafka"
+    testImplementation "org.testcontainers:mysql"
+    testImplementation "org.testcontainers:postgresql"
+    testImplementation "io.debezium:debezium-testing-testcontainers:3.1.1.Final"
+    testImplementation 'com.zaxxer:HikariCP:5.1.0'
 
-    // Kafka connect dependencies
-    implementation "org.apache.kafka:connect-api:2.5.0"
-    implementation "org.apache.kafka:connect-json:2.5.0"
-    permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761
+    // Debezium connector implementations for testing
+    testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '3.1.1.Final'
+    testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '3.1.1.Final'
+}
 
-    // Debezium dependencies
-    implementation group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final'
-    testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final'
-    testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '1.3.1.Final'
+
+// Force Jackson versions for the test runtime classpath

Review Comment:
   Add a TODO comment: remove the pin after upgrading Beam's Jackson version.



##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -677,8 +677,8 @@ class BeamModulePlugin implements Plugin<Project> {
         activemq_junit                              : "org.apache.activemq.tooling:activemq-junit:$activemq_version",
         activemq_kahadb_store                       : "org.apache.activemq:activemq-kahadb-store:$activemq_version",
         activemq_mqtt                               : "org.apache.activemq:activemq-mqtt:$activemq_version",
-        antlr                                       : "org.antlr:antlr4:4.7",
-        antlr_runtime                               : "org.antlr:antlr4-runtime:4.7",
+        antlr                                       : "org.antlr:antlr4:4.10",
+        antlr_runtime                               : "org.antlr:antlr4-runtime:4.10",

Review Comment:
   antlr is only used by the Beam SDK core and is shaded, so we should be able to unpin antlr. Let me open a PR.



##########
sdks/java/io/debezium/build.gradle:
##########
@@ -38,40 +50,77 @@ dependencies {
     implementation library.java.joda_time
     provided library.java.jackson_dataformat_csv
     permitUnusedDeclared library.java.jackson_dataformat_csv
-    testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
-    testImplementation project(path: ":sdks:java:io:common")
+
+    // Kafka connect dependencies
+    implementation "org.apache.kafka:connect-api:3.9.0"
+    implementation "org.apache.kafka:connect-json:3.9.0"
+    permitUnusedDeclared "org.apache.kafka:connect-json:3.9.0"

Review Comment:
   Why do we need to declare this dependency and then mark it as permitUnusedDeclared?



##########
sdks/java/io/debezium/build.gradle:
##########
@@ -18,6 +18,18 @@
 import groovy.json.JsonOutput
 
 plugins { id 'org.apache.beam.module' }
+

Review Comment:
   Please rebase (and possibly squash commits) onto the latest master and replace this chunk with
   
   ```
   applyJavaNature(
   ...requireJavaVersion = JavaVersion.VERSION_17
   ```



##########
sdks/java/io/debezium/expansion-service/build.gradle:
##########
@@ -20,6 +20,17 @@ apply plugin: 'org.apache.beam.module'
 apply plugin: 'application'
 mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
 
+// The order is intended here - Debezium 3 requires Java 17, or later

Review Comment:
   Same here: use `requireJavaVersion = JavaVersion.VERSION_17` and remove the version logic here and at the bottom of the file.



##########
sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java:
##########
@@ -98,19 +122,22 @@ private void monitorEssentialMetrics() {
           rs.close();
           Thread.sleep(4000);
         } else {
-          throw new IllegalArgumentException("OIOI");
+          throw new IllegalArgumentException(
+              "Illegal Argument Exception in monitorEssentialMetrics.");
         }
       }
-    } catch (InterruptedException | SQLException ex) {
-      throw new IllegalArgumentException("Oi", ex);
+    } catch (SQLException ex) {
+      LOG.error("SQL error in monitoring thread. Shutting down.", ex);

Review Comment:
   The exception is swallowed here. Is that intended? The test will pass even if this exception is thrown.
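If swallowing the error is not intended, one common pattern is to record the monitoring thread's failure and rethrow it from the test thread after the pipeline finishes. The following is a minimal sketch, assuming the polling loop can be wrapped in a single step that may throw. `PollStep`, `startMonitor` and `assertNoMonitorFailure` are hypothetical names and are not part of `DebeziumIOMySqlConnectorIT`.

```java
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicReference;

public class MonitorFailurePropagation {

  /** Hypothetical stand-in for one polling iteration of monitorEssentialMetrics(). */
  interface PollStep {
    /** Returns false when monitoring should stop; may throw on failure. */
    boolean poll() throws Exception;
  }

  /** Starts the polling loop on a background thread; failures are stored in {@code failure}. */
  static Thread startMonitor(PollStep step, AtomicReference<Throwable> failure) {
    Thread monitor =
        new Thread(
            () -> {
              try {
                while (step.poll()) {
                  // Keep polling until the step asks to stop.
                }
              } catch (InterruptedException e) {
                // Interruption is treated as a normal shutdown signal; restore the flag.
                Thread.currentThread().interrupt();
              } catch (Exception e) {
                // Record the first failure instead of only logging it.
                failure.compareAndSet(null, e);
              }
            });
    monitor.start();
    return monitor;
  }

  /** Called from the test thread: waits for the monitor and fails if it recorded an error. */
  static void assertNoMonitorFailure(Thread monitor, AtomicReference<Throwable> failure)
      throws InterruptedException {
    monitor.join();
    Throwable t = failure.get();
    if (t != null) {
      throw new AssertionError("Monitoring thread failed", t);
    }
  }

  public static void main(String[] args) throws Exception {
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Thread monitor = startMonitor(() -> { throw new SQLException("boom"); }, failure);
    try {
      assertNoMonitorFailure(monitor, failure);
      System.out.println("no failure");
    } catch (AssertionError e) {
      // Prints "test failed as expected: boom"
      System.out.println("test failed as expected: " + e.getCause().getMessage());
    }
  }
}
```

With this shape, an SQL error in the monitor surfaces as a test failure rather than a log line; where exactly to call `assertNoMonitorFailure` (for example after `pipeline.run().waitUntilFinish()`) is an assumption about the test's structure.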


