[ https://issues.apache.org/jira/browse/BEAM-4076?focusedWorklogId=424287&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-424287 ]
ASF GitHub Bot logged work on BEAM-4076: ---------------------------------------- Author: ASF GitHub Bot Created on: 17/Apr/20 16:51 Start Date: 17/Apr/20 16:51 Worklog Time Spent: 10m Work Description: kennknowles commented on pull request #11041: [BEAM-4076] Use beam join api in sql URL: https://github.com/apache/beam/pull/11041#discussion_r410347297 ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java ########## @@ -45,111 +41,35 @@ /** Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation. */ public class BeamJoinTransforms { - /** A {@code SimpleFunction} to extract join fields from the specified row. */ - public static class ExtractJoinFields extends SimpleFunction<Row, KV<Row, Row>> { - private final List<SerializableRexNode> joinColumns; - private final Schema schema; - private int leftRowColumnCount; - - public ExtractJoinFields( - boolean isLeft, - List<Pair<RexNode, RexNode>> joinColumns, - Schema schema, - int leftRowColumnCount) { - this.joinColumns = - joinColumns.stream() - .map(pair -> SerializableRexNode.builder(isLeft ? 
pair.left : pair.right).build()) - .collect(toList()); - this.schema = schema; - this.leftRowColumnCount = leftRowColumnCount; - } - - @Override - public KV<Row, Row> apply(Row input) { - Row row = - joinColumns.stream() - .map(v -> getValue(v, input, leftRowColumnCount)) - .collect(toRow(schema)); - return KV.of(row, input); - } - - @SuppressWarnings("unused") - private Schema.Field toField(Schema schema, Integer fieldIndex) { - Schema.Field original = schema.getField(fieldIndex); - return original.withName("c" + fieldIndex); - } - - private Object getValue( - SerializableRexNode serializableRexNode, Row input, int leftRowColumnCount) { - if (serializableRexNode instanceof SerializableRexInputRef) { - return input.getValue( - ((SerializableRexInputRef) serializableRexNode).getIndex() - leftRowColumnCount); - } else { // It can only be SerializableFieldAccess. - List<Integer> indexes = ((SerializableRexFieldAccess) serializableRexNode).getIndexes(); - // retrieve row based on the first column reference. - Row rowField = input.getValue(indexes.get(0) - leftRowColumnCount); - for (int i = 1; i < indexes.size() - 1; i++) { - rowField = rowField.getRow(indexes.get(i)); - } - return rowField.getValue(indexes.get(indexes.size() - 1)); - } - } + public static FieldAccessDescriptor getJoinColumns( + boolean isLeft, + List<Pair<RexNode, RexNode>> joinColumns, + int leftRowColumnCount, + Schema schema) { + List<SerializableRexNode> joinColumnsBuilt = + joinColumns.stream() + .map(pair -> SerializableRexNode.builder(isLeft ? pair.left : pair.right).build()) + .collect(toList()); + return FieldAccessDescriptor.union( + joinColumnsBuilt.stream() + .map(v -> getJoinColumn(v, leftRowColumnCount).resolve(schema)) + .collect(Collectors.toList())); } - /** A {@code DoFn} which implement the sideInput-JOIN. 
*/ - public static class SideInputJoinDoFn extends DoFn<KV<Row, Row>, Row> { - private final PCollectionView<Map<Row, Iterable<Row>>> sideInputView; - private final JoinRelType joinType; - private final Row rightNullRow; - private final boolean swap; - private final Schema schema; - - public SideInputJoinDoFn( - JoinRelType joinType, - Row rightNullRow, - PCollectionView<Map<Row, Iterable<Row>>> sideInputView, - boolean swap, - Schema schema) { - this.joinType = joinType; - this.rightNullRow = rightNullRow; - this.sideInputView = sideInputView; - this.swap = swap; - this.schema = schema; - } - - @ProcessElement - public void processElement(ProcessContext context) { - Row key = context.element().getKey(); - Row leftRow = context.element().getValue(); - Map<Row, Iterable<Row>> key2Rows = context.sideInput(sideInputView); - Iterable<Row> rightRowsIterable = key2Rows.get(key); - - if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) { - for (Row aRightRowsIterable : rightRowsIterable) { - context.output(combineTwoRowsIntoOne(leftRow, aRightRowsIterable, swap, schema)); - } - } else { - if (joinType == JoinRelType.LEFT) { - context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap, schema)); - } + private static FieldAccessDescriptor getJoinColumn( Review comment: I noticed this because in the current codebase on master we could inline and delete `SerializableRexNode` entirely. So I went looking for how we encode a full expression to be joined on. I didn't see rules that precomputed all of them (in which case input refs would suffice). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 424287) Time Spent: 23h 10m (was: 23h) > Schema followups > ---------------- > > Key: BEAM-4076 > URL: https://issues.apache.org/jira/browse/BEAM-4076 > Project: Beam > Issue Type: Improvement > Components: beam-model, dsl-sql, sdk-java-core > Reporter: Kenneth Knowles > Priority: Major > Time Spent: 23h 10m > Remaining Estimate: 0h > > This umbrella bug contains subtasks with followups for Beam schemas, which > were moved from SQL to the core Java SDK and made to be type-name-based > rather than coder based. -- This message was sent by Atlassian Jira (v8.3.4#803005)