[ 
https://issues.apache.org/jira/browse/BEAM-4076?focusedWorklogId=424286&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-424286
 ]

ASF GitHub Bot logged work on BEAM-4076:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Apr/20 16:50
            Start Date: 17/Apr/20 16:50
    Worklog Time Spent: 10m 
      Work Description: kennknowles commented on pull request #11041: 
[BEAM-4076] Use beam join api in sql
URL: https://github.com/apache/beam/pull/11041#discussion_r410346652
 
 

 ##########
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
 ##########
 @@ -45,111 +41,35 @@
 /** Collections of {@code PTransform} and {@code DoFn} used to perform JOIN 
operation. */
 public class BeamJoinTransforms {
 
-  /** A {@code SimpleFunction} to extract join fields from the specified row. 
*/
-  public static class ExtractJoinFields extends SimpleFunction<Row, KV<Row, 
Row>> {
-    private final List<SerializableRexNode> joinColumns;
-    private final Schema schema;
-    private int leftRowColumnCount;
-
-    public ExtractJoinFields(
-        boolean isLeft,
-        List<Pair<RexNode, RexNode>> joinColumns,
-        Schema schema,
-        int leftRowColumnCount) {
-      this.joinColumns =
-          joinColumns.stream()
-              .map(pair -> SerializableRexNode.builder(isLeft ? pair.left : 
pair.right).build())
-              .collect(toList());
-      this.schema = schema;
-      this.leftRowColumnCount = leftRowColumnCount;
-    }
-
-    @Override
-    public KV<Row, Row> apply(Row input) {
-      Row row =
-          joinColumns.stream()
-              .map(v -> getValue(v, input, leftRowColumnCount))
-              .collect(toRow(schema));
-      return KV.of(row, input);
-    }
-
-    @SuppressWarnings("unused")
-    private Schema.Field toField(Schema schema, Integer fieldIndex) {
-      Schema.Field original = schema.getField(fieldIndex);
-      return original.withName("c" + fieldIndex);
-    }
-
-    private Object getValue(
-        SerializableRexNode serializableRexNode, Row input, int 
leftRowColumnCount) {
-      if (serializableRexNode instanceof SerializableRexInputRef) {
-        return input.getValue(
-            ((SerializableRexInputRef) serializableRexNode).getIndex() - 
leftRowColumnCount);
-      } else { // It can only be SerializableFieldAccess.
-        List<Integer> indexes = ((SerializableRexFieldAccess) 
serializableRexNode).getIndexes();
-        // retrieve row based on the first column reference.
-        Row rowField = input.getValue(indexes.get(0) - leftRowColumnCount);
-        for (int i = 1; i < indexes.size() - 1; i++) {
-          rowField = rowField.getRow(indexes.get(i));
-        }
-        return rowField.getValue(indexes.get(indexes.size() - 1));
-      }
-    }
+  public static FieldAccessDescriptor getJoinColumns(
+      boolean isLeft,
+      List<Pair<RexNode, RexNode>> joinColumns,
+      int leftRowColumnCount,
+      Schema schema) {
+    List<SerializableRexNode> joinColumnsBuilt =
+        joinColumns.stream()
+            .map(pair -> SerializableRexNode.builder(isLeft ? pair.left : 
pair.right).build())
+            .collect(toList());
+    return FieldAccessDescriptor.union(
+        joinColumnsBuilt.stream()
+            .map(v -> getJoinColumn(v, leftRowColumnCount).resolve(schema))
+            .collect(Collectors.toList()));
   }
 
-  /** A {@code DoFn} which implement the sideInput-JOIN. */
-  public static class SideInputJoinDoFn extends DoFn<KV<Row, Row>, Row> {
-    private final PCollectionView<Map<Row, Iterable<Row>>> sideInputView;
-    private final JoinRelType joinType;
-    private final Row rightNullRow;
-    private final boolean swap;
-    private final Schema schema;
-
-    public SideInputJoinDoFn(
-        JoinRelType joinType,
-        Row rightNullRow,
-        PCollectionView<Map<Row, Iterable<Row>>> sideInputView,
-        boolean swap,
-        Schema schema) {
-      this.joinType = joinType;
-      this.rightNullRow = rightNullRow;
-      this.sideInputView = sideInputView;
-      this.swap = swap;
-      this.schema = schema;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      Row key = context.element().getKey();
-      Row leftRow = context.element().getValue();
-      Map<Row, Iterable<Row>> key2Rows = context.sideInput(sideInputView);
-      Iterable<Row> rightRowsIterable = key2Rows.get(key);
-
-      if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) 
{
-        for (Row aRightRowsIterable : rightRowsIterable) {
-          context.output(combineTwoRowsIntoOne(leftRow, aRightRowsIterable, 
swap, schema));
-        }
-      } else {
-        if (joinType == JoinRelType.LEFT) {
-          context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap, 
schema));
-        }
+  private static FieldAccessDescriptor getJoinColumn(
 
 Review comment:
   This actually hardcodes a bad assumption even further than before: that the 
join is only on columns. We want to move in the other direction and allow join 
conditions to be more general RexNodes, many of which still work for CoGBK and 
side-input lookup joins. This is tracked in BEAM-6112.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 424286)
    Time Spent: 23h  (was: 22h 50m)

> Schema followups
> ----------------
>
>                 Key: BEAM-4076
>                 URL: https://issues.apache.org/jira/browse/BEAM-4076
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model, dsl-sql, sdk-java-core
>            Reporter: Kenneth Knowles
>            Priority: Major
>          Time Spent: 23h
>  Remaining Estimate: 0h
>
> This umbrella bug contains subtasks with followups for Beam schemas, which 
> were moved from SQL to the core Java SDK and made to be type-name-based 
> rather than coder-based.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to