[ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=167675&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-167675
 ]

ASF GitHub Bot logged work on BEAM-4461:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Nov/18 08:45
            Start Date: 20/Nov/18 08:45
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #6832: [BEAM-4461] 
CoGroup transforms for schemas.
URL: https://github.com/apache/beam/pull/6832
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
index 2e3265387e8..bbdf29bac26 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
@@ -39,6 +39,8 @@
 /**
  * Used inside of a {@link org.apache.beam.sdk.transforms.DoFn} to describe 
which fields in a schema
  * type need to be accessed for processing.
+ *
+ * <p>This class always puts the selected fields in a deterministic order.
  */
 @Experimental(Kind.SCHEMAS)
 @AutoValue
@@ -57,6 +59,8 @@ abstract Builder setNestedFieldsAccessedById(
     abstract Builder setNestedFieldsAccessedByName(
         Map<String, FieldAccessDescriptor> nestedFieldsAccessedByName);
 
+    abstract Builder setFieldInsertionOrder(boolean insertionOrder);
+
     abstract FieldAccessDescriptor build();
   }
 
@@ -70,11 +74,14 @@ abstract Builder setNestedFieldsAccessedByName(
 
   abstract Map<String, FieldAccessDescriptor> getNestedFieldsAccessedByName();
 
+  abstract boolean getFieldInsertionOrder();
+
   abstract Builder toBuilder();
 
   static Builder builder() {
     return new AutoValue_FieldAccessDescriptor.Builder()
         .setAllFields(false)
+        .setFieldInsertionOrder(false)
         .setFieldIdsAccessed(Collections.emptySet())
         .setFieldNamesAccessed(Collections.emptySet())
         .setNestedFieldsAccessedById(Collections.emptyMap())
@@ -105,7 +112,7 @@ public static FieldAccessDescriptor 
withFieldNames(String... names) {
    * in a recursive {@link FieldAccessDescriptor}.
    */
   public static FieldAccessDescriptor withFieldNames(Iterable<String> 
fieldNames) {
-    return 
builder().setFieldNamesAccessed(Sets.newTreeSet(fieldNames)).build();
+    return 
builder().setFieldNamesAccessed(Sets.newLinkedHashSet(fieldNames)).build();
   }
 
   /**
@@ -127,7 +134,7 @@ public static FieldAccessDescriptor withFieldIds(Integer... 
ids) {
    * in a recursive {@link FieldAccessDescriptor}.
    */
   public static FieldAccessDescriptor withFieldIds(Iterable<Integer> ids) {
-    return builder().setFieldIdsAccessed(Sets.newTreeSet(ids)).build();
+    return builder().setFieldIdsAccessed(Sets.newLinkedHashSet(ids)).build();
   }
 
   /** Return an empty {@link FieldAccessDescriptor}. */
@@ -165,6 +172,14 @@ public FieldAccessDescriptor withNestedField(
     return 
toBuilder().setNestedFieldsAccessedByName(newNestedFieldAccess).build();
   }
 
+  /**
+   * By default, fields are sorted by field id. If this is set, they will instead 
be sorted by insertion
+   * order. All sorting happens in the {@link #resolve(Schema)} method.
+   */
+  public FieldAccessDescriptor withOrderByFieldInsertionOrder() {
+    return toBuilder().setFieldInsertionOrder(true).build();
+  }
+
   public boolean allFields() {
     return getAllFields();
   }
@@ -177,6 +192,7 @@ public boolean allFields() {
     return getNestedFieldsAccessedById();
   }
 
+  // After resolution, fields are sorted by field id, or kept in insertion order if requested.
   public FieldAccessDescriptor resolve(Schema schema) {
     Set<Integer> resolvedFieldIdsAccessed = resolveFieldIdsAccessed(schema);
     Map<Integer, FieldAccessDescriptor> resolvedNestedFieldsAccessed =
@@ -198,7 +214,13 @@ public FieldAccessDescriptor resolve(Schema schema) {
   }
 
   private Set<Integer> resolveFieldIdsAccessed(Schema schema) {
-    Set<Integer> fieldIds = Sets.newTreeSet();
+    Set<Integer> fieldIds;
+    if (getFieldInsertionOrder()) {
+      fieldIds = Sets.newLinkedHashSet();
+    } else {
+      fieldIds = Sets.newTreeSet();
+    }
+
     for (int fieldId : getFieldIdsAccessed()) {
       fieldIds.add(validateFieldId(schema, fieldId));
     }
@@ -233,7 +255,12 @@ private FieldAccessDescriptor resolvedNestedFieldsHelper(
   }
 
   private Map<Integer, FieldAccessDescriptor> 
resolveNestedFieldsAccessed(Schema schema) {
-    Map<Integer, FieldAccessDescriptor> nestedFields = Maps.newTreeMap();
+    Map<Integer, FieldAccessDescriptor> nestedFields;
+    if (getFieldInsertionOrder()) {
+      nestedFields = Maps.newLinkedHashMap();
+    } else {
+      nestedFields = Maps.newTreeMap();
+    }
 
     nestedFields.putAll(
         getNestedFieldsAccessedByName()
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 85dfd3a8b61..90ec371cadb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -209,6 +209,25 @@ public boolean equals(Object o) {
         && Objects.equals(getFields(), other.getFields());
   }
 
+  /** Returns true if two schemas are equal ignoring field names and 
descriptions. */
+  public boolean typesEqual(Schema other) {
+    if (uuid != null && other.uuid != null && Objects.equals(uuid, 
other.uuid)) {
+      return true;
+    }
+    if (getFieldCount() != other.getFieldCount()) {
+      return false;
+    }
+    if (!Objects.equals(fieldIndices.values(), other.fieldIndices.values())) {
+      return false;
+    }
+    for (int i = 0; i < getFieldCount(); ++i) {
+      if (!getField(i).typesEqual(other.getField(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   enum EquivalenceNullablePolicy {
     SAME,
     WEAKEN,
@@ -547,6 +566,29 @@ public boolean equals(Object o) {
           && Arrays.equals(getMetadata(), other.getMetadata());
     }
 
+    /** Returns true if two FieldTypes are equal. */
+    public boolean typesEqual(FieldType other) {
+      if (!Objects.equals(getTypeName(), other.getTypeName())) {
+        return false;
+      }
+      if (!Arrays.equals(getMetadata(), other.getMetadata())) {
+        return false;
+      }
+      if (getTypeName() == TypeName.ARRAY
+          && 
!getCollectionElementType().typesEqual(other.getCollectionElementType())) {
+        return false;
+      }
+      if (getTypeName() == TypeName.MAP
+          && (!getMapValueType().typesEqual(other.getMapValueType())
+              || !getMapKeyType().typesEqual(other.getMapKeyType()))) {
+        return false;
+      }
+      if (getTypeName() == TypeName.ROW && 
!getRowSchema().typesEqual(other.getRowSchema())) {
+        return false;
+      }
+      return true;
+    }
+
     private boolean equivalent(FieldType other) {
       if (!other.getTypeName().equals(getTypeName())) {
         return false;
@@ -671,6 +713,12 @@ public boolean equals(Object o) {
           && Objects.equals(getNullable(), other.getNullable());
     }
 
+    /** Returns true if two fields are equal, ignoring name and description. */
+    public boolean typesEqual(Field other) {
+      return getType().typesEqual(other.getType())
+          && Objects.equals(getNullable(), other.getNullable());
+    }
+
     private boolean equivalent(Field otherField, EquivalenceNullablePolicy 
nullablePolicy) {
       if (nullablePolicy == EquivalenceNullablePolicy.SAME
           && !otherField.getNullable().equals(getNullable())) {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
new file mode 100644
index 00000000000..48b475c3081
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * A transform that performs equijoins across multiple schema {@link 
PCollection}s.
+ *
+ * <p>This transform has similarities to {@link CoGroupByKey}, but works on 
PCollections that
+ * have schemas. This allows users of the transform to simply specify schema 
fields to join on. The
+ * output type of the transform is a {@literal KV<Row, Row>} where the value 
contains one field for
+ * every input PCollection and the key represents the fields that were joined 
on. By default the
+ * cross product is not expanded, so all fields in the output row are array 
fields.
+ *
+ * <p>For example, the following demonstrates joining three PCollections on 
the "user" and "country"
+ * fields.
+ *
+ * <pre>{@code
+ * TupleTag<Input1Type> input1Tag = new TupleTag<>("input1");
+ * TupleTag<Input2Type> input2Tag = new TupleTag<>("input2");
+ * TupleTag<Input3Type> input3Tag = new TupleTag<>("input3");
+ * PCollection<KV<Row, Row>> joined = PCollectionTuple
+ *     .of(input1Tag, input1)
+ *     .and(input2Tag, input2)
+ *     .and(input3Tag, input3)
+ *   .apply(CoGroup.byFieldNames("user", "country"));
+ * }</pre>
+ *
+ * <p>In the above case, the key schema will contain the two string fields 
"user" and "country"; in
+ * this case, the schemas for Input1, Input2, Input3 must all have fields 
named "user" and
+ * "country". The value schema will contain three array of Row fields named 
"input1" "input2" and
+ * "input3". The value Row contains all inputs that came in on any of the 
inputs for that key.
+ * Standard join types (inner join, outer join, etc.) can be accomplished by 
expanding the cross
+ * product of these arrays in various ways.
+ *
+ * <p>To put it in other words, the key schema is convertible to the following 
POJO:
+ *
+ * <pre>{@code
+ * {@literal @}DefaultSchema(JavaFieldSchema.class)
+ * public class JoinedKey {
+ *   public String user;
+ *   public String country;
+ * }
+ *
+ * PCollection<JoinedKey> keys = joined
+ *     .apply(Keys.create())
+ *     .apply(Convert.to(JoinedKey.class));
+ * }</pre>
+ *
+ * <p>The value schema is convertible to the following POJO:
+ *
+ * <pre>{@code
+ * {@literal @}DefaultSchema(JavaFieldSchema.class)
+ * public class JoinedValue {
+ *   // The below lists contain all values from each of the three inputs that 
match on the given
+ *   // key.
+ *   public List<Input1Type> input1;
+ *   public List<Input2Type> input2;
+ *   public List<Input3Type> input3;
+ * }
+ *
+ * PCollection<JoinedValue> values = joined
+ *     .apply(Values.create())
+ *     .apply(Convert.to(JoinedValue.class));
+ * }</pre>
+ *
+ * <p>It's also possible to join between different fields in two inputs, as 
long as the types of
+ * those fields match. In this case, fields must be specified for every input 
PCollection. For
+ * example:
+ *
+ * <pre>{@code
+ * PCollection<KV<Row, Row>> joined = PCollectionTuple
+ *     .of(input1Tag, input1)
+ *     .and(input2Tag, input2)
+ *   .apply(CoGroup
+ *     .byFieldNames(input1Tag, "referringUser")
+ *     .byFieldNames(input2Tag, "user"));
+ * }</pre>
+ */
+public class CoGroup {
+  /**
+   * Join by the following field names.
+   *
+   * <p>The same field names are used in all input PCollections.
+   */
+  public static Inner byFieldNames(String... fieldNames) {
+    return 
byFieldAccessDescriptor(FieldAccessDescriptor.withFieldNames(fieldNames));
+  }
+
+  /**
+   * Join by the following field ids.
+   *
+   * <p>The same field ids are used in all input PCollections.
+   */
+  public static Inner byFieldIds(Integer... fieldIds) {
+    return 
byFieldAccessDescriptor(FieldAccessDescriptor.withFieldIds(fieldIds));
+  }
+
+  /**
+   * Join by the following {@link FieldAccessDescriptor}.
+   *
+   * <p>The same access descriptor is used in all input PCollections.
+   */
+  public static Inner byFieldAccessDescriptor(FieldAccessDescriptor 
fieldAccessDescriptor) {
+    return new Inner(fieldAccessDescriptor);
+  }
+
+  /**
+   * Select the following field names for the specified PCollection.
+   *
+   * <p>Each PCollection in the input must have fields specified for the join 
key.
+   */
+  public static Inner byFieldNames(TupleTag<?> tag, String... fieldNames) {
+    return byFieldAccessDescriptor(tag, 
FieldAccessDescriptor.withFieldNames(fieldNames));
+  }
+
+  /**
+   * Select the following field ids for the specified PCollection.
+   *
+   * <p>Each PCollection in the input must have fields specified for the join 
key.
+   */
+  public static Inner byFieldIds(TupleTag<?> tag, Integer... fieldIds) {
+    return byFieldAccessDescriptor(tag, 
FieldAccessDescriptor.withFieldIds(fieldIds));
+  }
+
+  /**
+   * Select the following fields for the specified PCollection using {@link 
FieldAccessDescriptor}.
+   *
+   * <p>Each PCollection in the input must have fields specified for the join 
key.
+   */
+  public static Inner byFieldAccessDescriptor(
+      TupleTag<?> tag, FieldAccessDescriptor fieldAccessDescriptor) {
+    return new Inner().byFieldAccessDescriptor(tag, fieldAccessDescriptor);
+  }
+
+  /** The implementing PTransform. */
+  public static class Inner extends PTransform<PCollectionTuple, 
PCollection<KV<Row, Row>>> {
+    @Nullable private final FieldAccessDescriptor 
allInputsFieldAccessDescriptor;
+    private final Map<TupleTag<?>, FieldAccessDescriptor> 
fieldAccessDescriptorMap;
+
+    private Inner() {
+      this(Collections.emptyMap());
+    }
+
+    private Inner(Map<TupleTag<?>, FieldAccessDescriptor> 
fieldAccessDescriptorMap) {
+      this.allInputsFieldAccessDescriptor = null;
+      this.fieldAccessDescriptorMap = fieldAccessDescriptorMap;
+    }
+
+    private Inner(FieldAccessDescriptor allInputsFieldAccessDescriptor) {
+      this.allInputsFieldAccessDescriptor = allInputsFieldAccessDescriptor;
+      this.fieldAccessDescriptorMap = Collections.emptyMap();
+    }
+
+    /**
+     * Select the following field names for the specified PCollection.
+     *
+     * <p>Each PCollection in the input must have fields specified for the join key.
+     */
+    public Inner byFieldNames(TupleTag<?> tag, String... fieldNames) {
+      return byFieldAccessDescriptor(tag, 
FieldAccessDescriptor.withFieldNames(fieldNames));
+    }
+
+    /**
+     * Select the following field ids for the specified PCollection.
+     *
+     * <p>Each PCollection in the input must have fields specified for the 
join key.
+     */
+    public Inner byFieldIds(TupleTag<?> tag, Integer... fieldIds) {
+      return byFieldAccessDescriptor(tag, 
FieldAccessDescriptor.withFieldIds(fieldIds));
+    }
+
+    /**
+     * Select the following fields for the specified PCollection using {@link
+     * FieldAccessDescriptor}.
+     *
+     * <p>Each PCollection in the input must have fields specified for the 
join key.
+     */
+    public Inner byFieldAccessDescriptor(
+        TupleTag<?> tag, FieldAccessDescriptor fieldAccessDescriptor) {
+      if (allInputsFieldAccessDescriptor != null) {
+        throw new IllegalStateException("Cannot set both a global and per-tag 
fields.");
+      }
+      return new Inner(
+          new ImmutableMap.Builder<TupleTag<?>, FieldAccessDescriptor>()
+              .putAll(fieldAccessDescriptorMap)
+              .put(tag, fieldAccessDescriptor)
+              .build());
+    }
+
+    @Nullable
+    private FieldAccessDescriptor getFieldAccessDescriptor(TupleTag<?> tag) {
+      return (allInputsFieldAccessDescriptor != null)
+          ? allInputsFieldAccessDescriptor
+          : fieldAccessDescriptorMap.get(tag);
+    }
+
+    @Override
+    public PCollection<KV<Row, Row>> expand(PCollectionTuple input) {
+      KeyedPCollectionTuple<Row> keyedPCollectionTuple =
+          KeyedPCollectionTuple.empty(input.getPipeline());
+      List<TupleTag<Row>> sortedTags =
+          input
+              .getAll()
+              .keySet()
+              .stream()
+              .sorted(Comparator.comparing(TupleTag::getId))
+              .map(t -> new TupleTag<Row>(t.getId() + "_ROW"))
+              .collect(Collectors.toList());
+
+      // Keep this in a TreeMap so that it's sorted. This way we get a 
deterministic output
+      // schema.
+      TreeMap<String, Schema> componentSchemas = Maps.newTreeMap();
+      Map<String, SerializableFunction<Object, Row>> toRows = 
Maps.newHashMap();
+
+      Schema keySchema = null;
+      for (Map.Entry<TupleTag<?>, PCollection<?>> entry : 
input.getAll().entrySet()) {
+        TupleTag<?> tag = entry.getKey();
+        PCollection<?> pc = entry.getValue();
+        Schema schema = pc.getSchema();
+        componentSchemas.put(tag.getId(), schema);
+        TupleTag<Row> rowTag = new TupleTag<>(tag.getId() + "_ROW");
+        toRows.put(rowTag.getId(), (SerializableFunction<Object, Row>) 
pc.getToRowFunction());
+        FieldAccessDescriptor fieldAccessDescriptor = 
getFieldAccessDescriptor(tag);
+        if (fieldAccessDescriptor == null) {
+          throw new IllegalStateException("No fields were set for input " + 
tag);
+        }
+        // Resolve the key schema, keeping the fields in the order specified 
by the user.
+        // Otherwise, if different field names are specified for different 
PCollections, they
+        // might not match up.
+        // The key schema contains the field names from the first PCollection 
specified.
+        FieldAccessDescriptor resolved =
+            
fieldAccessDescriptor.withOrderByFieldInsertionOrder().resolve(schema);
+        Schema currentKeySchema = Select.getOutputSchema(schema, resolved);
+        if (keySchema == null) {
+          keySchema = currentKeySchema;
+        } else {
+          if (!currentKeySchema.typesEqual(keySchema)) {
+            throw new IllegalStateException("All keys must have the same 
schema");
+          }
+        }
+
+        PCollection<KV<Row, Row>> keyedPCollection =
+            extractKey(pc, schema, keySchema, resolved, tag.getId());
+        keyedPCollectionTuple = keyedPCollectionTuple.and(rowTag, 
keyedPCollection);
+      }
+
+      // Construct the output schema. It contains one field for each input 
PCollection, of type
+      // ARRAY[ROW].
+      Schema.Builder joinedSchemaBuilder = Schema.builder();
+      for (Map.Entry<String, Schema> entry : componentSchemas.entrySet()) {
+        joinedSchemaBuilder.addArrayField(entry.getKey(), 
FieldType.row(entry.getValue()));
+      }
+      Schema joinedSchema = joinedSchemaBuilder.build();
+
+      return keyedPCollectionTuple
+          .apply("CoGroupByKey", CoGroupByKey.create())
+          .apply("ConvertToRow", ParDo.of(new ConvertToRow(sortedTags, toRows, 
joinedSchema)))
+          .setCoder(KvCoder.of(SchemaCoder.of(keySchema), 
SchemaCoder.of(joinedSchema)));
+    }
+
+    private static class ConvertToRow extends DoFn<KV<Row, CoGbkResult>, 
KV<Row, Row>> {
+      List<TupleTag<Row>> sortedTags;
+      Map<String, SerializableFunction<Object, Row>> toRows = 
Maps.newHashMap();
+      Schema joinedSchema;
+
+      public ConvertToRow(
+          List<TupleTag<Row>> sortedTags,
+          Map<String, SerializableFunction<Object, Row>> toRows,
+          Schema joinedSchema) {
+        this.sortedTags = sortedTags;
+        this.toRows = toRows;
+        this.joinedSchema = joinedSchema;
+      }
+
+      @ProcessElement
+      public void process(@Element KV<Row, CoGbkResult> kv, 
OutputReceiver<KV<Row, Row>> o) {
+        Row key = kv.getKey();
+        CoGbkResult result = kv.getValue();
+        List<Object> fields = 
Lists.newArrayListWithExpectedSize(sortedTags.size());
+        for (TupleTag<?> tag : sortedTags) {
+          // TODO: This forces the entire join to materialize in memory. We 
should create a
+          // lazy Row interface on top of the iterable returned by 
CoGbkResult. This will
+          // allow the data to be streamed in.
+          SerializableFunction<Object, Row> toRow = toRows.get(tag.getId());
+          List<Row> joined = Lists.newArrayList();
+          for (Object item : result.getAll(tag)) {
+            joined.add(toRow.apply(item));
+          }
+          fields.add(joined);
+        }
+        o.output(KV.of(key, 
Row.withSchema(joinedSchema).addValues(fields).build()));
+      }
+    }
+
+    private static <T> PCollection<KV<Row, Row>> extractKey(
+        PCollection<T> pCollection,
+        Schema schema,
+        Schema keySchema,
+        FieldAccessDescriptor keyFields,
+        String tag) {
+      return pCollection
+          .apply(
+              "extractKey" + tag,
+              ParDo.of(
+                  new DoFn<T, KV<Row, Row>>() {
+                    @ProcessElement
+                    public void process(@Element Row row, 
OutputReceiver<KV<Row, Row>> o) {
+                      o.output(KV.of(Select.selectRow(row, keyFields, schema, 
keySchema), row));
+                    }
+                  }))
+          .setCoder(KvCoder.of(SchemaCoder.of(keySchema), 
SchemaCoder.of(schema)));
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java
new file mode 100644
index 00000000000..b3e61218b8a
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CoGroupTest.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.sdk.TestUtils.KvMatcher;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+/** Tests for {@link CoGroup}. */
+public class CoGroupTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  private static final Schema CG_SCHEMA_1 =
+      Schema.builder()
+          .addStringField("user")
+          .addInt32Field("count")
+          .addStringField("country")
+          .build();
+
+  private static final Schema SIMPLE_CG_KEY_SCHEMA =
+      
Schema.builder().addStringField("user").addStringField("country").build();
+  private static final Schema SIMPLE_CG_OUTPUT_SCHEMA =
+      Schema.builder()
+          .addArrayField("pc1", FieldType.row(CG_SCHEMA_1))
+          .addArrayField("pc2", FieldType.row(CG_SCHEMA_1))
+          .addArrayField("pc3", FieldType.row(CG_SCHEMA_1))
+          .build();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCoGroupByFieldNames() {
+    PCollection<Row> pc1 =
+        pipeline
+            .apply(
+                "Create1",
+                Create.of(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 1, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 2, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 3, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 4, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 5, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 6, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 7, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 8, 
"ar").build()))
+            .setRowSchema(CG_SCHEMA_1);
+    PCollection<Row> pc2 =
+        pipeline
+            .apply(
+                "Create2",
+                Create.of(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 9, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 10, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 11, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 12, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 13, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 14, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 15, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 16, 
"ar").build()))
+            .setRowSchema(CG_SCHEMA_1);
+    PCollection<Row> pc3 =
+        pipeline
+            .apply(
+                "Create3",
+                Create.of(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 17, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 18, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 19, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 20, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 21, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 22, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 23, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 24, 
"ar").build()))
+            .setRowSchema(CG_SCHEMA_1);
+
+    Row key1 = Row.withSchema(SIMPLE_CG_KEY_SCHEMA).addValues("user1", 
"us").build();
+    Row key1Joined =
+        Row.withSchema(SIMPLE_CG_OUTPUT_SCHEMA)
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 1, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 2, 
"us").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 9, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 10, 
"us").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 17, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 18, 
"us").build()))
+            .build();
+
+    Row key2 = Row.withSchema(SIMPLE_CG_KEY_SCHEMA).addValues("user1", 
"il").build();
+    Row key2Joined =
+        Row.withSchema(SIMPLE_CG_OUTPUT_SCHEMA)
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 3, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 4, 
"il").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 11, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 12, 
"il").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 19, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 20, 
"il").build()))
+            .build();
+
+    Row key3 = Row.withSchema(SIMPLE_CG_KEY_SCHEMA).addValues("user2", 
"fr").build();
+    Row key3Joined =
+        Row.withSchema(SIMPLE_CG_OUTPUT_SCHEMA)
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 5, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 6, 
"fr").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 13, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 14, 
"fr").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 21, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 22, 
"fr").build()))
+            .build();
+
+    Row key4 = Row.withSchema(SIMPLE_CG_KEY_SCHEMA).addValues("user2", 
"ar").build();
+    Row key4Joined =
+        Row.withSchema(SIMPLE_CG_OUTPUT_SCHEMA)
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 7, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 8, 
"ar").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 15, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 16, 
"ar").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 23, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 24, 
"ar").build()))
+            .build();
+
+    PCollection<KV<Row, Row>> joined =
+        PCollectionTuple.of(new TupleTag<>("pc1"), pc1)
+            .and(new TupleTag<>("pc2"), pc2)
+            .and(new TupleTag<>("pc3"), pc3)
+            .apply("CoGroup", CoGroup.byFieldNames("user", "country"));
+    List<KV<Row, Row>> expected =
+        ImmutableList.of(
+            KV.of(key1, key1Joined),
+            KV.of(key2, key2Joined),
+            KV.of(key3, key3Joined),
+            KV.of(key4, key4Joined));
+    PAssert.that(joined).satisfies(actual -> containsJoinedFields(expected, 
actual));
+    pipeline.run();
+  }
+
+  /**
+   * Row schema with the fields {@code user2} (string), {@code count2} (int32) and {@code country2}
+   * (string), in that order. Used for the second input of the co-group tests below.
+   */
+  private static final Schema CG_SCHEMA_2 =
+      Schema.builder()
+          .addStringField("user2")
+          .addInt32Field("count2")
+          .addStringField("country2")
+          .build();
+
+  /**
+   * Row schema with the fields {@code user3} (string), {@code count3} (int32) and {@code country3}
+   * (string), in that order. Used for the third input of the co-group tests below.
+   */
+  private static final Schema CG_SCHEMA_3 =
+      Schema.builder()
+          .addStringField("user3")
+          .addInt32Field("count3")
+          .addStringField("country3")
+          .build();
+
+  /**
+   * Co-groups three inputs whose key fields have different names in each input.
+   *
+   * <p>Each input uses its own schema (CG_SCHEMA_1, CG_SCHEMA_2, CG_SCHEMA_3) and the key fields
+   * are named per input tag via {@code byFieldNames}. The expected output has one entry per
+   * distinct (user, country) pair; its value row holds three arrays, one per input, which are
+   * compared without regard to element order by {@code containsJoinedFields}.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCoGroupByDifferentFields() {
+    // First input: key fields are "user" and "country" (see the byFieldNames call below).
+    PCollection<Row> pc1 =
+        pipeline
+            .apply(
+                "Create1",
+                Create.of(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 1, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 2, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 3, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 4, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 5, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 6, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 7, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 8, 
"ar").build()))
+            .setRowSchema(CG_SCHEMA_1);
+    // Second input: same user/country pairs, key fields are "user2" and "country2".
+    PCollection<Row> pc2 =
+        pipeline
+            .apply(
+                "Create2",
+                Create.of(
+                    Row.withSchema(CG_SCHEMA_2).addValues("user1", 9, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user1", 10, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user1", 11, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user1", 12, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user2", 13, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user2", 14, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user2", 15, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user2", 16, 
"ar").build()))
+            .setRowSchema(CG_SCHEMA_2);
+    // Third input: same user/country pairs, key fields are "user3" and "country3".
+    PCollection<Row> pc3 =
+        pipeline
+            .apply(
+                "Create3",
+                Create.of(
+                    Row.withSchema(CG_SCHEMA_3).addValues("user1", 17, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user1", 18, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user1", 19, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user1", 20, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user2", 21, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user2", 22, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user2", 23, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user2", 24, 
"ar").build()))
+            .setRowSchema(CG_SCHEMA_3);
+
+    // Expected result for key (user1, us): one array per input, in input order pc1, pc2, pc3.
+    Row key1 = Row.withSchema(SIMPLE_CG_KEY_SCHEMA).addValues("user1", 
"us").build();
+    Row key1Joined =
+        Row.withSchema(SIMPLE_CG_OUTPUT_SCHEMA)
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 1, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 2, 
"us").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_2).addValues("user1", 9, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user1", 10, 
"us").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_3).addValues("user1", 17, 
"us").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user1", 18, 
"us").build()))
+            .build();
+
+    // Expected result for key (user1, il).
+    Row key2 = Row.withSchema(SIMPLE_CG_KEY_SCHEMA).addValues("user1", 
"il").build();
+    Row key2Joined =
+        Row.withSchema(SIMPLE_CG_OUTPUT_SCHEMA)
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 3, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user1", 4, 
"il").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_2).addValues("user1", 11, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user1", 12, 
"il").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_3).addValues("user1", 19, 
"il").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user1", 20, 
"il").build()))
+            .build();
+
+    // Expected result for key (user2, fr).
+    Row key3 = Row.withSchema(SIMPLE_CG_KEY_SCHEMA).addValues("user2", 
"fr").build();
+    Row key3Joined =
+        Row.withSchema(SIMPLE_CG_OUTPUT_SCHEMA)
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 5, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 6, 
"fr").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_2).addValues("user2", 13, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user2", 14, 
"fr").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_3).addValues("user2", 21, 
"fr").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user2", 22, 
"fr").build()))
+            .build();
+
+    // Expected result for key (user2, ar).
+    Row key4 = Row.withSchema(SIMPLE_CG_KEY_SCHEMA).addValues("user2", 
"ar").build();
+    Row key4Joined =
+        Row.withSchema(SIMPLE_CG_OUTPUT_SCHEMA)
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 7, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_1).addValues("user2", 8, 
"ar").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_2).addValues("user2", 15, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_2).addValues("user2", 16, 
"ar").build()))
+            .addValue(
+                Lists.newArrayList(
+                    Row.withSchema(CG_SCHEMA_3).addValues("user2", 23, 
"ar").build(),
+                    Row.withSchema(CG_SCHEMA_3).addValues("user2", 24, 
"ar").build()))
+            .build();
+
+    // Explicit tags are needed so that each input can be given its own key field names.
+    TupleTag<Row> pc1Tag = new TupleTag<>("pc1");
+    TupleTag<Row> pc2Tag = new TupleTag<>("pc2");
+    TupleTag<Row> pc3Tag = new TupleTag<>("pc3");
+
+    PCollection<KV<Row, Row>> joined =
+        PCollectionTuple.of(pc1Tag, pc1)
+            .and(pc2Tag, pc2)
+            .and(pc3Tag, pc3)
+            .apply(
+                "CoGroup",
+                CoGroup.byFieldNames(pc1Tag, "user", "country")
+                    .byFieldNames(pc2Tag, "user2", "country2")
+                    .byFieldNames(pc3Tag, "user3", "country3"));
+
+    List<KV<Row, Row>> expected =
+        ImmutableList.of(
+            KV.of(key1, key1Joined),
+            KV.of(key2, key2Joined),
+            KV.of(key3, key3Joined),
+            KV.of(key4, key4Joined));
+    // Compare ignoring both the order of the output entries and the order within each array.
+    PAssert.that(joined).satisfies(actual -> containsJoinedFields(expected, 
actual));
+    pipeline.run();
+  }
+
+  /**
+   * Verifies that applying {@link CoGroup} fails when key fields are given for only some inputs.
+   *
+   * <p>Three tagged inputs are combined, but {@code byFieldNames} is called only for {@code pc1}
+   * and {@code pc2}. An {@link IllegalStateException} is expected.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testUnderspecifiedCoGroup() {
+    PCollection<Row> pc1 =
+        pipeline
+            .apply(
+                "Create1",
+                Create.of(Row.withSchema(CG_SCHEMA_1).addValues("user1", 1, "us").build()))
+            .setRowSchema(CG_SCHEMA_1);
+    PCollection<Row> pc2 =
+        pipeline
+            .apply(
+                "Create2",
+                Create.of(Row.withSchema(CG_SCHEMA_2).addValues("user1", 9, "us").build()))
+            .setRowSchema(CG_SCHEMA_2);
+    // NOTE(review): unlike pc1 and pc2, pc3 gets no setRowSchema call. Confirm that the expected
+    // exception is raised because pc3 has no key fields and not because it has no schema.
+    PCollection<Row> pc3 =
+        pipeline.apply(
+            "Create3", Create.of(Row.withSchema(CG_SCHEMA_3).addValues("user1", 17, "us").build()));
+    TupleTag<Row> pc1Tag = new TupleTag<>("pc1");
+    TupleTag<Row> pc2Tag = new TupleTag<>("pc2");
+    TupleTag<Row> pc3Tag = new TupleTag<>("pc3");
+
+    thrown.expect(IllegalStateException.class);
+    // The resulting collection is never inspected, so it is not bound to a local variable.
+    PCollectionTuple.of(pc1Tag, pc1)
+        .and(pc2Tag, pc2)
+        .and(pc3Tag, pc3)
+        .apply(
+            "CoGroup",
+            CoGroup.byFieldNames(pc1Tag, "user", "country")
+                .byFieldNames(pc2Tag, "user2", "country2"));
+    pipeline.run();
+  }
+
+  /**
+   * Verifies that applying {@link CoGroup} fails when the inputs are keyed by fields that do not
+   * match.
+   *
+   * <p>Both inputs use CG_SCHEMA_1, but {@code pc1} is keyed by {@code user} and {@code pc2} by
+   * {@code count}. An {@link IllegalStateException} is expected.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testMismatchingKeys() {
+    PCollection<Row> pc1 =
+        pipeline
+            .apply(
+                "Create1",
+                Create.of(Row.withSchema(CG_SCHEMA_1).addValues("user1", 1, "us").build()))
+            .setRowSchema(CG_SCHEMA_1);
+    PCollection<Row> pc2 =
+        pipeline
+            .apply(
+                "Create2",
+                Create.of(Row.withSchema(CG_SCHEMA_1).addValues("user1", 9, "us").build()))
+            .setRowSchema(CG_SCHEMA_1);
+
+    TupleTag<Row> pc1Tag = new TupleTag<>("pc1");
+    TupleTag<Row> pc2Tag = new TupleTag<>("pc2");
+    thrown.expect(IllegalStateException.class);
+    // The resulting collection is never inspected, so it is not bound to a local variable.
+    PCollectionTuple.of(pc1Tag, pc1)
+        .and(pc2Tag, pc2)
+        .apply("CoGroup", CoGroup.byFieldNames(pc1Tag, "user").byFieldNames(pc2Tag, "count"));
+    pipeline.run();
+  }
+
+  /**
+   * Asserts that {@code actual} holds exactly the {@code expected} key/value pairs, in any order.
+   *
+   * <p>Keys are compared with {@code equalTo}. Every field of an expected value row must be an
+   * array; each is compared with the same field of the actual value row without regard to element
+   * order.
+   *
+   * @param expected the joined rows the pipeline is expected to produce
+   * @param actual the rows that were actually produced
+   * @return always {@code null}, so the call can serve as the body of a {@code satisfies} lambda
+   */
+  private static Void containsJoinedFields(
+      List<KV<Row, Row>> expected, Iterable<KV<Row, Row>> actual) {
+    List<Matcher<? super KV<Row, Row>>> matchers = Lists.newArrayList();
+    for (KV<Row, Row> row : expected) {
+      // Parameterized element type rather than a raw Matcher; only the array conversions below
+      // remain raw, because generic arrays cannot be created.
+      List<Matcher<? super Row>> fieldMatchers = Lists.newArrayList();
+      Row value = row.getValue();
+      Schema valueSchema = value.getSchema();
+      for (int i = 0; i < valueSchema.getFieldCount(); ++i) {
+        // The helper only supports value rows made up entirely of array fields.
+        assertEquals(TypeName.ARRAY, valueSchema.getField(i).getType().getTypeName());
+        fieldMatchers.add(new ArrayFieldMatchesAnyOrder(i, value.getArray(i)));
+      }
+      matchers.add(
+          KvMatcher.isKv(equalTo(row.getKey()), allOf(fieldMatchers.toArray(new Matcher[0]))));
+    }
+    assertThat(actual, containsInAnyOrder(matchers.toArray(new Matcher[0])));
+    return null;
+  }
+
+  /**
+   * Matches a {@link Row} whose array field at a given index contains exactly the expected rows,
+   * in any order.
+   */
+  static class ArrayFieldMatchesAnyOrder extends BaseMatcher<Row> {
+    /** Index of the array field to inspect in the matched row. */
+    private final int fieldIndex;
+
+    /** Rows the array field must contain; element order is not significant. */
+    private final Row[] expected;
+
+    /**
+     * @param fieldIndex index of the array field to inspect
+     * @param expected rows the array field must contain, in any order
+     */
+    ArrayFieldMatchesAnyOrder(int fieldIndex, List<Row> expected) {
+      this.fieldIndex = fieldIndex;
+      this.expected = expected.toArray(new Row[0]);
+    }
+
+    /** Returns true if {@code item} is a {@link Row} whose array field matches in any order. */
+    @Override
+    public boolean matches(Object item) {
+      if (!(item instanceof Row)) {
+        return false;
+      }
+      Row row = (Row) item;
+      List<Row> actual = row.getArray(fieldIndex);
+      return containsInAnyOrder(expected).matches(actual);
+    }
+
+    /** Describes the matcher, including the field index and the expected rows. */
+    @Override
+    public void describeTo(Description description) {
+      // Include the field index and expected rows so a failed assertion shows what was wanted.
+      description
+          .appendText("arrayFieldMatchesAnyOrder")
+          .appendText(" field ")
+          .appendValue(fieldIndex)
+          .appendValueList(" containing in any order [", ", ", "]", expected);
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 167675)
    Time Spent: 18h 50m  (was: 18h 40m)

> Create a library of useful transforms that use schemas
> ------------------------------------------------------
>
>                 Key: BEAM-4461
>                 URL: https://issues.apache.org/jira/browse/BEAM-4461
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 18h 50m
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to