gemini-code-assist[bot] commented on code in PR #38868:
URL: https://github.com/apache/beam/pull/38868#discussion_r3398622735


##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java:
##########
@@ -193,11 +194,41 @@ private PCollection<Row> standardJoin(
     }
 
     // Flatten the lhs and rhs fields into a single row.
+    FieldAccessDescriptor flattenFields =
+        FieldAccessDescriptor.withFieldNames(
+            org.apache.beam.sdk.schemas.transforms.Join.LHS_TAG + ".*",
+            org.apache.beam.sdk.schemas.transforms.Join.RHS_TAG + ".*");
+
+    // Reconcile the desired output schema (which carries the Calcite-derived 
field names and the
+    // correct top-level, outer-join-aware nullability) with the types the 
data actually carries.
+    // Calcite's join row-type derivation can report a different nullability 
than the rows hold --
+    // in particular it can mark the fields nested inside a struct column as 
nullable even when the
+    // joined rows still keep them NOT NULL. Forcing the Calcite schema 
verbatim then trips Select's
+    // type-equality guard. The flatten emits the lhs struct's fields followed 
by the rhs struct's
+    // fields, so walk the Calcite output positionally against those data 
fields, keeping Calcite's
+    // names and top-level nullability but adopting the data's (possibly 
deeper) field types.
+    Schema calciteSchema = CalciteUtils.toSchema(getRowType());
+    Schema joinedSchema = joinedRows.getSchema();
+    List<Schema.Field> dataFields = new java.util.ArrayList<>();
+    dataFields.addAll(
+        
Preconditions.checkArgumentNotNull(joinedSchema.getField(0).getType().getRowSchema())
+            .getFields());
+    dataFields.addAll(
+        
Preconditions.checkArgumentNotNull(joinedSchema.getField(1).getType().getRowSchema())
+            .getFields());
+    Schema.Builder reconciled = Schema.builder();
+    for (int i = 0; i < calciteSchema.getFieldCount(); i++) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To prevent an unexpected `IndexOutOfBoundsException` during the loop, it is 
safer to defensively check that the field count of `calciteSchema` matches the 
size of `dataFields` before iterating.
   
   ```suggestion
       if (calciteSchema.getFieldCount() != dataFields.size()) {
         throw new IllegalStateException(
             String.format(
                 "Field count mismatch: Calcite schema has %d fields, but data 
schema has %d fields",
                 calciteSchema.getFieldCount(), dataFields.size()));
       }
   
       Schema.Builder reconciled = Schema.builder();
       for (int i = 0; i < calciteSchema.getFieldCount(); i++) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to