[ 
https://issues.apache.org/jira/browse/BEAM-4076?focusedWorklogId=424287&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-424287
 ]

ASF GitHub Bot logged work on BEAM-4076:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Apr/20 16:51
            Start Date: 17/Apr/20 16:51
    Worklog Time Spent: 10m 
      Work Description: kennknowles commented on pull request #11041: 
[BEAM-4076] Use beam join api in sql
URL: https://github.com/apache/beam/pull/11041#discussion_r410347297
 
 

 ##########
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
 ##########
 @@ -45,111 +41,35 @@
 /** Collections of {@code PTransform} and {@code DoFn} used to perform JOIN 
operation. */
 public class BeamJoinTransforms {
 
-  /** A {@code SimpleFunction} to extract join fields from the specified row. 
*/
-  public static class ExtractJoinFields extends SimpleFunction<Row, KV<Row, 
Row>> {
-    private final List<SerializableRexNode> joinColumns;
-    private final Schema schema;
-    private int leftRowColumnCount;
-
-    public ExtractJoinFields(
-        boolean isLeft,
-        List<Pair<RexNode, RexNode>> joinColumns,
-        Schema schema,
-        int leftRowColumnCount) {
-      this.joinColumns =
-          joinColumns.stream()
-              .map(pair -> SerializableRexNode.builder(isLeft ? pair.left : 
pair.right).build())
-              .collect(toList());
-      this.schema = schema;
-      this.leftRowColumnCount = leftRowColumnCount;
-    }
-
-    @Override
-    public KV<Row, Row> apply(Row input) {
-      Row row =
-          joinColumns.stream()
-              .map(v -> getValue(v, input, leftRowColumnCount))
-              .collect(toRow(schema));
-      return KV.of(row, input);
-    }
-
-    @SuppressWarnings("unused")
-    private Schema.Field toField(Schema schema, Integer fieldIndex) {
-      Schema.Field original = schema.getField(fieldIndex);
-      return original.withName("c" + fieldIndex);
-    }
-
-    private Object getValue(
-        SerializableRexNode serializableRexNode, Row input, int 
leftRowColumnCount) {
-      if (serializableRexNode instanceof SerializableRexInputRef) {
-        return input.getValue(
-            ((SerializableRexInputRef) serializableRexNode).getIndex() - 
leftRowColumnCount);
-      } else { // It can only be SerializableFieldAccess.
-        List<Integer> indexes = ((SerializableRexFieldAccess) 
serializableRexNode).getIndexes();
-        // retrieve row based on the first column reference.
-        Row rowField = input.getValue(indexes.get(0) - leftRowColumnCount);
-        for (int i = 1; i < indexes.size() - 1; i++) {
-          rowField = rowField.getRow(indexes.get(i));
-        }
-        return rowField.getValue(indexes.get(indexes.size() - 1));
-      }
-    }
+  public static FieldAccessDescriptor getJoinColumns(
+      boolean isLeft,
+      List<Pair<RexNode, RexNode>> joinColumns,
+      int leftRowColumnCount,
+      Schema schema) {
+    List<SerializableRexNode> joinColumnsBuilt =
+        joinColumns.stream()
+            .map(pair -> SerializableRexNode.builder(isLeft ? pair.left : 
pair.right).build())
+            .collect(toList());
+    return FieldAccessDescriptor.union(
+        joinColumnsBuilt.stream()
+            .map(v -> getJoinColumn(v, leftRowColumnCount).resolve(schema))
+            .collect(Collectors.toList()));
   }
 
-  /** A {@code DoFn} which implement the sideInput-JOIN. */
-  public static class SideInputJoinDoFn extends DoFn<KV<Row, Row>, Row> {
-    private final PCollectionView<Map<Row, Iterable<Row>>> sideInputView;
-    private final JoinRelType joinType;
-    private final Row rightNullRow;
-    private final boolean swap;
-    private final Schema schema;
-
-    public SideInputJoinDoFn(
-        JoinRelType joinType,
-        Row rightNullRow,
-        PCollectionView<Map<Row, Iterable<Row>>> sideInputView,
-        boolean swap,
-        Schema schema) {
-      this.joinType = joinType;
-      this.rightNullRow = rightNullRow;
-      this.sideInputView = sideInputView;
-      this.swap = swap;
-      this.schema = schema;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      Row key = context.element().getKey();
-      Row leftRow = context.element().getValue();
-      Map<Row, Iterable<Row>> key2Rows = context.sideInput(sideInputView);
-      Iterable<Row> rightRowsIterable = key2Rows.get(key);
-
-      if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) 
{
-        for (Row aRightRowsIterable : rightRowsIterable) {
-          context.output(combineTwoRowsIntoOne(leftRow, aRightRowsIterable, 
swap, schema));
-        }
-      } else {
-        if (joinType == JoinRelType.LEFT) {
-          context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap, 
schema));
-        }
+  private static FieldAccessDescriptor getJoinColumn(
 
 Review comment:
   I noticed this because, in the current codebase on master, we could inline 
and delete `SerializableRexNode` entirely. So I went looking for how we 
encode a full expression to be joined on. I didn't see any rules that precompute 
all of them (in which case input refs would suffice).
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 424287)
    Time Spent: 23h 10m  (was: 23h)

> Schema followups
> ----------------
>
>                 Key: BEAM-4076
>                 URL: https://issues.apache.org/jira/browse/BEAM-4076
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model, dsl-sql, sdk-java-core
>            Reporter: Kenneth Knowles
>            Priority: Major
>          Time Spent: 23h 10m
>  Remaining Estimate: 0h
>
> This umbrella bug contains subtasks with followups for Beam schemas, which 
> were moved from SQL to the core Java SDK and made to be type-name-based 
> rather than coder-based.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to