[
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=145181&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145181
]
ASF GitHub Bot logged work on BEAM-4461:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Sep/18 02:34
Start Date: 18/Sep/18 02:34
Worklog Time Spent: 10m
Work Description: reuvenlax closed pull request #6316: [BEAM-4461] Add
Unnest transform.
URL: https://github.com/apache/beam/pull/6316
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 43d7a0f813d..86a0f4653d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -167,6 +167,10 @@ public Schema(List<Field> fields) {
this.fields = fields;
int index = 0;
for (Field field : fields) {
+ if (fieldIndices.get(field.getName()) != null) {
+ throw new IllegalArgumentException(
+ "Duplicate field " + field.getName() + " added to schema");
+ }
fieldIndices.put(field.getName(), index++);
}
this.hashCode = Objects.hash(fieldIndices, fields);
@@ -528,17 +532,18 @@ public int hashCode() {
public abstract Builder toBuilder();
+ /** Builder for {@link Field}. */
@AutoValue.Builder
- abstract static class Builder {
- abstract Builder setName(String name);
+ public abstract static class Builder {
+ public abstract Builder setName(String name);
- abstract Builder setDescription(String description);
+ public abstract Builder setDescription(String description);
- abstract Builder setType(FieldType fieldType);
+ public abstract Builder setType(FieldType fieldType);
- abstract Builder setNullable(Boolean nullable);
+ public abstract Builder setNullable(Boolean nullable);
- abstract Field build();
+ public abstract Field build();
}
+ /** Returns a field with the given name and type. */
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
new file mode 100644
index 00000000000..9c3381ef4e1
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Unnest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+/**
+ * A {@link PTransform} to unnest nested rows.
+ *
+ * <p>For example, consider a Row with the following nested schema:
+ *
+ * <p>UserEvent Schema: userid: INT64 timestamp: DATETIME location: LatLong
+ *
+ * <p>LatLong Schema: latitude: DOUBLE longitude: DOUBLE
+ *
+ * <p>After unnesting, all of the rows will be converted to rows satisfying
the following schema:
+ *
+ * <p>UserEvent Schema: userid: INT64 timestamp: DATETIME location.latitude:
DOUBLE
+ * location.longitude: DOUBLE
+ *
+ * <p>By default nested names are concatenated to generate the unnested name,
however {@link
+ * Unnest.Inner#withFieldNameFunction} can be used to specify a custom naming
policy.
+ *
+ * <p>Note that currently array and map values are not unnested.
+ */
+@Experimental(Kind.SCHEMAS)
+public class Unnest {
+ public static <T> Inner<T> create() {
+ return new
AutoValue_Unnest_Inner.Builder<T>().setFieldNameFunction(CONCAT_FIELD_NAMES).build();
+ }
+ /**
+ * This is the default naming policy for naming fields. Every field name in
the path to a given
field is concatenated with _ characters.
+ */
+ public static final SerializableFunction<List<String>, String>
CONCAT_FIELD_NAMES =
+ l -> {
+ return String.join("_", l);
+ };
+ /**
+ * This policy keeps the raw nested field name. If two differently-nested
fields have the same
+ * name, unnesting will fail with this policy.
+ */
+ public static final SerializableFunction<List<String>, String>
KEEP_NESTED_NAME =
+ l -> {
+ return l.get(l.size() - 1);
+ };
+ /** Returns the result of unnesting the given schema. The default naming
policy is used. */
+ static Schema getUnnestedSchema(Schema schema) {
+ List<String> nameComponents = Lists.newArrayList();
+ return getUnnestedSchema(schema, nameComponents, CONCAT_FIELD_NAMES);
+ }
+ /** Returns the result of unnesting the given schema with the given naming
policy. */
+ static Schema getUnnestedSchema(Schema schema,
SerializableFunction<List<String>, String> fn) {
+ List<String> nameComponents = Lists.newArrayList();
+ return getUnnestedSchema(schema, nameComponents, fn);
+ }
+
+ private static Schema getUnnestedSchema(
+ Schema schema, List<String> nameComponents,
SerializableFunction<List<String>, String> fn) {
+ Schema.Builder builder = Schema.builder();
+ for (Field field : schema.getFields()) {
+ nameComponents.add(field.getName());
+ if (field.getType().getTypeName().isCompositeType()) {
+ Schema nestedSchema =
getUnnestedSchema(field.getType().getRowSchema(), nameComponents, fn);
+ for (Field nestedField : nestedSchema.getFields()) {
+ builder.addField(nestedField);
+ }
+ } else {
+ String name = fn.apply(nameComponents);
+ Field newField = field.toBuilder().setName(name).build();
+ builder.addField(newField);
+ }
+ nameComponents.remove(nameComponents.size() - 1);
+ }
+ return builder.build();
+ }
+ /** Unnest a row. */
+ static Row unnestRow(Row input, Schema unnestedSchema) {
+ Row.Builder builder = Row.withSchema(unnestedSchema);
+ unnestRow(input, builder);
+ return builder.build();
+ }
+
+ private static void unnestRow(Row input, Row.Builder output) {
+ for (int i = 0; i < input.getSchema().getFieldCount(); ++i) {
+ Field field = input.getSchema().getField(i);
+ if (field.getType().getTypeName().isCompositeType()) {
+ unnestRow(input.getRow(i), output);
+ } else {
+ output.addValue(input.getValue(i));
+ }
+ }
+ }
+ /** A {@link PTransform} that unnests nested row. */
+ @AutoValue
+ public abstract static class Inner<T> extends PTransform<PCollection<T>,
PCollection<Row>> {
+ abstract Builder<T> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T>
setFieldNameFunction(SerializableFunction<List<String>, String> fn);
+
+ abstract Inner<T> build();
+ };
+
+ abstract SerializableFunction<List<String>, String> getFieldNameFunction();
+ /**
+ * Sets a policy for naming deeply-nested fields.
+ *
+ * <p>This is needed to prevent name collisions when differently-nested
fields have the same
+ * name. The default is to use the {@link #CONCAT_FIELD_NAMES} strategy
that concatenates all
+ * names in the path to generate the unnested name. For example, an
unnested name might be
+ * field1_field2_field3. In some cases the {@link #KEEP_NESTED_NAME}
strategy can be used to
+ * keep only the most-deeply nested name. However if this results in
conflicting names (e.g. if
+ * a schema has two subrows that each have the same schema this will
happen), the pipeline will
+ * fail at construction time.
+ *
+ * <p>An example of using this function to customize the separator
character:
+ *
+ * <pre>{@code
+ * pc.apply(Unnest.<Type>create().withFieldNameFunction(l ->
Strings.join("+", l)));
+ * }</pre>
+ */
+ public Inner<T> withFieldNameFunction(SerializableFunction<List<String>,
String> fn) {
+ return toBuilder().setFieldNameFunction(fn).build();
+ }
+
+ @Override
+ public PCollection<Row> expand(PCollection<T> input) {
+ Schema inputSchema = input.getSchema();
+ Schema outputSchema = getUnnestedSchema(inputSchema,
getFieldNameFunction());
+ return input
+ .apply(
+ ParDo.of(
+ new DoFn<T, Row>() {
+ @ProcessElement
+ public void processElement(@Element Row row,
OutputReceiver<Row> o) {
+ o.output(unnestRow(row, outputSchema));
+ }
+ }))
+ .setRowSchema(outputSchema);
+ }
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
index 20145dcff9e..c533b1e9e1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java
@@ -287,6 +287,19 @@
return withCoder(SchemaCoder.of(schema, toRowFunction, fromRowFunction));
}
+ /**
+ * Returns a {@link Create.Values} PTransform like this one that uses the
given {@code Schema}
+ * to represent objects.
+ */
+ @Experimental(Kind.SCHEMAS)
+ public Values<T> withRowSchema(Schema schema) {
+ return withCoder(
+ SchemaCoder.of(
+ schema,
+ (SerializableFunction<T, Row>)
SerializableFunctions.<Row>identity(),
+ (SerializableFunction<Row, T>)
SerializableFunctions.<Row>identity()));
+ }
+
/**
* Returns a {@link Create.Values} PTransform like this one that uses the
given {@code
* TypeDescriptor<T>} to determine the {@code Coder} to use to decode each
of the objects into a
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java
new file mode 100644
index 00000000000..5cdf56ddfaf
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/UnnestTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+/** Tests for {@link org.apache.beam.sdk.schemas.transforms.Unnest}. */
+public class UnnestTest implements Serializable {
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+ static final Schema SIMPLE_SCHEMA =
+
Schema.builder().addInt32Field("field1").addStringField("field2").build();
+ static final Schema NESTED_SCHEMA =
+ Schema.builder()
+ .addRowField("nested1", SIMPLE_SCHEMA)
+ .addRowField("nested2", SIMPLE_SCHEMA)
+ .build();
+ static final Schema UNNESTED_SCHEMA =
+ Schema.builder()
+ .addInt32Field("nested1_field1")
+ .addStringField("nested1_field2")
+ .addInt32Field("nested2_field1")
+ .addStringField("nested2_field2")
+ .build();
+ static final Schema NESTED_SCHEMA2 =
+ Schema.builder().addRowField("nested", SIMPLE_SCHEMA).build();
+ static final Schema DOUBLE_NESTED_SCHEMA =
+ Schema.builder().addRowField("nested", NESTED_SCHEMA).build();
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testFlatSchema() {
+ List<Row> rows =
+ IntStream.rangeClosed(0, 2)
+ .mapToObj(i -> Row.withSchema(SIMPLE_SCHEMA).addValues(i,
Integer.toString(i)).build())
+ .collect(Collectors.toList());
+ PCollection<Row> unnested =
+
pipeline.apply(Create.of(rows).withRowSchema(SIMPLE_SCHEMA)).apply(Unnest.create());
+ PAssert.that(unnested).containsInAnyOrder(rows);
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testSimpleUnnesting() {
+ List<Row> bottomRow =
+ IntStream.rangeClosed(0, 2)
+ .mapToObj(i -> Row.withSchema(SIMPLE_SCHEMA).addValues(i,
Integer.toString(i)).build())
+ .collect(Collectors.toList());
+ List<Row> rows =
+ bottomRow
+ .stream()
+ .map(r -> Row.withSchema(NESTED_SCHEMA).addValues(r, r).build())
+ .collect(Collectors.toList());
+ PCollection<Row> unnested =
+
pipeline.apply(Create.of(rows).withRowSchema(NESTED_SCHEMA)).apply(Unnest.create());
+ assertEquals(UNNESTED_SCHEMA, unnested.getSchema());
+ List<Row> expected =
+ bottomRow
+ .stream()
+ .map(
+ r ->
+ Row.withSchema(UNNESTED_SCHEMA)
+ .addValues(r.getValue(0), r.getValue(1),
r.getValue(0), r.getValue(1))
+ .build())
+ .collect(Collectors.toList());
+ ;
+ PAssert.that(unnested).containsInAnyOrder(expected);
+ pipeline.run();
+ }
+
+ static final Schema ONE_LEVEL_UNNESTED_SCHEMA =
+ Schema.builder()
+ .addRowField("nested_nested1", SIMPLE_SCHEMA)
+ .addRowField("nested_nested2", SIMPLE_SCHEMA)
+ .build();
+
+ static final Schema UNNESTED2_SCHEMA_ALTERNATE =
+
Schema.builder().addInt32Field("field1").addStringField("field2").build();
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testAlternateNamePolicy() {
+ List<Row> bottomRow =
+ IntStream.rangeClosed(0, 2)
+ .mapToObj(i -> Row.withSchema(SIMPLE_SCHEMA).addValues(i,
Integer.toString(i)).build())
+ .collect(Collectors.toList());
+ List<Row> rows =
+ bottomRow
+ .stream()
+ .map(r -> Row.withSchema(NESTED_SCHEMA2).addValues(r).build())
+ .collect(Collectors.toList());
+ PCollection<Row> unnested =
+ pipeline
+ .apply(Create.of(rows).withRowSchema(NESTED_SCHEMA2))
+
.apply(Unnest.<Row>create().withFieldNameFunction(Unnest.KEEP_NESTED_NAME));
+ assertEquals(UNNESTED2_SCHEMA_ALTERNATE, unnested.getSchema());
+ List<Row> expected =
+ bottomRow
+ .stream()
+ .map(
+ r ->
+ Row.withSchema(UNNESTED2_SCHEMA_ALTERNATE)
+ .addValues(r.getValue(0), r.getValue(1))
+ .build())
+ .collect(Collectors.toList());
+ ;
+ PAssert.that(unnested).containsInAnyOrder(expected);
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testClashingNamePolicy() {
+ List<Row> bottomRow =
+ IntStream.rangeClosed(0, 2)
+ .mapToObj(i -> Row.withSchema(SIMPLE_SCHEMA).addValues(i,
Integer.toString(i)).build())
+ .collect(Collectors.toList());
+ thrown.expect(IllegalArgumentException.class);
+ List<Row> rows =
+ bottomRow
+ .stream()
+ .map(r -> Row.withSchema(NESTED_SCHEMA).addValues(r, r).build())
+ .collect(Collectors.toList());
+ PCollection<Row> unnested =
+ pipeline
+ .apply(Create.of(rows).withRowSchema(NESTED_SCHEMA))
+
.apply(Unnest.<Row>create().withFieldNameFunction(Unnest.KEEP_NESTED_NAME));
+ pipeline.run();
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index a36c9eadb4b..cf82ad416c3 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -147,8 +147,8 @@ private boolean isSideInputJoin() {
if (isSideInputJoin()) {
checkArgument(pinput.size() == 1, "More than one input received for
side input join");
- return joinAsLookup(leftRelNode, rightRelNode, pinput.get(0))
- .setRowSchema(CalciteUtils.toSchema(getRowType()));
+ Schema schema = CalciteUtils.toSchema(getRowType());
+ return joinAsLookup(leftRelNode, rightRelNode, pinput.get(0),
schema).setRowSchema(schema);
}
Schema leftSchema = CalciteUtils.toSchema(left.getRowType());
@@ -170,24 +170,29 @@ private boolean isSideInputJoin() {
// build the extract key type
// the name of the join field is not important
- Schema extractKeySchema =
+ Schema extractKeySchemaLeft =
pairs.stream().map(pair ->
leftSchema.getField(pair.getKey())).collect(toSchema());
+ Schema extractKeySchemaRight =
+ pairs.stream().map(pair ->
rightSchema.getField(pair.getValue())).collect(toSchema());
- SchemaCoder<Row> extractKeyRowCoder = SchemaCoder.of(extractKeySchema);
+ SchemaCoder<Row> extractKeyRowCoder =
SchemaCoder.of(extractKeySchemaLeft);
// BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
PCollection<KV<Row, Row>> extractedLeftRows =
leftRows
.apply(
"left_ExtractJoinFields",
- MapElements.via(new
BeamJoinTransforms.ExtractJoinFields(true, pairs)))
+ MapElements.via(
+ new BeamJoinTransforms.ExtractJoinFields(true, pairs,
extractKeySchemaLeft)))
.setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
PCollection<KV<Row, Row>> extractedRightRows =
rightRows
.apply(
"right_ExtractJoinFields",
- MapElements.via(new
BeamJoinTransforms.ExtractJoinFields(false, pairs)))
+ MapElements.via(
+ new BeamJoinTransforms.ExtractJoinFields(
+ false, pairs, extractKeySchemaRight)))
.setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
// a regular join
@@ -311,11 +316,13 @@ private boolean triggersOncePerWindow(WindowingStrategy
windowingStrategy) {
break;
}
+ Schema schema = CalciteUtils.toSchema(getRowType());
PCollection<Row> ret =
joinedRows
.apply(
- "JoinParts2WholeRow", MapElements.via(new
BeamJoinTransforms.JoinParts2WholeRow()))
- .setRowSchema(CalciteUtils.toSchema(getRowType()));
+ "JoinParts2WholeRow",
+ MapElements.via(new
BeamJoinTransforms.JoinParts2WholeRow(schema)))
+ .setRowSchema(schema);
return ret;
}
@@ -359,14 +366,15 @@ private boolean triggersOncePerWindow(WindowingStrategy
windowingStrategy) {
boolean swapped) {
final PCollectionView<Map<Row, Iterable<Row>>> rowsView =
rightRows.apply(View.asMultimap());
+ Schema schema = CalciteUtils.toSchema(getRowType());
PCollection<Row> ret =
leftRows
.apply(
ParDo.of(
new BeamJoinTransforms.SideInputJoinDoFn(
- joinType, rightNullRow, rowsView, swapped))
+ joinType, rightNullRow, rowsView, swapped, schema))
.withSideInputs(rowsView))
- .setRowSchema(CalciteUtils.toSchema(getRowType()));
+ .setRowSchema(schema);
return ret;
}
@@ -428,7 +436,10 @@ private Schema buildNullSchema(Schema schema) {
}
private PCollection<Row> joinAsLookup(
- BeamRelNode leftRelNode, BeamRelNode rightRelNode, PCollection<Row>
factStream) {
+ BeamRelNode leftRelNode,
+ BeamRelNode rightRelNode,
+ PCollection<Row> factStream,
+ Schema outputSchema) {
BeamIOSourceRel srcRel = (BeamIOSourceRel) rightRelNode;
BeamSqlSeekableTable seekableTable = (BeamSqlSeekableTable)
srcRel.getBeamSqlTable();
@@ -438,6 +449,7 @@ private Schema buildNullSchema(Schema schema) {
condition,
seekableTable,
CalciteUtils.toSchema(rightRelNode.getRowType()),
+ outputSchema,
CalciteUtils.toSchema(leftRelNode.getRowType()).getFieldCount()));
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 2492a2406b0..b81df6ddb44 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -19,7 +19,6 @@
package org.apache.beam.sdk.extensions.sql.impl.transform;
import static java.util.stream.Collectors.toList;
-import static org.apache.beam.sdk.schemas.Schema.toSchema;
import static org.apache.beam.sdk.values.Row.toRow;
import java.util.ArrayList;
@@ -47,22 +46,18 @@
/** A {@code SimpleFunction} to extract join fields from the specified row.
*/
public static class ExtractJoinFields extends SimpleFunction<Row, KV<Row,
Row>> {
private final List<Integer> joinColumns;
+ private final Schema schema;
- public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>>
joinColumns) {
+ public ExtractJoinFields(
+ boolean isLeft, List<Pair<Integer, Integer>> joinColumns, Schema
schema) {
this.joinColumns =
joinColumns.stream().map(pair -> isLeft ? pair.left :
pair.right).collect(toList());
+ this.schema = schema;
}
@Override
public KV<Row, Row> apply(Row input) {
- Schema schema =
- joinColumns
- .stream()
- .map(fieldIndex -> toField(input.getSchema(), fieldIndex))
- .collect(toSchema());
-
Row row =
joinColumns.stream().map(input::getValue).collect(toRow(schema));
-
return KV.of(row, input);
}
@@ -78,16 +73,19 @@ public ExtractJoinFields(boolean isLeft, List<Pair<Integer,
Integer>> joinColumn
private final JoinRelType joinType;
private final Row rightNullRow;
private final boolean swap;
+ private final Schema schema;
public SideInputJoinDoFn(
JoinRelType joinType,
Row rightNullRow,
PCollectionView<Map<Row, Iterable<Row>>> sideInputView,
- boolean swap) {
+ boolean swap,
+ Schema schema) {
this.joinType = joinType;
this.rightNullRow = rightNullRow;
this.sideInputView = sideInputView;
this.swap = swap;
+ this.schema = schema;
}
@ProcessElement
@@ -99,11 +97,11 @@ public void processElement(ProcessContext context) {
if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext())
{
for (Row aRightRowsIterable : rightRowsIterable) {
- context.output(combineTwoRowsIntoOne(leftRow, aRightRowsIterable,
swap));
+ context.output(combineTwoRowsIntoOne(leftRow, aRightRowsIterable,
swap, schema));
}
} else {
if (joinType == JoinRelType.LEFT) {
- context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap));
+ context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap,
schema));
}
}
}
@@ -111,33 +109,34 @@ public void processElement(ProcessContext context) {
/** A {@code SimpleFunction} to combine two rows into one. */
public static class JoinParts2WholeRow extends SimpleFunction<KV<Row,
KV<Row, Row>>, Row> {
+ private final Schema schema;
+
+ public JoinParts2WholeRow(Schema schema) {
+ this.schema = schema;
+ }
+
@Override
public Row apply(KV<Row, KV<Row, Row>> input) {
KV<Row, Row> parts = input.getValue();
Row leftRow = parts.getKey();
Row rightRow = parts.getValue();
- return combineTwoRowsIntoOne(leftRow, rightRow, false);
+ return combineTwoRowsIntoOne(leftRow, rightRow, false, schema);
}
}
/** As the method name suggests: combine two rows into one wide row. */
- private static Row combineTwoRowsIntoOne(Row leftRow, Row rightRow, boolean
swap) {
+ private static Row combineTwoRowsIntoOne(
+ Row leftRow, Row rightRow, boolean swap, Schema outputSchema) {
if (swap) {
- return combineTwoRowsIntoOneHelper(rightRow, leftRow);
+ return combineTwoRowsIntoOneHelper(rightRow, leftRow, outputSchema);
} else {
- return combineTwoRowsIntoOneHelper(leftRow, rightRow);
+ return combineTwoRowsIntoOneHelper(leftRow, rightRow, outputSchema);
}
}
/** As the method name suggests: combine two rows into one wide row. */
- private static Row combineTwoRowsIntoOneHelper(Row leftRow, Row rightRow) {
- // build the type
- List<Schema.Field> fields = new ArrayList<>(leftRow.getFieldCount() +
rightRow.getFieldCount());
- fields.addAll(leftRow.getSchema().getFields());
- fields.addAll(rightRow.getSchema().getFields());
- Schema type = Schema.builder().addFields(fields).build();
-
- return Row.withSchema(type)
+ private static Row combineTwoRowsIntoOneHelper(Row leftRow, Row rightRow,
Schema ouputSchema) {
+ return Row.withSchema(ouputSchema)
.addValues(leftRow.getValues())
.addValues(rightRow.getValues())
.build();
@@ -145,19 +144,21 @@ private static Row combineTwoRowsIntoOneHelper(Row
leftRow, Row rightRow) {
/** Transform to execute Join as Lookup. */
public static class JoinAsLookup extends PTransform<PCollection<Row>,
PCollection<Row>> {
-
- BeamSqlSeekableTable seekableTable;
- Schema lkpSchema;
- Schema joinSubsetType;
- List<Integer> factJoinIdx;
+ private final BeamSqlSeekableTable seekableTable;
+ private final Schema lkpSchema;
+ private Schema joinSubsetType;
+ private final Schema outputSchema;
+ private List<Integer> factJoinIdx;
public JoinAsLookup(
RexNode joinCondition,
BeamSqlSeekableTable seekableTable,
Schema lkpSchema,
+ Schema outputSchema,
int factTableColSize) {
this.seekableTable = seekableTable;
this.lkpSchema = lkpSchema;
+ this.outputSchema = outputSchema;
joinFieldsMapping(joinCondition, factTableColSize);
}
@@ -205,7 +206,7 @@ public void processElement(ProcessContext context) {
Row joinSubRow = extractJoinSubRow(factRow);
List<Row> lookupRows = seekableTable.seekRow(joinSubRow);
for (Row lr : lookupRows) {
- context.output(combineTwoRowsIntoOneHelper(factRow,
lr));
+ context.output(combineTwoRowsIntoOneHelper(factRow,
lr, outputSchema));
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 145181)
Time Spent: 10h 20m (was: 10h 10m)
> Create a library of useful transforms that use schemas
> ------------------------------------------------------
>
> Key: BEAM-4461
> URL: https://issues.apache.org/jira/browse/BEAM-4461
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core
> Reporter: Reuven Lax
> Assignee: Reuven Lax
> Priority: Major
> Time Spent: 10h 20m
> Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)