godfreyhe commented on a change in pull request #17118:
URL: https://github.com/apache/flink/pull/17118#discussion_r704247305
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala
##########
@@ -21,16 +21,14 @@ package
org.apache.flink.table.planner.plan.nodes.physical.common
import org.apache.flink.table.api.TableException
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
-import org.apache.flink.table.planner.plan.utils.JoinUtil
+import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinUtil}
import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
import
org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
-
Review comment:
please revert these changes and check the Scala checkstyle settings in
your IntelliJ IDEA
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
##########
@@ -119,4 +122,22 @@ public void apply(DynamicTableSource tableSource,
SourceAbilityContext context)
tableSource.getClass().getName()));
}
}
+
+ @Override
+ public String getDigests(SourceAbilityContext context) {
+ final List<String> expressionStrs = new ArrayList<>();
+ final RowType souorceRowType = context.getSourceRowType();
Review comment:
typo: souorce
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/PartitionPushDownSpec.java
##########
@@ -60,4 +60,12 @@ public void apply(DynamicTableSource tableSource,
SourceAbilityContext context)
tableSource.getClass().getName()));
}
}
+
+ @Override
+ public String getDigests(SourceAbilityContext context) {
+ return "partitions=["
+ + String.join(
+ ", ",
this.partitions.stream().map(Object::toString).toArray(String[]::new))
Review comment:
nit: we can use Collectors.joining() to simplify this line
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
##########
@@ -183,4 +185,25 @@ public void onPeriodicEmit(WatermarkOutput output) {
}
}
}
+
+ @Override
+ public String getDigests(SourceAbilityContext context) {
Review comment:
nit: it's better to move this method close to the `apply` method
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
##########
@@ -183,4 +185,25 @@ public void onPeriodicEmit(WatermarkOutput output) {
}
}
}
+
+ @Override
+ public String getDigests(SourceAbilityContext context) {
+ final String expressionStr =
+ FlinkRexUtil.getExpressionString(
+ watermarkExpr,
+ JavaScalaConversionUtil.toScala(
+ context.getSourceRowType().getFieldNames()));
+
+ // final String expressionStr =
+ // FlinkRexUtil.getExpressionString(
+ // watermarkExpr,
+ //
+ //
JavaScalaConversionUtil.toScala(getProducedType().get().getFieldNames()));
+
+ if (idleTimeoutMillis == -1L) {
Review comment:
idleTimeoutMillis > 0
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -65,8 +68,28 @@ class TableSourceTable(
override def getQualifiedName: util.List[String] = {
val builder = ImmutableList.builder[String]()
- .addAll(super.getQualifiedName)
- extraDigests.foreach(builder.add)
+ .addAll(super.getQualifiedName)
+
+ if(abilitySpecs != null && abilitySpecs.length != 0){
+// var newProducedType = catalogTable
+// .getResolvedSchema
+// .toSourceRowDataType
+// .getLogicalType
+// .asInstanceOf[RowType]
Review comment:
remove the useless code
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
##########
@@ -19,27 +19,27 @@
package org.apache.flink.table.planner.plan.metadata
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
-import org.apache.flink.table.api.{DataTypes, TableException, TableSchema}
+import org.apache.flink.table.api.{DataTypes, TableConfig, TableException,
TableSchema}
import org.apache.flink.table.catalog.{CatalogTable, Column, ObjectIdentifier,
ResolvedCatalogTable, ResolvedSchema, UniqueConstraint}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.connector.source.{DynamicTableSource,
ScanTableSource}
import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
-import org.apache.flink.table.planner.calcite.{FlinkTypeFactory,
FlinkTypeSystem}
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkContextImpl,
FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase,
TableSourceTable}
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import
org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
import org.apache.flink.table.types.logical.{BigIntType, DoubleType, IntType,
LocalZonedTimestampType, LogicalType, TimestampKind, TimestampType, VarCharType}
-
import org.apache.calcite.config.CalciteConnectionConfig
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.Schema.TableType
import org.apache.calcite.schema.{Schema, SchemaPlus, Table}
import org.apache.calcite.sql.{SqlCall, SqlNode}
-
import java.util
import java.util.Collections
+import org.apache.flink.table.utils.CatalogManagerMocks
Review comment:
nit: move this import close to other `org.apache.flink.table.xx`
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
##########
@@ -284,7 +284,12 @@ object MetadataTestUtil {
new TestTableSource(),
true,
new ResolvedCatalogTable(catalogTable, resolvedSchema),
- Array("project=[a, c, d]"))
+ new FlinkContextImpl(
Review comment:
create a field to replace all `new FlinkContextImpl(...)` calls
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -65,8 +68,28 @@ class TableSourceTable(
override def getQualifiedName: util.List[String] = {
val builder = ImmutableList.builder[String]()
- .addAll(super.getQualifiedName)
- extraDigests.foreach(builder.add)
+ .addAll(super.getQualifiedName)
+
+ if(abilitySpecs != null && abilitySpecs.length != 0){
+// var newProducedType = catalogTable
+// .getResolvedSchema
+// .toSourceRowDataType
+// .getLogicalType
+// .asInstanceOf[RowType]
+
+ var newProducedType = DynamicSourceUtils.createProducedType(
+ catalogTable.getResolvedSchema,
+ tableSource)
+
+ for (spec <- abilitySpecs) {
+ val sourceAbilityContext = new SourceAbilityContext(flinkContext,
newProducedType)
+
+ builder.add(spec.getDigests(sourceAbilityContext))
+ if (spec.getProducedType.isPresent) {
+ newProducedType = spec.getProducedType.get
+ }
Review comment:
nit: these lines can be simplified as ` newProducedType =
spec.getProducedType.orElse(newProducedType)`
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -41,8 +44,8 @@ import java.util
* @param tableSource The [[DynamicTableSource]] for which is converted to a
Calcite Table
* @param isStreamingMode A flag that tells if the current table is in stream
mode
* @param catalogTable Resolved catalog table where this table source table
comes from
- * @param extraDigests The extra digests which will be added into
`getQualifiedName`
- * as a part of table digest
+ * @param flinkContext The flink context
+ * @param abilitySpecs The abilitySpec applied to the source
*/
Review comment:
abilitySpec -> abilitySpecs
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -41,8 +44,8 @@ import java.util
* @param tableSource The [[DynamicTableSource]] for which is converted to a
Calcite Table
* @param isStreamingMode A flag that tells if the current table is in stream
mode
* @param catalogTable Resolved catalog table where this table source table
comes from
- * @param extraDigests The extra digests which will be added into
`getQualifiedName`
- * as a part of table digest
+ * @param flinkContext The flink context
Review comment:
it would be better to explain why we need flinkContext here
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -20,15 +20,18 @@ package org.apache.flink.table.planner.plan.schema
import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable}
import org.apache.flink.table.connector.source.DynamicTableSource
-import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec
+import
org.apache.flink.table.planner.plan.abilities.source.{SourceAbilityContext,
SourceAbilitySpec}
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
-
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.RelOptSchema
import org.apache.calcite.rel.`type`.RelDataType
-
import java.util
+import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkTypeFactory}
Review comment:
remove the useless imports
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/WatermarkPushDownSpec.java
##########
@@ -183,4 +185,25 @@ public void onPeriodicEmit(WatermarkOutput output) {
}
}
}
+
+ @Override
+ public String getDigests(SourceAbilityContext context) {
+ final String expressionStr =
+ FlinkRexUtil.getExpressionString(
+ watermarkExpr,
+ JavaScalaConversionUtil.toScala(
+ context.getSourceRowType().getFieldNames()));
+
+ // final String expressionStr =
+ // FlinkRexUtil.getExpressionString(
+ // watermarkExpr,
+ //
+ //
JavaScalaConversionUtil.toScala(getProducedType().get().getFieldNames()));
Review comment:
remove this code
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]