Yes, the same exact input operators go into both joins.

Below is the code for the joins in the part of the plan I showed.
orgUsersTable is later filtered into two different tables, and each of those
is then aggregated. The planner seems to duplicate orgUsersTable into 2
operators even though I create it only once.
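The branching described above can be modeled in plain Scala (no Flink) to show its shape: one joined collection feeds two filtered aggregations, mirroring admin_organization_ids and teacher_organization_ids in the plan. This is a sketch under assumptions, not the actual downstream code (which isn't shown). The OrgUser fields and sample rows are hypothetical, and the IDsAgg aggregate is approximated by collecting organization ids into a Set.

```scala
// Plain-Scala model (no Flink): one joined collection, two filtered aggregations.
// OrgUser and the sample rows are hypothetical; IDsAgg is approximated by a Set.
object BranchSketch {
  final case class OrgUser(
      id: Long,
      organizationId: Long,
      userId: Long,
      isAdmin: Boolean,
      isTeacher: Boolean
  )

  // Filter on one boolean flag, then group by user id and collect organization ids.
  def orgIdsByUser(rows: Seq[OrgUser], flag: OrgUser => Boolean): Map[Long, Set[Long]] =
    rows
      .filter(flag)
      .groupBy(_.userId)
      .map { case (userId, group) => userId -> group.map(_.organizationId).toSet }

  def main(args: Array[String]): Unit = {
    val joined = Seq(
      OrgUser(1L, 10L, 100L, isAdmin = true, isTeacher = true),
      OrgUser(2L, 11L, 100L, isAdmin = false, isTeacher = true),
      OrgUser(3L, 10L, 200L, isAdmin = true, isTeacher = false)
    )
    // Both branches read the same joined rows; only the filter differs.
    println(orgIdsByUser(joined, _.isAdmin))   // admin organization ids per user
    println(orgIdsByUser(joined, _.isTeacher)) // teacher organization ids per user
  }
}
```

The point of the model is that the join output is identical for both branches; only the filter predicate differs, which is why computing the join twice looks redundant.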

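// imports used by the snippets below
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// in the main function
val orgUsersTable = splatRoles(
  this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
  OrgUsersRoleSplatPrefix,
  this.tableEnv
)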

// helper function
def splatRoles(
    table: Table,
    columnPrefix: String,
    tableEnv: TableEnvironment
): Table = {
  // Flink does not have a contains function, so we have to splat out our role
  // array's contents and join it to the originating table.
  val func = new SplatRolesFunc()
  val splatted = table
    .map(func($"roles", $"id"))
    .as(
      "id_splatted",
      s"${columnPrefix}_is_admin",
      s"${columnPrefix}_is_teacher",
      s"${columnPrefix}_is_student",
      s"${columnPrefix}_is_parent"
    )
  // FIRST_VALUE is only available in SQL, so this part is SQL.
  // Rationale: we have to group by after a map to preserve the primary key
  // inference; otherwise Flink will toss it out and all future joins will not
  // have a unique key.
  tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
  val grouped = tableEnv.sqlQuery(s"""
    SELECT
      id_splatted,
      FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
      FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
      FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
      FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
    FROM ${columnPrefix}_splatted
    GROUP BY id_splatted
  """)
  table
    .join(grouped, $"id" === $"id_splatted")
    .dropColumns($"id_splatted")
    .renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
  output = new DataTypeHint(
    "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
  )
)
class SplatRolesFunc extends ScalarFunction {
  def eval(roles: Array[String], id: java.lang.Long): Row = {
    val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
    val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
    val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
    val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
    Row.of(id, isAdmin, isTeacher, isStudent, isParent)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(
      Types.LONG,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN
    )
}
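The helper's logic can be checked in isolation with a plain-Scala model (no Flink involved): splat the roles array into booleans, keep one row per id the way the FIRST_VALUE ... GROUP BY id_splatted query does, then inner-join back to the source rows on id. This is a sketch under assumptions: the role strings ("admin", "teacher", "student", "parent") are hypothetical stand-ins for Admin.rawValue and friends, and this batch-style model ignores streaming updates and retractions.

```scala
// Plain-Scala model of splatRoles (no Flink). Role strings are hypothetical
// stand-ins for Admin.rawValue etc.; streaming updates/retractions are ignored.
object SplatSketch {
  final case class Source(id: Long, roles: Seq[String])
  final case class Flags(isAdmin: Boolean, isTeacher: Boolean, isStudent: Boolean, isParent: Boolean)

  val AdminRole = "admin"
  val TeacherRole = "teacher"
  val StudentRole = "student"
  val ParentRole = "parent"

  // Same idea as SplatRolesFunc.eval: one boolean per role.
  def splat(roles: Seq[String]): Flags =
    Flags(
      roles.contains(AdminRole),
      roles.contains(TeacherRole),
      roles.contains(StudentRole),
      roles.contains(ParentRole)
    )

  // Like FIRST_VALUE(...) ... GROUP BY id_splatted: keep the first Flags seen per id,
  // so the result has exactly one entry per id (a unique key).
  def firstPerId(rows: Seq[(Long, Flags)]): Map[Long, Flags] =
    rows.foldLeft(Map.empty[Long, Flags]) { case (acc, (id, flags)) =>
      if (acc.contains(id)) acc else acc + (id -> flags)
    }

  // Inner join of the source rows with the grouped flags on id = id_splatted.
  def splatRoles(table: Seq[Source]): Seq[(Source, Flags)] = {
    val grouped = firstPerId(table.map(s => s.id -> splat(s.roles)))
    table.flatMap(s => grouped.get(s.id).map(flags => s -> flags))
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Source(1L, Seq("admin", "teacher")), Source(2L, Seq("parent")))
    splatRoles(rows).foreach(println)
  }
}
```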


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Rex,
>
>     Could you also attach an example of these SQL / Table queries? One
> more thing to confirm: do the operators with the same names also have
> the same inputs?
>
> Best,
> Yun
>
> ------------------Original Mail ------------------
> *Sender:* Rex Fenley <r...@remind101.com>
> *Send Date:* Fri Dec 4 02:55:41 2020
> *Recipients:* user <user@flink.apache.org>
> *Subject:* Duplicate operators generated by plan
>
>> Hello,
>>
>> I'm running into an issue where my execution plan creates the exact same
>> join operator multiple times, simply because the subsequent operators
>> filter on different boolean values. This is a massive duplication of
>> storage and work. Moreover, each of the filters that follow removes only
>> a small set of elements.
>>
>> E.g., here are two separate operators that are identical:
>>
>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>
>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>
>> Both process exactly the same dataset.
>>
>> The first one points to
>> GroupAggregate(groupBy=[user_id], select=[user_id,
>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>> TMP_0.f0 AS admin_organization_ids])
>>
>> The second one points to
>> GroupAggregate(groupBy=[user_id], select=[user_id,
>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>> TMP_0.f0 AS teacher_organization_ids])
>>
>> The data feeding these two aggregates overlaps, though the sets are
>> slightly different. I don't see why that would split the single join
>> before them into 2, though. There's even a case where I'm seeing a join
>> tripled.
>>
>> Is there a good reason why this should happen? Is there a way to tell
>> Flink not to duplicate operators where it doesn't need to?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
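The expectation in the question, that two operators with the same description and the same inputs need only be computed once, can be expressed as a toy model: key each plan node by a structural digest of its operator and its inputs, and count nodes with equal digests once. This is only an illustration of the idea, not Flink's planner code; the node labels are simplified, hypothetical versions of the operators in the plan above.

```scala
// Toy plan DAG (not Flink's planner): a node is an operator label plus its inputs.
// Nodes whose digests (label + input digests) match are treated as one operator.
object ReuseSketch {
  final case class Node(op: String, inputs: List[Node]) {
    // Structural digest: equal only if the operator and all inputs are equal.
    lazy val digest: String =
      if (inputs.isEmpty) op else inputs.map(_.digest).mkString(s"$op(", ", ", ")")
  }

  // Number of distinct operators once sub-plans with equal digests are shared.
  def distinctOperators(roots: List[Node]): Int = {
    def walk(n: Node): List[String] = n.digest :: n.inputs.flatMap(walk)
    roots.flatMap(walk).distinct.size
  }

  // Hypothetical, simplified plan: two separately built but identical joins.
  def demoPlan(): (Node, Node, List[Node]) = {
    val source = Node("Source(orgUsers)", Nil)
    val grouped = Node("GroupAggregate(FIRST_VALUE)", List(Node("Calc(splat)", List(source))))
    val join1 = Node("Join(id = id_splatted)", List(source, grouped))
    val join2 = Node("Join(id = id_splatted)", List(source, grouped))
    val adminAgg = Node("GroupAggregate(admin_organization_ids)", List(Node("Calc(is_admin)", List(join1))))
    val teacherAgg = Node("GroupAggregate(teacher_organization_ids)", List(Node("Calc(is_teacher)", List(join2))))
    (join1, join2, List(adminAgg, teacherAgg))
  }

  def main(args: Array[String]): Unit = {
    val (join1, join2, roots) = demoPlan()
    println(join1.digest == join2.digest) // the two joins are indistinguishable
    println(distinctOperators(roots))     // operators left once equal digests are shared
  }
}
```

In this model the two joins collapse into one shared node, leaving 8 distinct operators for the two aggregation branches.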

