cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <r...@remind101.com> wrote:
> Yes, the same exact input operators go into both joins.
>
> The chunk of code for the joins from the specific part of the plan I
> showed is as follows. The orgUsersTable is later filtered into one table
> and aggregated, and filtered into another table and aggregated. The planner
> seems to duplicate orgUsersTable into 2 operators even though I create only
> 1 of it.
>
> // in the main function
> val orgUsersTable = splatRoles(
>   this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
>   OrgUsersRoleSplatPrefix,
>   this.tableEnv
> )
>
> // helper function
> def splatRoles(
>   table: Table,
>   columnPrefix: String,
>   tableEnv: TableEnvironment
> ): Table = {
>   // Flink does not have a contains function so we have to splat out our
>   // role array's contents and join it to the originating table.
>   val func = new SplatRolesFunc()
>   val splatted = table
>     .map(func($"roles", $"id"))
>     .as(
>       "id_splatted",
>       s"${columnPrefix}_is_admin",
>       s"${columnPrefix}_is_teacher",
>       s"${columnPrefix}_is_student",
>       s"${columnPrefix}_is_parent"
>     )
>   // FIRST_VALUE is only available in SQL - so this is SQL.
>   // Rationale: We have to group by after a map to preserve the pk inference,
>   // otherwise flink will toss it out and all future joins will not have a
>   // unique key.
>   tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
>   val grouped = tableEnv.sqlQuery(s"""
>     SELECT
>       id_splatted,
>       FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
>       FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
>       FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
>       FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
>     FROM ${columnPrefix}_splatted
>     GROUP BY id_splatted
>   """)
>   return table
>     .join(grouped, $"id" === $"id_splatted")
>     .dropColumns($"id_splatted")
>     .renameColumns($"roles".as(s"${columnPrefix}_roles"))
> }
>
> @FunctionHint(
>   output = new DataTypeHint(
>     "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
>   )
> )
> class SplatRolesFunc extends ScalarFunction {
>   def eval(roles: Array[String], id: java.lang.Long): Row = {
>     val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
>     val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
>     val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
>     val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
>     return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
>   }
>
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>     Types.ROW(
>       Types.LONG,
>       Types.BOOLEAN,
>       Types.BOOLEAN,
>       Types.BOOLEAN,
>       Types.BOOLEAN
>     )
> }
>
>
> On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <yungao...@aliyun.com> wrote:
>
>> Hi Rex,
>>
>> Could you also attach an example of the SQL / Table code? One thing to
>> confirm: do the operators with the same names also have the same
>> inputs?
>>
>> Best,
>> Yun
>>
>> ------------------Original Mail ------------------
>> *Sender:*Rex Fenley <r...@remind101.com>
>> *Send Date:*Fri Dec 4 02:55:41 2020
>> *Recipients:*user <user@flink.apache.org>
>> *Subject:*Duplicate operators generated by plan
>>
>>> Hello,
>>>
>>> I'm running into an issue where my execution plan is creating the same
>>> exact join operator multiple times simply because the subsequent operator
>>> filters on a different boolean value. This is a massive duplication of
>>> storage and work. The filtered operators which follow result in only a
>>> small set of elements filtered out per set too.
>>>
>>> eg. of two separate operators that are equal
>>>
>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent]
>>>
>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>>
>>> Which are entirely the same datasets being processed.
>>>
>>> The first one points to
>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>>> TMP_0.f0 AS admin_organization_ids])
>>>
>>> The second one points to
>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>>> TMP_0.f0 AS teacher_organization_ids])
>>>
>>> And these are both intersecting sets of data though slightly different.
>>> I don't see why that would make the 1 join from before split into 2 though.
>>> There's even a case where I'm seeing a join tripled.
>>>
>>> Is there a good reason why this should happen? Is there a way to tell
>>> flink to not duplicate operators where it doesn't need to?
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley | Software Engineer - Mobile and Backend
>>>
>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>>> <https://www.facebook.com/remindhq>