PatrickRen commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1575873093


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.DefaultCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.listener.CatalogContext;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Implementation for TableLineageDataSet. */
+public class TableLineageDatasetImpl implements TableLineageDataset {
+    @JsonProperty private String name;
+    @JsonProperty private String namespace;
+    private CatalogContext catalogContext;
+    private CatalogBaseTable catalogBaseTable;
+    @JsonProperty private ObjectPath objectPath;
+    @JsonProperty private Map<String, LineageDatasetFacet> facets;
+
+    public TableLineageDatasetImpl(ContextResolvedTable contextResolvedTable) {
+        this.name = contextResolvedTable.getIdentifier().asSummaryString();
+        this.namespace = inferNamespace(contextResolvedTable.getTable()).orElse("");

Review Comment:
   I'm not sure the implementation here matches the definition on the interface. From the Javadoc of `LineageDataset`:
   
   https://github.com/apache/flink/blob/0b2e98803542365136a212580a8a61078f7acaca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java#L32
   
   Take the JDBC connector as an example. My assumption is that the namespace should describe the URL of the database, or at least some identifier that can distinguish different DB instances. Here the implementation only writes `jdbc` as the namespace.
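   
   To illustrate the concern, here is a minimal Java sketch. `NamespaceSketch`, `connectorOnly`, `instanceAware` and `jdbcOptions` are hypothetical names, not code from this PR, and the sketch assumes the table options carry a `connector` key and, for JDBC, a `url` key. It only shows that a connector-type namespace cannot separate two database instances; it does not settle whether the planner or the connector should supply the value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the PR.
final class NamespaceSketch {

    // Roughly the behaviour under review: the namespace is just the connector type.
    static String connectorOnly(Map<String, String> options) {
        return options.getOrDefault("connector", "");
    }

    // Assumed alternative: prefer an instance-level identifier when one is available.
    static String instanceAware(Map<String, String> options) {
        String url = options.get("url");
        return url != null ? url : connectorOnly(options);
    }

    // Builds a minimal option map for a JDBC table pointing at the given URL.
    static Map<String, String> jdbcOptions(String url) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "jdbc");
        options.put("url", url);
        return options;
    }
}
```

   With two tables on different hosts, `connectorOnly` returns `jdbc` for both, while `instanceAware` returns two different values.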



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -123,6 +124,7 @@ public class StreamGraph implements Pipeline {
     private CheckpointStorage checkpointStorage;
     private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
     private InternalTimeServiceManager.Provider timerServiceProvider;
+    private LineageGraph lineageGraph;

Review Comment:
   This field is only used in tests. Is it possible to avoid introducing it in `StreamGraph`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableDataSetSchemaFacet.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet;
+import org.apache.flink.streaming.api.lineage.DatasetSchemaField;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/** Default implementation for DatasetSchemaFacet. */
+public class TableDataSetSchemaFacet implements DatasetSchemaFacet {

Review Comment:
   Actually, we don't need facets for the table schema and table config. They are already included in `TableLineageDataset#table`.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableDataSetSchemaFacet.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet;
+import org.apache.flink.streaming.api.lineage.DatasetSchemaField;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/** Default implementation for DatasetSchemaFacet. */
+public class TableDataSetSchemaFacet implements DatasetSchemaFacet {

Review Comment:
   `DataSet` -> `Dataset`



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java:
##########
@@ -90,6 +90,7 @@ protected Transformation<RowData> createConversionTransformationIfNeeded(
         final RowType outputType = (RowType) getOutputType();
         final Transformation<RowData> transformation;
         final int[] fieldIndexes = computeIndexMapping(true);
+

Review Comment:
   I assume this blank line was added by mistake.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java:
##########
@@ -34,6 +35,7 @@
 public abstract class PhysicalTransformation<T> extends Transformation<T> {
 
     private boolean supportsConcurrentExecutionAttempts = true;
+    private LineageVertex lineageVertex;

Review Comment:
   It looks like only source and sink transformations have a lineage vertex. How about adding it only to source / sink transformations?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java:
##########
@@ -133,6 +139,21 @@ protected Transformation<RowData> translateToPlanInternal(
                             outputTypeInfo,
                             sourceParallelism,
                             sourceParallelismConfigured);
+
+            LineageDataset tableLineageDataset =

Review Comment:
   Here I think we need to check first whether the source implements `LineageVertexProvider`. If it does, we extract the datasets from it, add the table-layer information to them, and construct the `TableLineageDataset`.
   
   My understanding is that without information from the connector, it is not possible to build a `LineageVertex` purely from tables. From the table layer we can only tell the type of the external system by checking the `connector` option (Kafka, JDBC and so forth), but we can't identify which Kafka cluster / database instance the lineage dataset belongs to.
   
   This is also why I'm confused by the `namespace` in `TableLineageDatasetImpl`. The table planner can only add extra information such as the schema and object identifier; the root lineage dataset should be provided by connectors.
   
   The same issue applies to `CommonExecSink`.
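   
   A rough Java sketch of the flow suggested above. The three interfaces are simplified stand-ins declared locally so the example compiles on its own; the real Flink interfaces have more members and may differ in shape. `TableLayerDataset` and `TableLineageSketch` are hypothetical names, not a proposal for the final class layout.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Simplified stand-ins for the lineage interfaces discussed in this PR.
interface LineageDataset {
    String name();
    String namespace();
}

interface LineageVertex {
    List<LineageDataset> datasets();
}

interface LineageVertexProvider {
    LineageVertex getLineageVertex();
}

// Hypothetical wrapper: name and namespace stay with the connector's dataset,
// the planner only adds table-layer information on top.
final class TableLayerDataset implements LineageDataset {
    private final LineageDataset connectorDataset;
    private final String objectIdentifier;

    TableLayerDataset(LineageDataset connectorDataset, String objectIdentifier) {
        this.connectorDataset = connectorDataset;
        this.objectIdentifier = objectIdentifier;
    }

    @Override
    public String name() {
        return connectorDataset.name();
    }

    @Override
    public String namespace() {
        return connectorDataset.namespace();
    }

    String objectIdentifier() {
        return objectIdentifier;
    }
}

final class TableLineageSketch {
    // Returns wrapped datasets only when the connector exposes lineage itself.
    static List<TableLayerDataset> extract(Object source, String objectIdentifier) {
        if (!(source instanceof LineageVertexProvider)) {
            // Without connector information the planner cannot identify the instance.
            return Collections.emptyList();
        }
        LineageVertex vertex = ((LineageVertexProvider) source).getLineageVertex();
        return vertex.datasets().stream()
                .map(dataset -> new TableLayerDataset(dataset, objectIdentifier))
                .collect(Collectors.toList());
    }
}
```

   The point of the wrapper is that `namespace()` delegates to whatever the connector reported, so the planner never has to guess it from the `connector` option.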



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
