rohitsinha54 commented on code in PR #33062:
URL: https://github.com/apache/beam/pull/33062#discussion_r1838967018
##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java:
##########
@@ -2603,11 +2624,28 @@ public void setup() {
private Connection getConnection() throws SQLException {
Connection connection = this.connection;
if (connection == null) {
- connection = checkStateNotNull(dataSource).getConnection();
+ DataSource validSource = checkStateNotNull(dataSource);
+ connection = validSource.getConnection();
connection.setAutoCommit(false);
preparedStatement =
connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());
this.connection = connection;
+
+ // report Lineage if haven't done so
+ String table = spec.getTable();
+ if (Strings.isNullOrEmpty(table) && spec.getStatement() != null) {
+ table = JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get());
+ }
+ if (!Objects.equals(table, reportedLineage)) {
+ JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource);
+ if (fqn == null) {
+ fqn = JdbcUtil.FQNComponents.of(connection);
+ }
+ if (fqn != null) {
Review Comment:
Can we log the query/connection when fqn remains null? That would tell us,
during debugging, which cases the current implementation is unable to parse.
Same applies to the similar block above.
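A minimal sketch of what that logging could look like, assuming a hypothetical `Fqn` class standing in for `JdbcUtil.FQNComponents` and `java.util.logging` in place of whatever logger `JdbcIO` actually uses. It keeps the DataSource-then-Connection fallback from the diff and, only when both return null, logs the URL and the statement. The URL is cut at the first `?` or `;` because JDBC URLs can carry credentials. `resolveFqn` and `redactUrl` are illustrative names, not existing Beam helpers.

```java
import java.util.function.Supplier;
import java.util.logging.Logger;

/** Hypothetical sketch: derive a lineage FQN and log when no strategy succeeds. */
public class LineageFqnLogging {
  private static final Logger LOG = Logger.getLogger(LineageFqnLogging.class.getName());

  /** Hypothetical stand-in for JdbcUtil.FQNComponents. */
  static final class Fqn {
    final String value;

    Fqn(String value) {
      this.value = value;
    }
  }

  /** Cuts a URL at the first '?' or ';' so parameters such as passwords are not logged. */
  static String redactUrl(String url) {
    if (url == null) {
      return null;
    }
    int cut = url.length();
    for (char separator : new char[] {'?', ';'}) {
      int idx = url.indexOf(separator);
      if (idx >= 0 && idx < cut) {
        cut = idx;
      }
    }
    return url.substring(0, cut);
  }

  /** Tries the DataSource first, then the Connection, and logs if both yield null. */
  static Fqn resolveFqn(
      Supplier<Fqn> fromDataSource, Supplier<Fqn> fromConnection, String jdbcUrl, String statement) {
    Fqn fqn = fromDataSource.get();
    if (fqn == null) {
      fqn = fromConnection.get();
    }
    if (fqn == null) {
      LOG.warning(
          String.format(
              "Could not derive lineage FQN; url=%s, statement=%s",
              redactUrl(jdbcUrl), statement));
    }
    return fqn;
  }

  public static void main(String[] args) {
    // Second strategy succeeds, so nothing is logged.
    Fqn found =
        resolveFqn(
            () -> null, () -> new Fqn("example-fqn"),
            "jdbc:postgresql://localhost/db", "INSERT INTO t VALUES (?)");
    System.out.println(found.value);
    // Both strategies fail, so a warning with the redacted URL is logged.
    Fqn missing =
        resolveFqn(
            () -> null, () -> null,
            "jdbc:custom:db?password=secret", "INSERT INTO t VALUES (?)");
    System.out.println(missing == null);
  }
}
```

Logging only on the double-null path keeps the noise down, since in the diff this code runs once per new connection.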
##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java:
##########
@@ -563,4 +577,232 @@ public KV<Long, KV<DateTime, DateTime>> mapRow(ResultSet resultSet) throws Excep
}
}
});
+
+ @AutoValue
+ abstract static class JdbcUrl {
Review Comment:
This string processing seems brittle. That said, given the variety of JDBC
connection URL formats we see here, I understand we can't readily use an
existing library.
As above, please add logging for patterns that can't be parsed.
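One way to make the common case less brittle, sketched under assumptions: delegate hierarchical URLs of the form `jdbc:<scheme>://host[:port]/database` to `java.net.URI`, and log everything else so unsupported patterns surface. `JdbcUrlParsing` and `ParsedUrl` are hypothetical names. `ParsedUrl` only mirrors the getters of the PR's `JdbcUrl` and is not its implementation. Driver-specific forms such as Oracle thin (`jdbc:oracle:thin:@...`) or SQL Server's `;key=value` syntax would still need dedicated handling.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.logging.Logger;

/** Hypothetical sketch: parse hierarchical JDBC URLs via java.net.URI, log the rest. */
public class JdbcUrlParsing {
  private static final Logger LOG = Logger.getLogger(JdbcUrlParsing.class.getName());

  /** Mirrors the getters of the PR's JdbcUrl; not its implementation. */
  static final class ParsedUrl {
    final String scheme;
    final String hostAndPort;
    final String database;

    ParsedUrl(String scheme, String hostAndPort, String database) {
      this.scheme = scheme;
      this.hostAndPort = hostAndPort;
      this.database = database;
    }
  }

  /** Returns null (and logs) for anything other than jdbc:<scheme>://host[:port]/database. */
  static ParsedUrl parse(String url) {
    if (url != null && url.startsWith("jdbc:")) {
      try {
        URI uri = new URI(url.substring("jdbc:".length()));
        String host = uri.getHost();
        String path = uri.getPath();
        // Opaque or driver-specific forms leave host/path null or empty here.
        if (uri.getScheme() != null && host != null && path != null && path.length() > 1) {
          String hostAndPort = uri.getPort() == -1 ? host : host + ":" + uri.getPort();
          return new ParsedUrl(uri.getScheme(), hostAndPort, path.substring(1));
        }
      } catch (URISyntaxException e) {
        // Fall through to the warning below.
      }
    }
    // Drop parameters before logging, since JDBC URLs may embed credentials.
    String redacted = url == null ? null : url.split("[?;]", 2)[0];
    LOG.warning("Unsupported JDBC URL pattern for lineage: " + redacted);
    return null;
  }

  public static void main(String[] args) {
    ParsedUrl pg = parse("jdbc:postgresql://localhost:5432/mydb");
    System.out.println(pg.scheme + " " + pg.hostAndPort + " " + pg.database);
    System.out.println(parse("not-a-jdbc-url") == null);
  }
}
```

For instance, `parse("jdbc:mysql://db.example.com/sales?useSSL=true")` yields host-and-port `db.example.com` and database `sales`; `URI` keeps the query string separate from the path, so it does not leak into the database name.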
##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java:
##########
@@ -39,6 +39,7 @@
import java.util.Collection;
Review Comment:
Should we update the IO's documentation to state explicitly that lineage
metrics are only collected in these cases, so that customer expectations are
set correctly?
##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java:
##########
@@ -563,4 +577,232 @@ public KV<Long, KV<DateTime, DateTime>> mapRow(ResultSet resultSet) throws Excep
}
}
});
+
+ @AutoValue
+ abstract static class JdbcUrl {
+ abstract String getScheme();
+
+ abstract @Nullable String getHostAndPort();
+
+ abstract String getDatabase();
+
+ static @Nullable JdbcUrl of(String url) {
+ if (Strings.isNullOrEmpty(url) || !url.startsWith("jdbc:")) {
Review Comment:
Is the URL already sanitized at this point, e.g. stripped of leading
whitespace?
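If callers can't guarantee that, one cheap guard is to normalize before the `startsWith("jdbc:")` check. This is a minimal sketch. `normalize` and `looksLikeJdbcUrl` are hypothetical helper names. Whether trimming alone covers the URLs users actually pass in is an assumption.

```java
/** Hypothetical sketch: normalize a JDBC URL before the "jdbc:" prefix check. */
public class JdbcUrlSanitizing {
  /** Returns the trimmed URL, or null if the input is null or blank. */
  static String normalize(String url) {
    if (url == null) {
      return null;
    }
    // trim() strips leading/trailing characters <= U+0020 (spaces, tabs, newlines).
    String trimmed = url.trim();
    return trimmed.isEmpty() ? null : trimmed;
  }

  /** Applies the prefix check to the normalized form instead of the raw input. */
  static boolean looksLikeJdbcUrl(String url) {
    String normalized = normalize(url);
    return normalized != null && normalized.startsWith("jdbc:");
  }

  public static void main(String[] args) {
    System.out.println(looksLikeJdbcUrl("  jdbc:postgresql://localhost/db\n"));
    System.out.println(looksLikeJdbcUrl("   "));
  }
}
```

Normalizing once at the entry point keeps the prefix check and any later parsing working on the same string.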
--
This is an automated message from the Apache Git Service.