abstractdog commented on code in PR #6456:
URL: https://github.com/apache/hive/pull/6456#discussion_r3372390095


##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.query;
+
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.plan.*;

Review Comment:
   we tend to extract wildcard imports to one-by-one



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.query;
+
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;

Review Comment:
   we tend to extract wildcard imports to one-by-one



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.query;
+
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link JoinOperationMetadataResolver}.
+ * <p>
+ * All Hive operator/plan objects are mocked so the test has no Hadoop/Hive
+ * runtime dependency.
+ */
+class TestJoinOperationMetadataResolver {
+
+  /**
+   * Tracks the intermediate "schema-parent" operator created by 
rsWithColumnKey/rsWithColumnKeys
+   * so that attachTso() can wire the TSO into the correct position in the 
chain.
+   */
+  private final Map<ReduceSinkOperator, Operator<OperatorDesc>> schemaParents 
= new HashMap<>();
+  private JoinOperationMetadataResolver resolver;
+
+  @BeforeEach
+  void setUp() {
+    resolver = new JoinOperationMetadataResolver();
+    schemaParents.clear();
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(null);
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames(), "keyNames should remain null when 
parent list is null");
+    assertNull(resolver.getTableAliases(), "tableAliases should remain null 
when parent list is null");
+  }
+
+  @Test
+  void resolveJoinMetadata_emptyParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames());
+    assertNull(resolver.getTableAliases());
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParentSlot_producesUnknown() {
+    JoinOperator join = mock(JoinOperator.class);
+    
when(join.getParentOperators()).thenReturn(Collections.singletonList(null));
+
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+    assertEquals("unknown", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveTableAlias_prefersTsoAlias_overTableName() {
+    RowSchema schema = mock(RowSchema.class);
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "orders");
+
+    JoinOperator join = joinOp(rs);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("a", resolver.getTableAliases()[0], "Should prefer the TSO 
alias over the physical table name");
+  }
+
+  @Test
+  void resolveTableAlias_fallsBackToTableName_whenAliasBlank() {
+    RowSchema schema = mock(RowSchema.class);
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "", "orders");
+
+    JoinOperator join = joinOp(rs);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("orders", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveTableAlias_unknownWhenNoTsoFound() {
+    // Parent is not a ReduceSinkOperator (no TS reachable)
+    Operator<?> oddParent = mock(Operator.class);
+    when(oddParent.getParentOperators()).thenReturn(Collections.emptyList());
+
+    JoinOperator join = joinOp(oddParent);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("unknown", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_resolvesToSchemaAlias() {
+    ColumnInfo ci = mock(ColumnInfo.class);
+    when(ci.getAlias()).thenReturn("key");
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+
+    // attach a TSO so table alias is set too
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("key", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_fallsBackToInternalName_whenNoSchemaAlias() {
+    ColumnInfo ci = mock(ColumnInfo.class);
+    when(ci.getAlias()).thenReturn(""); // empty alias
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("_col0", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_fallsBackToInternalName_whenColumnInfoNull() {
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(null);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("_col0", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_unknownWhenInternalNameNull() {
+    ExprNodeColumnDesc keyExpr = mock(ExprNodeColumnDesc.class);
+    when(keyExpr.getColumn()).thenReturn(null);
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr));
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_exprKey_usesExprString() {
+    ReduceSinkOperator rs = rsWithExprKey("upper(val)");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("upper(val)", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_exprKey_unknownWhenExprStringNull() {
+    ExprNodeDesc keyExpr = mock(ExprNodeDesc.class);
+    when(keyExpr.getExprString()).thenReturn(null);
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr));
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenParentNotReduceSink() {
+    Operator<?> filterOp = mock(Operator.class);
+    when(filterOp.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(filterOp));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenReduceSinkConfNull() {
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(null);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenKeyColsNull() {
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(null);
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenKeyColsEmpty() {
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.emptyList());
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_compoundKey_joinedWithCommaSpace() {
+    ColumnInfo ci1 = mock(ColumnInfo.class);
+    when(ci1.getAlias()).thenReturn("dept_id");
+    ColumnInfo ci2 = mock(ColumnInfo.class);
+    when(ci2.getAlias()).thenReturn("year");
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci1);
+    when(schema.getColumnInfo("_col1")).thenReturn(ci2);
+
+    ReduceSinkOperator rs = rsWithColumnKeys(Arrays.asList("_col0", "_col1"), 
schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("dept_id, year", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveJoinMetadata_twoPositions_populatesBothSlots() {
+    ColumnInfo ci0 = mock(ColumnInfo.class);
+    when(ci0.getAlias()).thenReturn("id");
+    RowSchema schema0 = mock(RowSchema.class);
+    when(schema0.getColumnInfo("_col0")).thenReturn(ci0);
+
+    ReduceSinkOperator rs0 = rsWithColumnKey("_col0", schema0);
+    attachTso(rs0, "a", "employees");
+
+    ColumnInfo ci1 = mock(ColumnInfo.class);
+    when(ci1.getAlias()).thenReturn("emp_id");
+    RowSchema schema1 = mock(RowSchema.class);
+    when(schema1.getColumnInfo("_col0")).thenReturn(ci1);
+
+    ReduceSinkOperator rs1 = rsWithColumnKey("_col0", schema1);
+    attachTso(rs1, "b", "salaries");
+
+    resolver.resolveJoinMetadata(joinOp(rs0, rs1));
+
+    assertArrayEquals(new String[] { "id", "emp_id" }, resolver.getKeyNames());
+    assertArrayEquals(new String[] { "a", "b" }, resolver.getTableAliases());
+  }
+
+  @Test
+  void resolveJoinMetadata_threePositions_populatesAllThreeSlots() {
+    String[] tables = { "employees", "salaries", "departments" };
+    String[] aliases = { "e", "s", "d" };
+    String[] internalCols = { "_col0", "_col0", "_col0" };
+    String[] expectedKeys = { "emp_id", "sid", "dept_id" };
+
+    ReduceSinkOperator[] rsList = new ReduceSinkOperator[3];
+    for (int i = 0; i < 3; i++) {

Review Comment:
   instead of 3, use expectedKeys array size



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java:
##########
@@ -222,6 +240,21 @@ private Set<Integer> getFetchInputAtCloseList() {
     return retval;
   }
 
+  private void initSkewJoinNames(int maxAlias) {
+    joinSkewKeyColumns = new String[maxAlias];
+    joinSkewTableAliases = new String[maxAlias];
+
+    String[] descKeyNames    = conf.getSkewJoinKeyNames();
+    String[] descTableAliases = conf.getSkewJoinTableAliases();
+
+    for (int pos = 0; pos < maxAlias; pos++) {
+      joinSkewKeyColumns[pos]  = (descKeyNames    != null && pos < 
descKeyNames.length

Review Comment:
   I can still see the same pattern here, can you double-check, please?
   
   ```
         joinSkewKeyColumns[pos]  = (descKeyNames    != null && pos < 
descKeyNames.length
             && descKeyNames[pos] != null)    ? descKeyNames[pos]    : 
"unknown";
   ```



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.query;
+
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link JoinOperationMetadataResolver}.
+ * <p>
+ * All Hive operator/plan objects are mocked so the test has no Hadoop/Hive
+ * runtime dependency.
+ */
+class TestJoinOperationMetadataResolver {
+
+  /**
+   * Tracks the intermediate "schema-parent" operator created by 
rsWithColumnKey/rsWithColumnKeys
+   * so that attachTso() can wire the TSO into the correct position in the 
chain.
+   */
+  private final Map<ReduceSinkOperator, Operator<OperatorDesc>> schemaParents 
= new HashMap<>();
+  private JoinOperationMetadataResolver resolver;
+
+  @BeforeEach
+  void setUp() {
+    resolver = new JoinOperationMetadataResolver();
+    schemaParents.clear();
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(null);
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames(), "keyNames should remain null when 
parent list is null");
+    assertNull(resolver.getTableAliases(), "tableAliases should remain null 
when parent list is null");
+  }
+
+  @Test
+  void resolveJoinMetadata_emptyParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames());
+    assertNull(resolver.getTableAliases());

Review Comment:
   similarly to `resolveJoinMetadata_nullParents_leavesArraysNull`, it might 
want to have comments for parity



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.query;
+
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link JoinOperationMetadataResolver}.
+ * <p>
+ * All Hive operator/plan objects are mocked so the test has no Hadoop/Hive
+ * runtime dependency.
+ */
+class TestJoinOperationMetadataResolver {
+
+  /**
+   * Tracks the intermediate "schema-parent" operator created by 
rsWithColumnKey/rsWithColumnKeys
+   * so that attachTso() can wire the TSO into the correct position in the 
chain.
+   */
+  private final Map<ReduceSinkOperator, Operator<OperatorDesc>> schemaParents 
= new HashMap<>();
+  private JoinOperationMetadataResolver resolver;
+
+  @BeforeEach
+  void setUp() {
+    resolver = new JoinOperationMetadataResolver();
+    schemaParents.clear();
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(null);
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames(), "keyNames should remain null when 
parent list is null");
+    assertNull(resolver.getTableAliases(), "tableAliases should remain null 
when parent list is null");
+  }
+
+  @Test
+  void resolveJoinMetadata_emptyParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames());
+    assertNull(resolver.getTableAliases());
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParentSlot_producesUnknown() {
+    JoinOperator join = mock(JoinOperator.class);
+    
when(join.getParentOperators()).thenReturn(Collections.singletonList(null));
+
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+    assertEquals("unknown", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveTableAlias_prefersTsoAlias_overTableName() {
+    RowSchema schema = mock(RowSchema.class);
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "orders");
+
+    JoinOperator join = joinOp(rs);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("a", resolver.getTableAliases()[0], "Should prefer the TSO 
alias over the physical table name");
+  }
+
+  @Test
+  void resolveTableAlias_fallsBackToTableName_whenAliasBlank() {
+    RowSchema schema = mock(RowSchema.class);
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "", "orders");
+
+    JoinOperator join = joinOp(rs);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("orders", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveTableAlias_unknownWhenNoTsoFound() {
+    // Parent is not a ReduceSinkOperator (no TS reachable)
+    Operator<?> oddParent = mock(Operator.class);
+    when(oddParent.getParentOperators()).thenReturn(Collections.emptyList());
+
+    JoinOperator join = joinOp(oddParent);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("unknown", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_resolvesToSchemaAlias() {
+    ColumnInfo ci = mock(ColumnInfo.class);
+    when(ci.getAlias()).thenReturn("key");
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+
+    // attach a TSO so table alias is set too
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("key", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_fallsBackToInternalName_whenNoSchemaAlias() {
+    ColumnInfo ci = mock(ColumnInfo.class);
+    when(ci.getAlias()).thenReturn(""); // empty alias
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("_col0", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_fallsBackToInternalName_whenColumnInfoNull() {
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(null);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("_col0", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_unknownWhenInternalNameNull() {
+    ExprNodeColumnDesc keyExpr = mock(ExprNodeColumnDesc.class);
+    when(keyExpr.getColumn()).thenReturn(null);
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr));
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_exprKey_usesExprString() {
+    ReduceSinkOperator rs = rsWithExprKey("upper(val)");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("upper(val)", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_exprKey_unknownWhenExprStringNull() {
+    ExprNodeDesc keyExpr = mock(ExprNodeDesc.class);
+    when(keyExpr.getExprString()).thenReturn(null);
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr));
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenParentNotReduceSink() {
+    Operator<?> filterOp = mock(Operator.class);
+    when(filterOp.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(filterOp));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenReduceSinkConfNull() {
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(null);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenKeyColsNull() {
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(null);
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenKeyColsEmpty() {
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.emptyList());
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_compoundKey_joinedWithCommaSpace() {
+    ColumnInfo ci1 = mock(ColumnInfo.class);
+    when(ci1.getAlias()).thenReturn("dept_id");
+    ColumnInfo ci2 = mock(ColumnInfo.class);
+    when(ci2.getAlias()).thenReturn("year");
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci1);
+    when(schema.getColumnInfo("_col1")).thenReturn(ci2);
+
+    ReduceSinkOperator rs = rsWithColumnKeys(Arrays.asList("_col0", "_col1"), 
schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("dept_id, year", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveJoinMetadata_twoPositions_populatesBothSlots() {

Review Comment:
   does this cover anything else than 
`resolveJoinMetadata_threePositions_populatesAllThreeSlots`? it can be removed 
if it doesn't



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.query;
+
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;

Review Comment:
   we tend to extract wildcard imports to one-by-one



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/SkewedMergeJoinMonitor.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SkewedMergeJoinMonitor implements SkewedJoinMonitor {
+
+  private transient long mergeJoinSkewThreshold;
+  private transient boolean mergeJoinSkewAbort;
+  private transient long mergeJoinSkewCheckInterval;
+  private transient boolean[] skewedKeyFlagged;

Review Comment:
   all of these can become final



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.query;
+
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link JoinOperationMetadataResolver}.
+ * <p>
+ * All Hive operator/plan objects are mocked so the test has no Hadoop/Hive
+ * runtime dependency.
+ */
+class TestJoinOperationMetadataResolver {
+
+  /**
+   * Tracks the intermediate "schema-parent" operator created by 
rsWithColumnKey/rsWithColumnKeys
+   * so that attachTso() can wire the TSO into the correct position in the 
chain.
+   */
+  private final Map<ReduceSinkOperator, Operator<OperatorDesc>> schemaParents 
= new HashMap<>();
+  private JoinOperationMetadataResolver resolver;
+
+  @BeforeEach
+  void setUp() {
+    resolver = new JoinOperationMetadataResolver();
+    schemaParents.clear();
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(null);
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames(), "keyNames should remain null when 
parent list is null");
+    assertNull(resolver.getTableAliases(), "tableAliases should remain null 
when parent list is null");
+  }
+
+  @Test
+  void resolveJoinMetadata_emptyParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames());
+    assertNull(resolver.getTableAliases());
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParentSlot_producesUnknown() {
+    JoinOperator join = mock(JoinOperator.class);
+    
when(join.getParentOperators()).thenReturn(Collections.singletonList(null));
+
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+    assertEquals("unknown", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveTableAlias_prefersTsoAlias_overTableName() {
+    RowSchema schema = mock(RowSchema.class);
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "orders");
+
+    JoinOperator join = joinOp(rs);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("a", resolver.getTableAliases()[0], "Should prefer the TSO 
alias over the physical table name");
+  }
+
+  @Test
+  void resolveTableAlias_fallsBackToTableName_whenAliasBlank() {
+    RowSchema schema = mock(RowSchema.class);
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "", "orders");
+
+    JoinOperator join = joinOp(rs);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("orders", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveTableAlias_unknownWhenNoTsoFound() {
+    // Parent is not a ReduceSinkOperator (no TS reachable)
+    Operator<?> oddParent = mock(Operator.class);
+    when(oddParent.getParentOperators()).thenReturn(Collections.emptyList());
+
+    JoinOperator join = joinOp(oddParent);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("unknown", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_resolvesToSchemaAlias() {
+    ColumnInfo ci = mock(ColumnInfo.class);
+    when(ci.getAlias()).thenReturn("key");
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+
+    // attach a TSO so table alias is set too
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("key", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_fallsBackToInternalName_whenNoSchemaAlias() {
+    ColumnInfo ci = mock(ColumnInfo.class);
+    when(ci.getAlias()).thenReturn(""); // empty alias
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("_col0", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_fallsBackToInternalName_whenColumnInfoNull() {
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(null);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("_col0", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_unknownWhenInternalNameNull() {
+    ExprNodeColumnDesc keyExpr = mock(ExprNodeColumnDesc.class);
+    when(keyExpr.getColumn()).thenReturn(null);
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr));
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_exprKey_usesExprString() {
+    ReduceSinkOperator rs = rsWithExprKey("upper(val)");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("upper(val)", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_exprKey_unknownWhenExprStringNull() {
+    ExprNodeDesc keyExpr = mock(ExprNodeDesc.class);
+    when(keyExpr.getExprString()).thenReturn(null);
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr));
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenParentNotReduceSink() {
+    Operator<?> filterOp = mock(Operator.class);

Review Comment:
   this looks like a duplicate of 
`resolveJoinMetadata_nullParentSlot_producesUnknown`



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.query;
+
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link JoinOperationMetadataResolver}.
+ * <p>
+ * All Hive operator/plan objects are mocked so the test has no Hadoop/Hive
+ * runtime dependency.
+ */
+class TestJoinOperationMetadataResolver {
+
+  /**
+   * Tracks the intermediate "schema-parent" operator created by 
rsWithColumnKey/rsWithColumnKeys
+   * so that attachTso() can wire the TSO into the correct position in the 
chain.
+   */
+  private final Map<ReduceSinkOperator, Operator<OperatorDesc>> schemaParents 
= new HashMap<>();
+  private JoinOperationMetadataResolver resolver;
+
+  @BeforeEach
+  void setUp() {
+    resolver = new JoinOperationMetadataResolver();
+    schemaParents.clear();
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(null);
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames(), "keyNames should remain null when 
parent list is null");
+    assertNull(resolver.getTableAliases(), "tableAliases should remain null 
when parent list is null");
+  }
+
+  @Test
+  void resolveJoinMetadata_emptyParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames());
+    assertNull(resolver.getTableAliases());
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParentSlot_producesUnknown() {
+    JoinOperator join = mock(JoinOperator.class);
+    
when(join.getParentOperators()).thenReturn(Collections.singletonList(null));
+
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+    assertEquals("unknown", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveTableAlias_prefersTsoAlias_overTableName() {
+    RowSchema schema = mock(RowSchema.class);
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "orders");
+
+    JoinOperator join = joinOp(rs);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("a", resolver.getTableAliases()[0], "Should prefer the TSO 
alias over the physical table name");
+  }
+
+  @Test
+  void resolveTableAlias_fallsBackToTableName_whenAliasBlank() {
+    RowSchema schema = mock(RowSchema.class);
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "", "orders");
+
+    JoinOperator join = joinOp(rs);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("orders", resolver.getTableAliases()[0]);

Review Comment:
   extra message like: "Should fall back to table name when alias is blank"



##########
ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.metainfo.query;
+
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.plan.*;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link JoinOperationMetadataResolver}.
+ * <p>
+ * All Hive operator/plan objects are mocked so the test has no Hadoop/Hive
+ * runtime dependency.
+ */
+class TestJoinOperationMetadataResolver {
+
+  /**
+   * Tracks the intermediate "schema-parent" operator created by 
rsWithColumnKey/rsWithColumnKeys
+   * so that attachTso() can wire the TSO into the correct position in the 
chain.
+   */
+  private final Map<ReduceSinkOperator, Operator<OperatorDesc>> schemaParents 
= new HashMap<>();
+  private JoinOperationMetadataResolver resolver;
+
+  @BeforeEach
+  void setUp() {
+    resolver = new JoinOperationMetadataResolver();
+    schemaParents.clear();
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(null);
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames(), "keyNames should remain null when 
parent list is null");
+    assertNull(resolver.getTableAliases(), "tableAliases should remain null 
when parent list is null");
+  }
+
+  @Test
+  void resolveJoinMetadata_emptyParents_leavesArraysNull() {
+    JoinOperator join = mock(JoinOperator.class);
+    when(join.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(join);
+
+    assertNull(resolver.getKeyNames());
+    assertNull(resolver.getTableAliases());
+  }
+
+  @Test
+  void resolveJoinMetadata_nullParentSlot_producesUnknown() {
+    JoinOperator join = mock(JoinOperator.class);
+    
when(join.getParentOperators()).thenReturn(Collections.singletonList(null));
+
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+    assertEquals("unknown", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveTableAlias_prefersTsoAlias_overTableName() {
+    RowSchema schema = mock(RowSchema.class);
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "orders");
+
+    JoinOperator join = joinOp(rs);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("a", resolver.getTableAliases()[0], "Should prefer the TSO 
alias over the physical table name");
+  }
+
+  @Test
+  void resolveTableAlias_fallsBackToTableName_whenAliasBlank() {
+    RowSchema schema = mock(RowSchema.class);
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "", "orders");
+
+    JoinOperator join = joinOp(rs);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("orders", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveTableAlias_unknownWhenNoTsoFound() {
+    // Parent is not a ReduceSinkOperator (no TS reachable)
+    Operator<?> oddParent = mock(Operator.class);
+    when(oddParent.getParentOperators()).thenReturn(Collections.emptyList());
+
+    JoinOperator join = joinOp(oddParent);
+    resolver.resolveJoinMetadata(join);
+
+    assertEquals("unknown", resolver.getTableAliases()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_resolvesToSchemaAlias() {
+    ColumnInfo ci = mock(ColumnInfo.class);
+    when(ci.getAlias()).thenReturn("key");
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+
+    // attach a TSO so table alias is set too
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("key", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_fallsBackToInternalName_whenNoSchemaAlias() {
+    ColumnInfo ci = mock(ColumnInfo.class);
+    when(ci.getAlias()).thenReturn(""); // empty alias
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("_col0", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_fallsBackToInternalName_whenColumnInfoNull() {
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(null);
+
+    ReduceSinkOperator rs = rsWithColumnKey("_col0", schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("_col0", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_columnRef_unknownWhenInternalNameNull() {
+    ExprNodeColumnDesc keyExpr = mock(ExprNodeColumnDesc.class);
+    when(keyExpr.getColumn()).thenReturn(null);
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr));
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_exprKey_usesExprString() {
+    ReduceSinkOperator rs = rsWithExprKey("upper(val)");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("upper(val)", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_exprKey_unknownWhenExprStringNull() {
+    ExprNodeDesc keyExpr = mock(ExprNodeDesc.class);
+    when(keyExpr.getExprString()).thenReturn(null);
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr));
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenParentNotReduceSink() {
+    Operator<?> filterOp = mock(Operator.class);
+    when(filterOp.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(filterOp));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenReduceSinkConfNull() {
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(null);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenKeyColsNull() {
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(null);
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_unknownWhenKeyColsEmpty() {
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.emptyList());
+
+    ReduceSinkOperator rs = mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    when(rs.getParentOperators()).thenReturn(Collections.emptyList());
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("unknown", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveKeyName_compoundKey_joinedWithCommaSpace() {
+    ColumnInfo ci1 = mock(ColumnInfo.class);
+    when(ci1.getAlias()).thenReturn("dept_id");
+    ColumnInfo ci2 = mock(ColumnInfo.class);
+    when(ci2.getAlias()).thenReturn("year");
+
+    RowSchema schema = mock(RowSchema.class);
+    when(schema.getColumnInfo("_col0")).thenReturn(ci1);
+    when(schema.getColumnInfo("_col1")).thenReturn(ci2);
+
+    ReduceSinkOperator rs = rsWithColumnKeys(Arrays.asList("_col0", "_col1"), 
schema);
+    attachTso(rs, "a", "t");
+
+    resolver.resolveJoinMetadata(joinOp(rs));
+
+    assertEquals("dept_id, year", resolver.getKeyNames()[0]);
+  }
+
+  @Test
+  void resolveJoinMetadata_twoPositions_populatesBothSlots() {
+    ColumnInfo ci0 = mock(ColumnInfo.class);
+    when(ci0.getAlias()).thenReturn("id");
+    RowSchema schema0 = mock(RowSchema.class);
+    when(schema0.getColumnInfo("_col0")).thenReturn(ci0);
+
+    ReduceSinkOperator rs0 = rsWithColumnKey("_col0", schema0);
+    attachTso(rs0, "a", "employees");
+
+    ColumnInfo ci1 = mock(ColumnInfo.class);
+    when(ci1.getAlias()).thenReturn("emp_id");
+    RowSchema schema1 = mock(RowSchema.class);
+    when(schema1.getColumnInfo("_col0")).thenReturn(ci1);
+
+    ReduceSinkOperator rs1 = rsWithColumnKey("_col0", schema1);
+    attachTso(rs1, "b", "salaries");
+
+    resolver.resolveJoinMetadata(joinOp(rs0, rs1));
+
+    assertArrayEquals(new String[] { "id", "emp_id" }, resolver.getKeyNames());
+    assertArrayEquals(new String[] { "a", "b" }, resolver.getTableAliases());
+  }
+
+  @Test
+  void resolveJoinMetadata_threePositions_populatesAllThreeSlots() {
+    String[] tables = { "employees", "salaries", "departments" };
+    String[] aliases = { "e", "s", "d" };
+    String[] internalCols = { "_col0", "_col0", "_col0" };
+    String[] expectedKeys = { "emp_id", "sid", "dept_id" };
+
+    ReduceSinkOperator[] rsList = new ReduceSinkOperator[3];
+    for (int i = 0; i < 3; i++) {
+      ColumnInfo ci = mock(ColumnInfo.class);
+      when(ci.getAlias()).thenReturn(expectedKeys[i]);
+
+      RowSchema schema = mock(RowSchema.class);
+      when(schema.getColumnInfo(internalCols[i])).thenReturn(ci);
+
+      ReduceSinkOperator rs = rsWithColumnKey(internalCols[i], schema);
+      attachTso(rs, aliases[i], tables[i]);
+      rsList[i] = rs;
+    }
+
+    resolver.resolveJoinMetadata(joinOp(rsList[0], rsList[1], rsList[2]));
+
+    assertArrayEquals(expectedKeys, resolver.getKeyNames());
+    assertArrayEquals(aliases, resolver.getTableAliases());
+  }
+
+  @Test
+  void resolveJoinMetadata_arraySizeMatchesParentCount() {
+    ReduceSinkOperator rs0 = rsWithExprKey("id");
+    ReduceSinkOperator rs1 = rsWithExprKey("id");
+    ReduceSinkOperator rs2 = rsWithExprKey("id");
+
+    resolver.resolveJoinMetadata(joinOp(rs0, rs1, rs2));
+
+    assertEquals(3, resolver.getKeyNames().length);
+    assertEquals(3, resolver.getTableAliases().length);
+  }
+
+  // helpers
+
+  /**
+   * Build a mocked ReduceSinkOperator whose key list is a single 
ExprNodeColumnDesc.
+   * <p>
+   * The RS parent is a generic operator that carries {@code inputSchema}.
+   * When {@link #attachTso} is later called on the RS, it places the TSO as 
the
+   * parent of <em>that schema operator</em> (not directly as parent of RS), so
+   * that both the schema and the upstream TSO remain reachable for traversal.
+   */
+  private ReduceSinkOperator rsWithColumnKey(String internalColName, RowSchema 
inputSchema) {
+    ExprNodeColumnDesc keyExpr = mock(ExprNodeColumnDesc.class);
+    when(keyExpr.getColumn()).thenReturn(internalColName);
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr));
+
+    // schemaParent is the direct parent of RS; it exposes the input schema and
+    // will have the TSO attached as its own parent by attachTso().
+    @SuppressWarnings("unchecked") Operator<OperatorDesc> schemaParent = 
mock(Operator.class);
+    when(schemaParent.getSchema()).thenReturn(inputSchema);
+    
when(schemaParent.getParentOperators()).thenReturn(Collections.emptyList());
+
+    @SuppressWarnings("unchecked") ReduceSinkOperator rs = 
mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    
when(rs.getParentOperators()).thenReturn(Collections.singletonList(schemaParent));
+    // store schemaParent so attachTso() can wire the TSO into it
+    schemaParents.put(rs, schemaParent);
+    return rs;
+  }
+
+  /**
+   * Build a mocked ReduceSinkOperator with multiple key columns.
+   */
+  private ReduceSinkOperator rsWithColumnKeys(List<String> internalColNames, 
RowSchema inputSchema) {
+    List<ExprNodeDesc> keyExprs = new java.util.ArrayList<>();
+    for (String name : internalColNames) {
+      ExprNodeColumnDesc keyExpr = mock(ExprNodeColumnDesc.class);
+      when(keyExpr.getColumn()).thenReturn(name);
+      keyExprs.add(keyExpr);
+    }
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(keyExprs);
+
+    @SuppressWarnings("unchecked") Operator<OperatorDesc> schemaParent = 
mock(Operator.class);
+    when(schemaParent.getSchema()).thenReturn(inputSchema);
+    
when(schemaParent.getParentOperators()).thenReturn(Collections.emptyList());
+
+    @SuppressWarnings("unchecked") ReduceSinkOperator rs = 
mock(ReduceSinkOperator.class);
+    when(rs.getConf()).thenReturn(rsConf);
+    
when(rs.getParentOperators()).thenReturn(Collections.singletonList(schemaParent));
+    schemaParents.put(rs, schemaParent);
+    return rs;
+  }
+
+  /**
+   * Build a mocked ReduceSinkOperator whose key is a generic (non-column) 
expression.
+   */
+  private ReduceSinkOperator rsWithExprKey(String exprString) {
+    ExprNodeDesc keyExpr = mock(ExprNodeDesc.class);
+    when(keyExpr.getExprString()).thenReturn(exprString);
+
+    ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class);
+    when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr));
+
+    @SuppressWarnings("unchecked") ReduceSinkOperator rs = 
mock(ReduceSinkOperator.class);

Review Comment:
   most style guides and formatters (including Google Java Format) will place 
the annotation on its own line for readability, I recommend doing the same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to