Abacn commented on code in PR #34763:
URL: https://github.com/apache/beam/pull/34763#discussion_r2136710448
##########
CHANGES.md:
##########
@@ -184,6 +184,7 @@ N/A
 * [Python] Reshuffle now correctly respects user-specified type hints, fixing a previous bug where it might use FastPrimitivesCoder wrongly. This change could break pipelines with incorrect type hints in Reshuffle. If you have issues after upgrading, temporarily set update_compatibility_version to a previous Beam version to use the old behavior. The recommended solution is to fix the type hints in your code. ([#33932](https://github.com/apache/beam/pull/33932))
 * [Java] SparkReceiver 2 has been moved to SparkReceiver 3 that supports Spark 3.x. ([#33574](https://github.com/apache/beam/pull/33574))
 * [Python] Correct parsing of `collections.abc.Sequence` type hints was added, which can lead to pipelines failing type hint checks that were previously passing erroneously. These issues will be most commonly seen trying to consume a PCollection with a `Sequence` type hint after a GroupByKey or a CoGroupByKey. ([#33999](https://github.com/apache/beam/pull/33999)).
+* Debezium IO (Java) has been upgraded from depending on version 1.3.1.Final of io.debezium to 3.1.1.Final. This may cause some breaking changes since the libraries do not maintain full compatibility ([#33526](https://github.com/apache/beam/issues/33526)).

Review Comment:
   Remove this. This is the 2.64.0 section.

##########
CHANGES.md:
##########
@@ -31,7 +31,7 @@
 ## I/Os
 
-* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Debezium IO upgraded to 3.1.1 requires Java 17 (Java) ([#34747](https://github.com/apache/beam/issues/34747)).

Review Comment:
   Please move it under the 2.66.0 section. This one is the template.

##########
sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java:
##########
@@ -104,6 +105,11 @@ private PTransform<PCollectionRowTuple, PCollectionRowTuple> makePtransform(
         // is "database.table".
         .setTable("inventory.customers")
         .setPort(port)
+        .setDebeziumConnectionProperties(
+            Lists.newArrayList(
+                "database.server.id=579676",
+                "schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory",
+                "schema.history.internal.file.filename=data/schema_history.dat"))

Review Comment:
   Please check https://github.com/apache/beam/pull/34763/files#r2068902221. Consider using 'data/schema_history.dat'?

##########
sdks/java/io/debezium/build.gradle:
##########
@@ -90,13 +139,25 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
   }
 }
 
-configurations.all (Configuration it) -> {
-  resolutionStrategy {
-    // Force protobuf 3 because debezium is currently incompatible with protobuf 4.
-    // TODO - remove this and upgrade the version of debezium once a proto-4 compatible version is available
-    // https://github.com/apache/beam/pull/33526 does some of this, but was abandoned because it still doesn't
-    // work with protobuf 4.
-    force "com.google.protobuf:protobuf-java:3.25.5"
-    force "com.google.protobuf:protobuf-java-util:3.25.5"
-  }
+// The order is intended here - Debezium 3 requires Java 17, or later

Review Comment:
   We don't need these once `requireJavaVersion = JavaVersion.VERSION_17` is added above.

##########
sdks/java/io/debezium/build.gradle:
##########
@@ -38,40 +50,77 @@ dependencies {
     implementation library.java.joda_time
     provided library.java.jackson_dataformat_csv
     permitUnusedDeclared library.java.jackson_dataformat_csv
-    testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
-    testImplementation project(path: ":sdks:java:io:common")
+
+    // Kafka connect dependencies
+    implementation "org.apache.kafka:connect-api:3.9.0"
+    implementation "org.apache.kafka:connect-json:3.9.0"
+    permitUnusedDeclared "org.apache.kafka:connect-json:3.9.0"
+
+    // Debezium dependencies
+    implementation group: 'io.debezium', name: 'debezium-core', version: '3.1.1.Final'
     // Test dependencies
+    testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+    testImplementation project(path: ":sdks:java:io:common")
     testImplementation library.java.junit
     testImplementation project(path: ":sdks:java:io:jdbc")
     testRuntimeOnly library.java.slf4j_jdk14
     testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
     testImplementation project(":runners:google-cloud-dataflow-java")
     testImplementation library.java.hamcrest
     testImplementation library.java.testcontainers_base
-    testImplementation library.java.testcontainers_mysql
-    testImplementation library.java.testcontainers_postgresql
-    // TODO(https://github.com/apache/beam/issues/31678) HikariCP 5.x requires Java11+
-    testImplementation 'com.zaxxer:HikariCP:4.0.3'
+    testImplementation "org.testcontainers:kafka"
+    testImplementation "org.testcontainers:mysql"
+    testImplementation "org.testcontainers:postgresql"
+    testImplementation "io.debezium:debezium-testing-testcontainers:3.1.1.Final"
+    testImplementation 'com.zaxxer:HikariCP:5.1.0'
-    // Kafka connect dependencies
-    implementation "org.apache.kafka:connect-api:2.5.0"
-    implementation "org.apache.kafka:connect-json:2.5.0"
-    permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761
+    // Debezium connector implementations for testing
+    testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '3.1.1.Final'
+    testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '3.1.1.Final'
+}
-    // Debezium dependencies
-    implementation group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final'
-    testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final'
-    testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '1.3.1.Final'
+
+// Force Jackson versions for the test runtime classpath

Review Comment:
   Add a TODO comment: remove the pin after upgrading Beam's Jackson version.

##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -677,8 +677,8 @@ class BeamModulePlugin implements Plugin<Project> {
         activemq_junit : "org.apache.activemq.tooling:activemq-junit:$activemq_version",
         activemq_kahadb_store : "org.apache.activemq:activemq-kahadb-store:$activemq_version",
         activemq_mqtt : "org.apache.activemq:activemq-mqtt:$activemq_version",
-        antlr : "org.antlr:antlr4:4.7",
-        antlr_runtime : "org.antlr:antlr4-runtime:4.7",
+        antlr : "org.antlr:antlr4:4.10",
+        antlr_runtime : "org.antlr:antlr4-runtime:4.10",

Review Comment:
   antlr is only used by the Beam SDK core and is shaded. We should be able to unpin antlr. Let me open a PR.

##########
sdks/java/io/debezium/build.gradle:
##########
@@ -38,40 +50,77 @@ dependencies {
     implementation library.java.joda_time
     provided library.java.jackson_dataformat_csv
     permitUnusedDeclared library.java.jackson_dataformat_csv
-    testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
-    testImplementation project(path: ":sdks:java:io:common")
+
+    // Kafka connect dependencies
+    implementation "org.apache.kafka:connect-api:3.9.0"
+    implementation "org.apache.kafka:connect-json:3.9.0"
+    permitUnusedDeclared "org.apache.kafka:connect-json:3.9.0"

Review Comment:
   Why do we need to declare this dependency and then mark it permitUnusedDeclared?
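For the `setDebeziumConnectionProperties` hunk in `DebeziumReadSchemaTransformTest`, here is a minimal sketch of assembling the same three `key=value` strings from a map. Debezium 2.x and later read schema history settings under the `schema.history.internal.*` prefix (1.x used `database.history.*`), which is why these keys change with the 3.1.1.Final upgrade. The helper name `debeziumTestProperties` and the choice to place `schema_history.dat` under a caller-supplied directory (for example a temporary one) instead of the relative `data/` path are assumptions for illustration, not what the PR or the linked thread prescribes.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DebeziumTestProperties {

  // Hypothetical helper: builds the "key=value" strings in the same shape the
  // test passes to setDebeziumConnectionProperties(...).
  static List<String> debeziumTestProperties(Path historyDir) {
    Map<String, String> props = new LinkedHashMap<>(); // keeps insertion order
    props.put("database.server.id", "579676");
    props.put(
        "schema.history.internal",
        "io.debezium.storage.file.history.FileSchemaHistory");
    // Assumption: the history file lives under historyDir rather than a fixed relative path.
    props.put(
        "schema.history.internal.file.filename",
        historyDir.resolve("schema_history.dat").toString());

    List<String> out = new ArrayList<>();
    for (Map.Entry<String, String> e : props.entrySet()) {
      out.add(e.getKey() + "=" + e.getValue());
    }
    return out;
  }

  public static void main(String[] args) throws IOException {
    // A fresh temporary directory per run, so no history file from an earlier run is reused.
    Path dir = Files.createTempDirectory("debezium-history");
    debeziumTestProperties(dir).forEach(System.out::println);
  }
}
```

Keeping the directory a parameter lets a test decide whether the file should be isolated per run or live at a fixed location such as `data/`.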
##########
sdks/java/io/debezium/build.gradle:
##########
@@ -18,6 +18,18 @@ import groovy.json.JsonOutput
 plugins { id 'org.apache.beam.module' }
+

Review Comment:
   Please rebase (and possibly squash commits) onto the latest master and replace this chunk with
   ```
   applyJavaNature(
     ...
     requireJavaVersion = JavaVersion.VERSION_17
   ```

##########
sdks/java/io/debezium/expansion-service/build.gradle:
##########
@@ -20,6 +20,17 @@ apply plugin: 'org.apache.beam.module'
 apply plugin: 'application'
 mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"
+// The order is intended here - Debezium 3 requires Java 17, or later

Review Comment:
   Same here: use `requireJavaVersion = JavaVersion.VERSION_17` and remove the version logic here and at the bottom of the file.

##########
sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java:
##########
@@ -98,19 +122,22 @@ private void monitorEssentialMetrics() {
           rs.close();
           Thread.sleep(4000);
         } else {
-          throw new IllegalArgumentException("OIOI");
+          throw new IllegalArgumentException(
+              "Illegal Argument Exception in monitorEssentialMetrics.");
         }
       }
-    } catch (InterruptedException | SQLException ex) {
-      throw new IllegalArgumentException("Oi", ex);
+    } catch (SQLException ex) {
+      LOG.error("SQL error in monitoring thread. Shutting down.", ex);

Review Comment:
   The exception is suppressed here. Is that intended? The test will pass even if this exception is thrown.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
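On the suppressed `SQLException` in `DebeziumIOMySqlConnectorIT`: simply rethrowing inside the monitoring thread would typically not fail the test either, because an exception escaping a background thread goes to that thread's uncaught-exception handler, not to the JUnit thread. Below is a minimal sketch of one common pattern, assuming the monitor can be run through an `ExecutorService`. It submits the loop as a `Callable`, keeps the `Future`, and calls `get()` from the test thread after the pipeline finishes, so any failure resurfaces as an `ExecutionException` and fails the test. `MonitorFailurePropagation`, `MetricsPoller`, `startMonitor`, `assertMonitorSucceeded`, and the 30-second timeout are hypothetical; the real `monitorEssentialMetrics()` body and its `LOG` are not reproduced.

```java
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class MonitorFailurePropagation {

  // Hypothetical stand-in for one iteration of monitorEssentialMetrics().
  interface MetricsPoller {
    void pollOnce() throws SQLException, InterruptedException;
  }

  // Runs the poller on the executor. The lambda returns a value, so it is a
  // Callable and any checked exception is captured in the returned Future.
  static Future<?> startMonitor(ExecutorService executor, MetricsPoller poller, int iterations) {
    return executor.submit(
        () -> {
          for (int i = 0; i < iterations; i++) {
            poller.pollOnce();
          }
          return null;
        });
  }

  // Called from the test thread: converts any monitor failure into an
  // AssertionError so the test fails instead of passing silently.
  static void assertMonitorSucceeded(Future<?> monitor) {
    try {
      monitor.get(30, TimeUnit.SECONDS); // assumed timeout
    } catch (ExecutionException e) {
      throw new AssertionError("Monitoring thread failed", e.getCause());
    } catch (TimeoutException e) {
      throw new AssertionError("Monitoring thread did not finish in time", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      throw new AssertionError("Interrupted while waiting for the monitor", e);
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<?> failing =
          startMonitor(executor, () -> { throw new SQLException("connection lost"); }, 3);
      try {
        assertMonitorSucceeded(failing);
        System.out.println("unexpected: monitor reported no failure");
      } catch (AssertionError expected) {
        System.out.println("test would fail as intended: " + expected.getCause().getMessage());
      }
    } finally {
      executor.shutdownNow();
    }
  }
}
```

If logging is still wanted, the `catch (SQLException ex)` block could log and then rethrow, with the `Future` check doing the work of surfacing the failure to JUnit.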