Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2169#discussion_r69045985
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
---
@@ -236,6 +236,32 @@ case class Aggregate(
}
}
+case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean)
extends BinaryNode {
+ override def output: Seq[Attribute] = left.output
+
+ override protected[logical] def construct(relBuilder: RelBuilder):
RelBuilder = {
+ left.construct(relBuilder)
+ right.construct(relBuilder)
+ relBuilder.minus(all)
+ }
+
+ override def validate(tableEnv: TableEnvironment): LogicalNode = {
+ val resolvedMinus = super.validate(tableEnv).asInstanceOf[SetMinus]
+ if (left.output.length != right.output.length) {
+ failValidation(s"Set minus two table of different column sizes:" +
+ s" ${left.output.size} and ${right.output.size}")
+ }
+ val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
+ l.resultType == r.resultType && l.name == r.name }
--- End diff --
I think @wuchong refers exactly to the last case. If number of fields and
field types are identical, it should be possible to do a `EXCEPT` even if the
field names are not the same.
The situation for `UNION` is a bit special due to Flink's internals and the
way the `RowTypeInfo` is implemented but I think we can remove that restriction
in the future.
So, we must keep the checks for number of fields and field types but can
remove the check of the field names, IMO.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---