godfreyhe commented on a change in pull request #15117:
URL: https://github.com/apache/flink/pull/15117#discussion_r594003569



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala
##########
@@ -59,10 +60,13 @@ abstract class Expand(
     if (projects.size() <= 1) {
       return litmus.fail("Expand should output more than one rows, otherwise use Project.")
     }
-    if (projects.exists(_.size != outputRowType.getFieldCount)) {
+    if (null == outputFieldNames) {
+      return litmus.fail("Expand should have none empty output field names.")
+    }

Review comment:
      Give a default name if `outputFieldNames` is null.
   
   Also, I think it's better not to pass `outputFieldNames` at all and instead derive the names in `deriveRowType`.
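   A rough sketch of that suggestion (illustrative only: it assumes the Java-to-Scala collection conversions already in scope in this file, and the generated `$fN` fallback names are just an example, not the actual planner behavior):
   
   ```scala
   // Illustrative sketch: build the row type (and default field names)
   // inside deriveRowType() instead of requiring callers to pass outputFieldNames.
   override def deriveRowType(): RelDataType = {
     val typeFactory = cluster.getTypeFactory
     val builder = typeFactory.builder()
     projects.get(0).zipWithIndex.foreach { case (rex, i) =>
       // use the caller-supplied name when present, otherwise a generated "$fN" name
       val name = if (outputFieldNames != null) outputFieldNames.get(i) else s"$$f$i"
       builder.add(name, rex.getType)
     }
     builder.build()
   }
   ```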

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala
##########
@@ -79,7 +83,27 @@ abstract class Expand(
     litmus.succeed()
   }
 
-  override def deriveRowType(): RelDataType = outputRowType
+  override def deriveRowType(): RelDataType = {
+    val fieldNullableMap = mutable.Map[Int, Boolean]()
+    projects.map {
+      project =>
+        project.zipWithIndex.map {
+          f => {
+            val nullable = fieldNullableMap.get(f._2).getOrElse(f._1.getType.isNullable)
+            fieldNullableMap.put(f._2, nullable || f._1.getType.isNullable)
+          }
+        }
+    }
+    val typeList = ListBuffer[RelDataType]()
+    val typeFactory = cluster.getTypeFactory
+    projects.get(0).zipWithIndex.map {

Review comment:
       Another case we should consider: `Expand` can be more generic (not only used for distinct agg, but also for other cases). Once that happens, a field in the Expand node may have different types across the expanded rows, so we should either check that a column's type is the same in all expanded rows, or derive a common type for each column.
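   A hedged sketch of the "derive a common type" option, using Calcite's `leastRestrictive`; the helper name and wiring below are hypothetical, not existing planner code:
   
   ```scala
   import java.util.{List => JList}
   
   import scala.collection.JavaConverters._
   
   import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
   import org.apache.calcite.rex.RexNode
   
   /** Hypothetical helper: derive a common type per column across all expand projects. */
   def deriveCommonFieldTypes(
       typeFactory: RelDataTypeFactory,
       projects: JList[JList[RexNode]]): Seq[RelDataType] = {
     val fieldCount = projects.get(0).size()
     (0 until fieldCount).map { i =>
       // collect the i-th field type of every expanded row
       val columnTypes = projects.asScala.map(_.get(i).getType).asJava
       // least restrictive common type; null means the types are incompatible
       val commonType = typeFactory.leastRestrictive(columnTypes)
       require(commonType != null, s"Field $i has incompatible types across expand projects")
       // the column is nullable if any expanded row produces a nullable value
       val nullable = projects.asScala.exists(_.get(i).getType.isNullable)
       typeFactory.createTypeWithNullability(commonType, nullable)
     }
   }
   ```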

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala
##########
@@ -19,16 +19,11 @@
 package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder
-import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, LogicalExpand}

Review comment:
       nit: fix the import order

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdDistinctRowCount.scala
##########
@@ -223,7 +223,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata
         val groupKeyOfCurrentProject = new JArrayList[Int]()
         groupKeySkipExpandId.foreach { key =>
           project.get(key) match {
-            case literal: RexLiteral if literal.isNull => // do nothing
+            case literal: RexLiteral if literal.isNull || literal.isAlwaysFalse => // do nothing

Review comment:
       are there some corner cases here?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExpandUtil.scala
##########
@@ -190,15 +172,12 @@ object ExpandUtil {
   def createExpandProjects(
       rexBuilder: RexBuilder,
       inputType: RelDataType,
-      outputType: RelDataType,

Review comment:
       the java doc should also be updated




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

