godfreyhe commented on a change in pull request #15117:
URL: https://github.com/apache/flink/pull/15117#discussion_r594003569
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala
##########
@@ -59,10 +60,13 @@ abstract class Expand(
if (projects.size() <= 1) {
return litmus.fail("Expand should output more than one rows, otherwise
use Project.")
}
- if (projects.exists(_.size != outputRowType.getFieldCount)) {
+ if (null == outputFieldNames) {
+ return litmus.fail("Expand should have none empty output field names.")
+ }
Review comment:
Give default names if `outputFieldNames` is null.
Also, I think it would be better not to pass `outputFieldNames` at all and to
derive the names in `deriveRowType` instead.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala
##########
@@ -79,7 +83,27 @@ abstract class Expand(
litmus.succeed()
}
- override def deriveRowType(): RelDataType = outputRowType
+ override def deriveRowType(): RelDataType = {
+ val fieldNullableMap = mutable.Map[Int, Boolean]()
+ projects.map {
+ project =>
+ project.zipWithIndex.map {
+ f => {
+ val nullable =
fieldNullableMap.get(f._2).getOrElse(f._1.getType.isNullable)
+ fieldNullableMap.put(f._2, nullable || f._1.getType.isNullable)
+ }
+ }
+ }
+ val typeList = ListBuffer[RelDataType]()
+ val typeFactory = cluster.getTypeFactory
+ projects.get(0).zipWithIndex.map {
Review comment:
Another case we should consider: Expand could become more generic (used not
only for distinct aggregates, but also for other cases). In that case, the
fields in the Expand node may have different types, so we should either check
that a column's type is the same in all expanded rows, or derive the common
type for each column.
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala
##########
@@ -19,16 +19,11 @@
package org.apache.flink.table.planner.plan.utils
import org.apache.flink.table.planner.calcite.FlinkRelBuilder
-import org.apache.flink.table.planner.plan.nodes.calcite.{Expand,
LogicalExpand}
Review comment:
nit: reorder the imports
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
##########
@@ -223,7 +223,7 @@ class FlinkRelMdDistinctRowCount private extends
MetadataHandler[BuiltInMetadata
val groupKeyOfCurrentProject = new JArrayList[Int]()
groupKeySkipExpandId.foreach { key =>
project.get(key) match {
- case literal: RexLiteral if literal.isNull => // do nothing
+ case literal: RexLiteral if literal.isNull ||
literal.isAlwaysFalse => // do nothing
Review comment:
Are there some corner cases here?
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala
##########
@@ -190,15 +172,12 @@ object ExpandUtil {
def createExpandProjects(
rexBuilder: RexBuilder,
inputType: RelDataType,
- outputType: RelDataType,
Review comment:
The Javadoc should also be updated.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]