rohitsinha54 commented on code in PR #33062:
URL: https://github.com/apache/beam/pull/33062#discussion_r1838967018


##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java:
##########
@@ -2603,11 +2624,28 @@ public void setup() {
     private Connection getConnection() throws SQLException {
       Connection connection = this.connection;
       if (connection == null) {
-        connection = checkStateNotNull(dataSource).getConnection();
+        DataSource validSource = checkStateNotNull(dataSource);
+        connection = validSource.getConnection();
         connection.setAutoCommit(false);
         preparedStatement =
             connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());
         this.connection = connection;
+
+        // report Lineage if haven't done so
+        String table = spec.getTable();
+        if (Strings.isNullOrEmpty(table) && spec.getStatement() != null) {
+          table = JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get());
+        }
+        if (!Objects.equals(table, reportedLineage)) {
+          JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource);
+          if (fqn == null) {
+            fqn = JdbcUtil.FQNComponents.of(connection);
+          }
+          if (fqn != null) {

Review Comment:
   Can we log the query/connection when `fqn` remains null? That would show us, while debugging, which cases the current implementation cannot parse. The same applies to the similar block above.



##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java:
##########
@@ -563,4 +577,232 @@ public KV<Long, KV<DateTime, DateTime>> mapRow(ResultSet resultSet) throws Excep
               }
             }
           });
+
+  @AutoValue
+  abstract static class JdbcUrl {

Review Comment:
   This string processing seems brittle. That said, given the variety of JDBC connection URL formats we see here, I understand we can't readily use an existing library.
   As suggested above, please add logging for patterns that can't be parsed.
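   One way to make the parsing less ad hoc, sketched here under assumptions: for URLs of the form `jdbc:<subprotocol>://host[:port]/database`, strip the `jdbc:` prefix and let `java.net.URI` split out host, port and path, and log every other shape (logging only the subprotocol, to avoid leaking credentials). `JdbcUrlSketch`, `Parsed` and `subprotocol` are hypothetical and do not reflect the PR's `JdbcUrl` implementation; formats such as Oracle's `jdbc:oracle:thin:@...` or SQL Server's `;databaseName=` properties would still need dedicated handling.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.logging.Logger;

public class JdbcUrlSketch {
  private static final Logger LOG = Logger.getLogger(JdbcUrlSketch.class.getName());

  /** Hypothetical result type mirroring a scheme / host-and-port / database split. */
  static final class Parsed {
    final String scheme;
    final String hostAndPort;
    final String database;

    Parsed(String scheme, String hostAndPort, String database) {
      this.scheme = scheme;
      this.hostAndPort = hostAndPort;
      this.database = database;
    }
  }

  /** Returns only the JDBC subprotocol (e.g. "postgresql"), which is safe to log. */
  static String subprotocol(String url) {
    String rest = url.substring("jdbc:".length());
    int colon = rest.indexOf(':');
    return colon < 0 ? rest : rest.substring(0, colon);
  }

  /** Parses jdbc:<scheme>://host[:port]/database[?...]; logs and returns null otherwise. */
  static Parsed parse(String url) {
    if (url == null || !url.startsWith("jdbc:")) {
      LOG.info("Not a JDBC URL; lineage FQN not derived");
      return null;
    }
    URI uri;
    try {
      uri = new URI(url.substring("jdbc:".length()));
    } catch (URISyntaxException e) {
      LOG.info("Unparseable JDBC URL for subprotocol '" + subprotocol(url) + "'");
      return null;
    }
    String host = uri.getHost();
    String path = uri.getPath();
    if (uri.getScheme() == null || host == null || path == null || path.length() <= 1) {
      // Opaque forms (e.g. Oracle "thin:@...") and ';'-style properties end up here.
      LOG.info("Unsupported JDBC URL pattern for subprotocol '" + subprotocol(url) + "'");
      return null;
    }
    String hostAndPort = uri.getPort() == -1 ? host : host + ":" + uri.getPort();
    return new Parsed(uri.getScheme(), hostAndPort, path.substring(1));
  }

  public static void main(String[] args) {
    Parsed p = parse("jdbc:postgresql://localhost:5432/mydb");
    System.out.println(p.scheme + " " + p.hostAndPort + " " + p.database);
    parse("jdbc:oracle:thin:@//dbhost:1521/service"); // falls into the logged branch
  }
}
```

   Note that `java.net.URI` only reports a host when the authority parses as a server-based authority, so host names it considers invalid (for example ones containing underscores) would also land in the logged branch.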



##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java:
##########
@@ -39,6 +39,7 @@
 import java.util.Collection;

Review Comment:
   Should we update the IO's documentation to state explicitly that lineage metrics are only collected in these cases, so that customer expectations are set correctly?



##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java:
##########
@@ -563,4 +577,232 @@ public KV<Long, KV<DateTime, DateTime>> mapRow(ResultSet resultSet) throws Excep
               }
             }
           });
+
+  @AutoValue
+  abstract static class JdbcUrl {
+    abstract String getScheme();
+
+    abstract @Nullable String getHostAndPort();
+
+    abstract String getDatabase();
+
+    static @Nullable JdbcUrl of(String url) {
+      if (Strings.isNullOrEmpty(url) || !url.startsWith("jdbc:")) {

Review Comment:
   Is the URL already sanitized here, e.g. stripped of leading whitespace?
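   If the input is not guaranteed to be clean, a small normalization step before the `startsWith("jdbc:")` check would cover it. A minimal sketch; `JdbcUrlSanitizer.sanitize` is a hypothetical helper, not part of the PR.

```java
public class JdbcUrlSanitizer {
  /**
   * Hypothetical pre-processing step for JdbcUrl.of(...): trims surrounding whitespace and
   * returns null for null, blank, or non-"jdbc:" input.
   */
  static String sanitize(String url) {
    if (url == null) {
      return null;
    }
    // trim() removes leading/trailing chars <= U+0020 (spaces, tabs, newlines).
    String trimmed = url.trim();
    return trimmed.startsWith("jdbc:") ? trimmed : null;
  }

  public static void main(String[] args) {
    System.out.println(sanitize("  jdbc:h2:mem:test\n"));
  }
}
```

   `trim()` only strips characters up to U+0020; `String.strip()` (Java 11+) would also remove Unicode whitespace.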


