cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <r...@remind101.com> wrote:

> Yes, the same exact input operators go into both joins.
>
> The chunk of code for the joins, from the specific part of the plan I
> showed, is as follows. orgUsersTable is later filtered into one table and
> aggregated, and separately filtered into another table and aggregated. The
> planner seems to duplicate orgUsersTable into 2 operators even though I
> create it only once (a simplified sketch of that downstream usage follows
> the code below).
>
> // in the main function
> val orgUsersTable = splatRoles(
>   this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
>   OrgUsersRoleSplatPrefix,
>   this.tableEnv
> )
>
> // helper function
> def splatRoles(
>     table: Table,
>     columnPrefix: String,
>     tableEnv: TableEnvironment
> ): Table = {
>   // Flink does not have a contains function so we have to splat out our
>   // role array's contents and join it to the originating table.
>   val func = new SplatRolesFunc()
>   val splatted = table
>     .map(func($"roles", $"id"))
>     .as(
>       "id_splatted",
>       s"${columnPrefix}_is_admin",
>       s"${columnPrefix}_is_teacher",
>       s"${columnPrefix}_is_student",
>       s"${columnPrefix}_is_parent"
>     )
>   // FIRST_VALUE is only available in SQL - so this is SQL.
>   // Rationale: We have to group by after a map to preserve the pk inference,
>   // otherwise flink will toss it out and all future joins will not have a
>   // unique key.
>   tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
>   val grouped = tableEnv.sqlQuery(s"""
>     SELECT
>       id_splatted,
>       FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
>       FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
>       FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
>       FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
>     FROM ${columnPrefix}_splatted
>     GROUP BY id_splatted
>   """)
>   return table
>     .join(grouped, $"id" === $"id_splatted")
>     .dropColumns($"id_splatted")
>     .renameColumns($"roles".as(s"${columnPrefix}_roles"))
> }
>
> @FunctionHint(
>   output = new DataTypeHint(
>     "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, " +
>       "is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
>   )
> )
> class SplatRolesFunc extends ScalarFunction {
>   def eval(roles: Array[String], id: java.lang.Long): Row = {
>     val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
>     val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
>     val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
>     val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
>     return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
>   }
>
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>     Types.ROW(
>       Types.LONG,
>       Types.BOOLEAN,
>       Types.BOOLEAN,
>       Types.BOOLEAN,
>       Types.BOOLEAN
>     )
> }
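>
> For context, here is a simplified sketch of the downstream usage I described
> above. The real code differs, and IDsAgg / adminOrgs / teacherOrgs are
> hypothetical stand-ins, but the shape is: one orgUsersTable feeding two
> filtered aggregations.
>
> // Assumes the same imports as the code above; IDsAgg stands in for our
> // user-defined AggregateFunction that collects organization ids.
> val idsAgg = new IDsAgg()
>
> // Admin organizations per user.
> val adminOrgs = orgUsersTable
>   .filter($"org_user_is_admin" === true)
>   .groupBy($"user_id")
>   .select($"user_id", idsAgg($"organization_id").as("admin_organization_ids"))
>
> // Teacher organizations per user, over the same joined input.
> val teacherOrgs = orgUsersTable
>   .filter($"org_user_is_teacher" === true)
>   .groupBy($"user_id")
>   .select($"user_id", idsAgg($"organization_id").as("teacher_organization_ids"))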
>
>
> On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <yungao...@aliyun.com> wrote:
>
>> Hi Rex,
>>
>>     Could you also attach an example of the SQL / Table code for these?
>> And one possible issue to confirm: do the operators with the same names
>> also have the same inputs?
>>
>> Best,
>> Yun
>>
>> ------------------ Original Mail ------------------
>> Sender: Rex Fenley <r...@remind101.com>
>> Send Date: Fri Dec 4 02:55:41 2020
>> Recipients: user <user@flink.apache.org>
>> Subject: Duplicate operators generated by plan
>>
>>> Hello,
>>>
>>> I'm running into an issue where my execution plan creates the exact same
>>> join operator multiple times, simply because the subsequent operators
>>> filter on different boolean values. This is a massive duplication of
>>> storage and work. The filter operators which follow only remove a small
>>> set of elements from each set, too.
>>>
>>> E.g., two separate operators that are equal:
>>>
>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>>
>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>>
>>> Both of these process entirely the same dataset.
>>>
>>> The first one points to
>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>>> TMP_0.f0 AS admin_organization_ids])
>>>
>>> The second one points to
>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>>> TMP_0.f0 AS teacher_organization_ids])
>>>
>>> These are intersecting, though slightly different, sets of data. I don't
>>> see why that would make the one join from before split into two, though.
>>> There's even a case where I'm seeing a join tripled.
>>>
>>> Is there a good reason why this happens? Is there a way to tell Flink
>>> not to duplicate operators where it doesn't need to?
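>>>
>>> For reference, the plan excerpts above come from something like the
>>> following (a minimal sketch; resultTable is a hypothetical stand-in for
>>> the final table in the pipeline):
>>>
>>> // Print the optimized execution plan for the table's pipeline.
>>> // (Table.explain() on recent Flink versions; tableEnv.explain(table)
>>> // on older ones.)
>>> println(resultTable.explain())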
>>>
>>> Thanks!
>>>
>>
>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/>  |  BLOG <http://blog.remind.com/>  |
FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
