Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-07-12 Thread via GitHub


davidradl commented on code in PR #24835:
URL: https://github.com/apache/flink/pull/24835#discussion_r1675726294


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveOutputFormatFactory.java:
##
@@ -43,19 +51,28 @@ public HiveOutputFormatFactory(HiveWriterFactory factory) {
 
 @Override
 public HiveOutputFormat createOutputFormat(Path path) {
-return new HiveOutputFormat(
-factory.createRecordWriter(HadoopFileSystem.toHadoopPath(path)),
-factory.createRowConverter());
+HiveConf hiveConf = HiveConfUtils.create(factory.getJobConf());
+final String thriftURL = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname);

Review Comment:
   If we are using the HMS (Hive Metastore), can we get more metadata from it to store in the lineage? Do we store the Hadoop path?
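   One way to act on this, sketched under assumptions: the `thriftURL` read from `HiveConf.ConfVars.METASTOREURIS` could become the dataset namespace, `database.table` the dataset name, and the Hadoop path a facet on the dataset. The interfaces below are simplified stand-ins modeled on the lineage types this PR builds on (their exact signatures are assumed), and `StorageLocationFacet` and `HiveLineageDatasets` are hypothetical names, not existing Flink classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for a lineage dataset facet (assumed shape). */
interface LineageDatasetFacet {
    String name();
}

/** Simplified stand-in for a lineage dataset (assumed shape). */
interface LineageDataset {
    String name();

    String namespace();

    Map<String, LineageDatasetFacet> facets();
}

/** Hypothetical facet that records where the Hive table's files live. */
final class StorageLocationFacet implements LineageDatasetFacet {
    private final String location;

    StorageLocationFacet(String location) {
        this.location = location;
    }

    @Override
    public String name() {
        return "storageLocation";
    }

    String location() {
        return location;
    }
}

/** Hypothetical helper that turns metastore-derived metadata into a dataset. */
final class HiveLineageDatasets {
    private HiveLineageDatasets() {}

    static LineageDataset forTable(
            String metastoreUri, String database, String table, String hadoopPath) {
        // The metastore URI identifies the catalog; "database.table" identifies the table in it.
        final String qualifiedName = database + "." + table;
        Map<String, LineageDatasetFacet> facets = new HashMap<>();
        StorageLocationFacet location = new StorageLocationFacet(hadoopPath);
        facets.put(location.name(), location);
        final Map<String, LineageDatasetFacet> readOnlyFacets =
                Collections.unmodifiableMap(facets);

        return new LineageDataset() {
            @Override
            public String name() {
                return qualifiedName;
            }

            @Override
            public String namespace() {
                return metastoreUri;
            }

            @Override
            public Map<String, LineageDatasetFacet> facets() {
                return readOnlyFacets;
            }
        };
    }
}
```

   In `createOutputFormat`, the `thriftURL` from the diff would be the `metastoreUri` argument; where the database, table and path would come from is left open here.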



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-07-12 Thread via GitHub


davidradl commented on code in PR #24835:
URL: https://github.com/apache/flink/pull/24835#discussion_r1675733139


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java:
##
@@ -60,7 +62,7 @@
  */
 @PublicEvolving
 public class HiveSource extends AbstractFileSource
-implements DynamicParallelismInference {

Review Comment:
   Shouldn't the Hive connector changes go into
[https://github.com/apache/flink-connector-hive](https://github.com/apache/flink-connector-hive)?
I assume the Hive connector has moved out of Flink core and the Hive connector folder in core is not being maintained.






Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-07-12 Thread via GitHub


davidradl commented on code in PR #24835:
URL: https://github.com/apache/flink/pull/24835#discussion_r1675721067


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java:
##
@@ -242,21 +249,33 @@ public  HiveSource buildWithBulkFormat(BulkFormat bulk
 hiveTablePartitionBytes = HivePartitionUtils.serializeHiveTablePartition(partitions);
 }
 
-return new HiveSource<>(
-new Path[1],
-new HiveSourceFileEnumerator.Provider(
-hiveTablePartitionBytes, new JobConfWrapper(jobConf)),
-splitAssigner,
-bulkFormat,
-continuousSourceSettings,
-jobConf,
-tablePath,
-partitionKeys,
-hiveVersion,
-dynamicFilterPartitionKeys,
-hiveTablePartitionBytes,
-fetcher,
-fetcherContext);
+DefaultSourceLineageVertex vertex =

Review Comment:
   I like the idea of adding a default source vertex and a default vertex. I wonder whether there are any Hive specifics we could add to the vertex that would warrant not using the default, for example the type of table (Hive, Iceberg, etc.).
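   A minimal sketch of a Hive-specific source vertex that carries the table type alongside what a default source vertex holds. `Boundedness`, `LineageDataset`, `LineageVertex` and `SourceLineageVertex` are simplified stand-ins for the Flink types used in this PR (shapes assumed); `HiveTableType` and `HiveSourceLineageVertex` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Stand-in for Flink's Boundedness enum. */
enum Boundedness {
    BOUNDED,
    CONTINUOUS_UNBOUNDED
}

/** Simplified stand-in for a lineage dataset (assumed shape). */
interface LineageDataset {
    String name();

    String namespace();
}

/** Simplified stand-in for a lineage vertex (assumed shape). */
interface LineageVertex {
    List<LineageDataset> datasets();
}

/** Simplified stand-in for a source lineage vertex (assumed shape). */
interface SourceLineageVertex extends LineageVertex {
    Boundedness boundedness();
}

/** Hypothetical: table formats a table in a Hive catalog might use. */
enum HiveTableType {
    HIVE,
    ICEBERG
}

/** Hypothetical Hive-specific vertex: the default fields plus the table type. */
final class HiveSourceLineageVertex implements SourceLineageVertex {
    private final Boundedness boundedness;
    private final HiveTableType tableType;
    private final List<LineageDataset> datasets;

    HiveSourceLineageVertex(
            Boundedness boundedness, HiveTableType tableType, List<LineageDataset> datasets) {
        this.boundedness = boundedness;
        this.tableType = tableType;
        // Copy so later changes to the caller's list do not affect the vertex.
        this.datasets = Collections.unmodifiableList(new ArrayList<>(datasets));
    }

    @Override
    public Boundedness boundedness() {
        return boundedness;
    }

    @Override
    public List<LineageDataset> datasets() {
        return datasets;
    }

    /** The extra, Hive-specific information a default vertex would not carry. */
    HiveTableType tableType() {
        return tableType;
    }
}
```

   An alternative would be to leave the vertex as the default and attach the table type as a facet on each dataset, which keeps the vertex interface unchanged.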






Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-07-12 Thread via GitHub


davidradl commented on code in PR #24835:
URL: https://github.com/apache/flink/pull/24835#discussion_r1675718687


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java:
##
@@ -60,7 +62,7 @@
  */
 @PublicEvolving
 public class HiveSource extends AbstractFileSource
-implements DynamicParallelismInference {
+implements DynamicParallelismInference, LineageVertexProvider {

Review Comment:
   Can we support the sink as well as the source?
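   A minimal sketch of what sink support could look like: the sink implements a provider interface and reports the table it writes to as its only dataset. `LineageDataset`, `LineageVertex` and `LineageVertexProvider` are simplified stand-ins for the types referenced in the diff (shapes assumed); `ExampleHiveSink` is a hypothetical placeholder, not the connector's actual sink class.

```java
import java.util.Collections;
import java.util.List;

/** Simplified stand-in for a lineage dataset (assumed shape). */
interface LineageDataset {
    String name();

    String namespace();
}

/** Simplified stand-in for a lineage vertex (assumed shape). */
interface LineageVertex {
    List<LineageDataset> datasets();
}

/** Simplified stand-in for the provider interface HiveSource implements in the diff. */
interface LineageVertexProvider {
    LineageVertex getLineageVertex();
}

/** Hypothetical, heavily simplified Hive sink that reports its target table. */
final class ExampleHiveSink implements LineageVertexProvider {
    private final String metastoreUri;
    private final String database;
    private final String table;

    ExampleHiveSink(String metastoreUri, String database, String table) {
        this.metastoreUri = metastoreUri;
        this.database = database;
        this.table = table;
    }

    @Override
    public LineageVertex getLineageVertex() {
        // The written table is the sink's only dataset.
        LineageDataset target =
                new LineageDataset() {
                    @Override
                    public String name() {
                        return database + "." + table;
                    }

                    @Override
                    public String namespace() {
                        return metastoreUri;
                    }
                };
        List<LineageDataset> datasets = Collections.singletonList(target);
        // LineageVertex has a single abstract method here, so a lambda is enough.
        return () -> datasets;
    }
}
```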






Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-07-12 Thread via GitHub


davidradl commented on code in PR #24835:
URL: https://github.com/apache/flink/pull/24835#discussion_r1675715727


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java:
##
@@ -88,6 +94,7 @@ public FileSystemLookupFunction(
 }
 this.reloadInterval = reloadInterval;
 this.serializer = InternalSerializers.create(rowType);
+this.lineageVertex = new DefaultSourceLineageVertex(Boundedness.BOUNDED);

Review Comment:
   I wonder if it would be better to wrap the default lineage vertices with factories, so that the caller does not hard-code a dependency on the default implementation but deals only with the interface.
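   A minimal sketch of the factory idea: the caller receives a factory and only ever sees interface types, so the default vertex class is not referenced directly. The lineage interfaces are simplified stand-ins (shapes assumed); `LineageVertexFactory`, `DefaultLineageVertexFactory` and `ExampleLookupFunction` are hypothetical names, the last one loosely modeled on `FileSystemLookupFunction`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Stand-in for Flink's Boundedness enum. */
enum Boundedness {
    BOUNDED,
    CONTINUOUS_UNBOUNDED
}

/** Simplified stand-ins for the lineage interfaces (assumed shapes). */
interface LineageDataset {
    String name();

    String namespace();
}

interface LineageVertex {
    List<LineageDataset> datasets();
}

interface SourceLineageVertex extends LineageVertex {
    Boundedness boundedness();
}

/** Hypothetical factory: callers depend on this, not on a concrete vertex class. */
interface LineageVertexFactory {
    SourceLineageVertex createSourceVertex(
            Boundedness boundedness, List<LineageDataset> datasets);
}

/** Hypothetical default factory; the concrete vertex type never leaves this class. */
final class DefaultLineageVertexFactory implements LineageVertexFactory {
    @Override
    public SourceLineageVertex createSourceVertex(
            Boundedness boundedness, List<LineageDataset> datasets) {
        final List<LineageDataset> copy =
                Collections.unmodifiableList(new ArrayList<>(datasets));
        return new SourceLineageVertex() {
            @Override
            public Boundedness boundedness() {
                return boundedness;
            }

            @Override
            public List<LineageDataset> datasets() {
                return copy;
            }
        };
    }
}

/** Hypothetical caller; it only sees the factory and vertex interfaces. */
final class ExampleLookupFunction {
    private final SourceLineageVertex lineageVertex;

    ExampleLookupFunction(LineageVertexFactory factory) {
        // Swapping the factory swaps the implementation without touching this class.
        this.lineageVertex =
                factory.createSourceVertex(Boundedness.BOUNDED, Collections.emptyList());
    }

    SourceLineageVertex getLineageVertex() {
        return lineageVertex;
    }
}
```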






Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-05-24 Thread via GitHub


davidradl commented on PR #24835:
URL: https://github.com/apache/flink/pull/24835#issuecomment-2129222164

   Shouldn't this PR go in after
[https://github.com/apache/flink/pull/24618](https://github.com/apache/flink/pull/24618)?





Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-05-24 Thread via GitHub


davidradl commented on code in PR #24835:
URL: https://github.com/apache/flink/pull/24835#discussion_r1613118293


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageDataset.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Default implementation for {@link LineageDataset}. */
+@Internal

Review Comment:
   Why is this `@Internal`? Connectors that do not live in this repository are likely to use these implementation classes, so I suggest using `@PublicEvolving`, in line with the interface.






Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-05-24 Thread via GitHub


davidradl commented on code in PR #24835:
URL: https://github.com/apache/flink/pull/24835#discussion_r1613112277


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageDataset.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Default implementation for {@link LineageDataset}. */
+@Internal

Review Comment:
   I wonder whether these classes should be named `LineageDatasetImpl` (and so on) instead of using the `Default` prefix.






Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-05-24 Thread via GitHub


davidradl commented on code in PR #24835:
URL: https://github.com/apache/flink/pull/24835#discussion_r1613106837


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageDataset.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Default implementation for {@link LineageDataset}. */
+@Internal
+public class DefaultLineageDataset implements LineageDataset {

Review Comment:
   We should have unit tests for these new classes.
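   A minimal sketch of the kind of checks such tests could make, written as plain Java so it runs without a test framework (in Flink itself these would more likely be JUnit tests). The `DefaultLineageDataset` below is a stand-in: its `(name, namespace, facets)` constructor and the defensive copy of `facets` are assumptions based only on the `HashMap`/`Map` imports in the diff, and `DefaultLineageDatasetChecks` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-ins for the lineage interfaces (assumed shapes). */
interface LineageDatasetFacet {
    String name();
}

interface LineageDataset {
    String name();

    String namespace();

    Map<String, LineageDatasetFacet> facets();
}

/** Stand-in for the class under review; constructor and copying are assumptions. */
class DefaultLineageDataset implements LineageDataset {
    private final String name;
    private final String namespace;
    private final Map<String, LineageDatasetFacet> facets;

    DefaultLineageDataset(
            String name, String namespace, Map<String, LineageDatasetFacet> facets) {
        this.name = name;
        this.namespace = namespace;
        // Defensive copy, so later changes to the caller's map are not visible here.
        this.facets = new HashMap<>(facets);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public String namespace() {
        return namespace;
    }

    @Override
    public Map<String, LineageDatasetFacet> facets() {
        return facets;
    }
}

/** Hypothetical plain-Java checks mirroring what unit tests could assert. */
final class DefaultLineageDatasetChecks {
    private DefaultLineageDatasetChecks() {}

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    static void run() {
        Map<String, LineageDatasetFacet> facets = new HashMap<>();
        LineageDatasetFacet schema = () -> "schema";
        facets.put(schema.name(), schema);

        DefaultLineageDataset dataset =
                new DefaultLineageDataset("sales.orders", "thrift://hms:9083", facets);

        check(dataset.name().equals("sales.orders"), "name is kept");
        check(dataset.namespace().equals("thrift://hms:9083"), "namespace is kept");
        check(dataset.facets().get("schema") == schema, "facet is found by its name");

        // Clearing the caller's map must not change the dataset's facets.
        facets.clear();
        check(dataset.facets().size() == 1, "facets are defensively copied");
    }
}
```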






Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-05-24 Thread via GitHub


davidradl commented on code in PR #24835:
URL: https://github.com/apache/flink/pull/24835#discussion_r1613096352


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/DefaultLineageDataset.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.lineage;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Default implementation for {@link LineageDataset}. */
+@Internal
+public class DefaultLineageDataset implements LineageDataset {

Review Comment:
   Why are these classes called `Default`? I see there is an existing `DefaultLineageGraph`, which seems a reasonable default.
I wonder whether it would be better to call this `HiveLineageDataset`, using a `Hive` prefix instead of `Default`.






Re: [PR] [FLINK-35326][hive] add lineage integration for Hive connector [flink]

2024-05-23 Thread via GitHub


flinkbot commented on PR #24835:
URL: https://github.com/apache/flink/pull/24835#issuecomment-2127917873

   
   ## CI report:

   * e23f6d5c6a133afc8f7e8fadda7fbd64570c806f UNKNOWN

   Bot commands
   The @flinkbot bot supports the following commands:

   - `@flinkbot run azure` re-run the last Azure build
   

