Yes, the same exact input operators go into both joins.

The chunk of code for the joins from the specific part of the plan I showed
is as follows. The orgUsersTable is later filtered into one table and
aggregated and another table and aggregated. The planner seems to duplicate
orgUsersTable into 2 operators even though I create only 1 of it.

// in the main function
val orgUsersTable = splatRoles(

// helper function
def splatRoles(
table: Table,
columnPrefix: String,
tableEnv: TableEnvironment
): Table = {
// Flink does not have a contains function so we have to splat out our role
array's contents
// and join it to the originating table.
val func = new SplatRolesFunc()
val splatted = table
.map(func($"roles", $"id"))
// FIRST_VALUE is only available in SQL - so this is SQL.
// Rationale: We have to group by after a map to preserve the pk inference,
otherwise flink will
// toss it out and all future joins will not have a unique key.
tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
val grouped = tableEnv.sqlQuery(s"""
FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
FROM ${columnPrefix}_splatted
GROUP BY id_splatted
return table
.join(grouped, $"id" === $"id_splatted")

output = new DataTypeHint(
"(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student
class SplatRolesFunc extends ScalarFunction {
def eval(roles: Array[String], id: java.lang.Long): Row = {
val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
return Row.of(id, isAdmin, isTeacher, isStudent, isParent)

override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =

>> Hello,
>> I'm running into an issue where my execution plan is creating the same
>> exact join operator multiple times simply because the subsequent operator
>> filters on a different boolean value. This is a massive duplication of
>> storage and work. The filtered operators which follow result in only a
>> small set of elements filtered out per set too.
>> eg. of two separate operators that are equal
>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent]
>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>> leftInputSpec=[JoinKeyContainsUniqueKey],
>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>> Which are entirely the same datasets being processed.
>> The first one points to
>> GroupAggregate(groupBy=[user_id], select=[user_id,
>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>> TMP_0.f0 AS admin_organization_ids])
>> The second one points to
>> GroupAggregate(groupBy=[user_id], select=[user_id,
>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>> TMP_0.f0 AS teacher_organization_ids])
>> And these are both intersecting sets of data though slightly different. I
>> don't see why that would make the 1 join from before split into 2 though.
>> There's even a case where I'm seeing a join tripled.
>> Is there a good reason why this should happen? Is there a way to tell
>> flink to not duplicate operators where it doesn't need to?
>> Thanks!
