PatrickRen commented on code in PR #24618:
URL: https://github.com/apache/flink/pull/24618#discussion_r1575873093
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableLineageDatasetImpl.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.DefaultCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.listener.CatalogContext;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Implementation for TableLineageDataSet. */
+public class TableLineageDatasetImpl implements TableLineageDataset {
+    @JsonProperty private String name;
+    @JsonProperty private String namespace;
+    private CatalogContext catalogContext;
+    private CatalogBaseTable catalogBaseTable;
+    @JsonProperty private ObjectPath objectPath;
+    @JsonProperty private Map<String, LineageDatasetFacet> facets;
+
+    public TableLineageDatasetImpl(ContextResolvedTable contextResolvedTable) {
+        this.name = contextResolvedTable.getIdentifier().asSummaryString();
+        this.namespace = inferNamespace(contextResolvedTable.getTable()).orElse("");

Review Comment:
   I'm not sure whether the implementation here matches the definition in the interface. From the Javadoc of `LineageDataset`:
   https://github.com/apache/flink/blob/0b2e98803542365136a212580a8a61078f7acaca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/lineage/LineageDataset.java#L32

   Let's take the JDBC connector as an example. My assumption is that the namespace should describe the URL of the database, or at least some identifier that can tell different DB instances apart. Here the implementation only writes `jdbc` as the namespace.

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -123,6 +124,7 @@ public class StreamGraph implements Pipeline {
     private CheckpointStorage checkpointStorage;
     private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
     private InternalTimeServiceManager.Provider timerServiceProvider;
+    private LineageGraph lineageGraph;

Review Comment:
   The only usage of this field is in tests. Is it possible not to introduce it in `StreamGraph`?

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableDataSetSchemaFacet.java:
##########
@@ -0,0 +1,47 @@
+package org.apache.flink.table.planner.lineage;
+
+import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet;
+import org.apache.flink.streaming.api.lineage.DatasetSchemaField;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/** Default implementation for DatasetSchemaFacet. */
+public class TableDataSetSchemaFacet implements DatasetSchemaFacet {

Review Comment:
   Actually, we don't need facets for the table schema and table config. They are already included in `TableLineageDataset#table`.

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/lineage/TableDataSetSchemaFacet.java:
##########
@@ -0,0 +1,47 @@
+public class TableDataSetSchemaFacet implements DatasetSchemaFacet {

Review Comment:
   `DataSet` -> `Dataset`

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java:
##########
@@ -90,6 +90,7 @@ protected Transformation<RowData> createConversionTransformationIfNeeded(
         final RowType outputType = (RowType) getOutputType();
         final Transformation<RowData> transformation;
         final int[] fieldIndexes = computeIndexMapping(true);
+

Review Comment:
   I assume this blank line was added by mistake.

##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PhysicalTransformation.java:
##########
@@ -34,6 +35,7 @@ public abstract class PhysicalTransformation<T> extends Transformation<T> {
     private boolean supportsConcurrentExecutionAttempts = true;
+    private LineageVertex lineageVertex;

Review Comment:
   It looks like only source and sink transformations have a lineage vertex. How about we only add it to source / sink transformations?
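
One way the source/sink-only placement could look, as a hedged sketch. Every type here (`LineageVertexPlacementSketch`, `WithLineageVertex`, and the nested `PhysicalTransformation`, `SourceTransformation`, `SinkTransformation`, `OneInputTransformation`, `LineageVertex`) is a simplified, hypothetical stand-in and not Flink's real class: the base transformation carries no lineage field, and only the transformations that touch an external system expose a vertex.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the suggestion: only source and sink
// transformations hold a lineage vertex. These are NOT Flink's real classes.
final class LineageVertexPlacementSketch {

    private LineageVertexPlacementSketch() {}

    /** Stand-in for a lineage vertex; it only carries a name here. */
    static final class LineageVertex {
        private final String name;

        LineageVertex(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    /** Stand-in base class: deliberately has no lineageVertex field. */
    abstract static class PhysicalTransformation {}

    /** Implemented only by transformations that read from / write to external systems. */
    interface WithLineageVertex {
        LineageVertex getLineageVertex();
    }

    static final class SourceTransformation extends PhysicalTransformation
            implements WithLineageVertex {
        private final LineageVertex lineageVertex;

        SourceTransformation(LineageVertex lineageVertex) {
            this.lineageVertex = lineageVertex;
        }

        @Override
        public LineageVertex getLineageVertex() {
            return lineageVertex;
        }
    }

    static final class SinkTransformation extends PhysicalTransformation
            implements WithLineageVertex {
        private final LineageVertex lineageVertex;

        SinkTransformation(LineageVertex lineageVertex) {
            this.lineageVertex = lineageVertex;
        }

        @Override
        public LineageVertex getLineageVertex() {
            return lineageVertex;
        }
    }

    /** An intermediate operator: carries no lineage information at all. */
    static final class OneInputTransformation extends PhysicalTransformation {}

    /** Collects vertices by looking only at transformations that can carry one. */
    static List<LineageVertex> collectLineageVertices(
            List<PhysicalTransformation> transformations) {
        List<LineageVertex> vertices = new ArrayList<>();
        for (PhysicalTransformation t : transformations) {
            if (t instanceof WithLineageVertex) {
                LineageVertex vertex = ((WithLineageVertex) t).getLineageVertex();
                if (vertex != null) {
                    vertices.add(vertex);
                }
            }
        }
        return vertices;
    }
}
```

The design choice is that intermediate operators never pay for an unused field, and a lineage collector can rely on the type rather than on null checks across every physical transformation.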
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java:
##########
@@ -133,6 +139,21 @@ protected Transformation<RowData> translateToPlanInternal(
                 outputTypeInfo,
                 sourceParallelism,
                 sourceParallelismConfigured);
+
+        LineageDataset tableLineageDataset =

Review Comment:
   Here I think we first need to check whether the source implements `LineageVertexProvider`. If it does, we extract the datasets from it, add the additional table-layer information to them, and then construct the `TableLineageDataset`.

   My understanding is that without information from the connector, it is not possible to build a `LineageVertex` purely from tables. From the table layer we can only tell the type of the external system by checking the `connector` option (Kafka, JDBC, and so forth), but we can't identify which Kafka cluster or database instance the lineage dataset belongs to. This is also why I'm confused by the `namespace` in `TableLineageDatasetImpl`. The table planner can only add additional info such as the schema and object identifier; the root lineage dataset should be provided by connectors.

   The same issue applies to `CommonExecSink`.
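
The connector-first flow suggested for `CommonExecTableSourceScan` / `CommonExecSink` can be sketched as follows. This is a hedged illustration, not the real planner code: the nested `LineageDataset`, `LineageVertex`, and `LineageVertexProvider` interfaces are simplified stand-ins loosely modelled on the interfaces named in this review (real signatures may differ), and `TableLineageSketch`, `ConnectorDataset`, `JdbcLikeSource`, and `buildTableLineage` are hypothetical names. The connector supplies the root dataset, with a namespace such as a JDBC URL; the planner only wraps it with the table's object identifier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: the connector supplies root datasets (with a namespace that
// identifies the concrete external system); the planner only decorates them.
// Simplified stand-ins, NOT Flink's real lineage API.
final class TableLineageSketch {

    private TableLineageSketch() {}

    interface LineageDataset {
        String name();

        String namespace();
    }

    interface LineageVertex {
        List<LineageDataset> datasets();
    }

    /** Implemented by connectors that can describe what they read or write. */
    interface LineageVertexProvider {
        LineageVertex getLineageVertex();
    }

    /** Connector-side dataset; for a JDBC-like connector the namespace is the DB URL. */
    static final class ConnectorDataset implements LineageDataset {
        private final String name;
        private final String namespace;

        ConnectorDataset(String name, String namespace) {
            this.name = name;
            this.namespace = namespace;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public String namespace() {
            return namespace;
        }
    }

    /** Planner-side wrapper: keeps the connector's name/namespace, adds table info. */
    static final class TableLineageDataset implements LineageDataset {
        private final LineageDataset root;
        private final String objectIdentifier;

        TableLineageDataset(LineageDataset root, String objectIdentifier) {
            this.root = root;
            this.objectIdentifier = objectIdentifier;
        }

        @Override
        public String name() {
            return root.name();
        }

        @Override
        public String namespace() {
            return root.namespace();
        }

        String objectIdentifier() {
            return objectIdentifier;
        }
    }

    /** Hypothetical JDBC-like source that knows which database instance it reads. */
    static final class JdbcLikeSource implements LineageVertexProvider {
        private final String url;
        private final String tableName;

        JdbcLikeSource(String url, String tableName) {
            this.url = url;
            this.tableName = tableName;
        }

        @Override
        public LineageVertex getLineageVertex() {
            LineageDataset dataset = new ConnectorDataset(tableName, url);
            return () -> Collections.singletonList(dataset);
        }
    }

    /** Checks for LineageVertexProvider first; otherwise only the connector type is known. */
    static List<LineageDataset> buildTableLineage(Object sourceOrSink, String objectIdentifier) {
        if (!(sourceOrSink instanceof LineageVertexProvider)) {
            // No connector input: we cannot tell which cluster/database this table points at.
            return Collections.emptyList();
        }
        LineageVertex vertex = ((LineageVertexProvider) sourceOrSink).getLineageVertex();
        List<LineageDataset> result = new ArrayList<>();
        for (LineageDataset root : vertex.datasets()) {
            result.add(new TableLineageDataset(root, objectIdentifier));
        }
        return result;
    }
}
```

In this shape the planner never has to infer `namespace` from the `connector` option: it is whatever the connector reports, and the table layer only adds identifier-level information on top.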