If I’m reading a Parquet or Avro file, though, I don’t have a KV<K, Data1>; I 
just have the data. Do I need to run a ParDo just to extract the keys for this to work?

PCollection<GenericRecord> data;
PCollection<KV<String, GenericRecord>> keyedData = (data run through a ParDo 
that creates a KV for each GenericRecord, extracting possibly multiple PK 
fields encoded as a string)

Then do the stuff below. This seems pretty expensive (serialization-wise) 
compared with the Flink KeyExtractor, for example. Or is it similar in practice?
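
To make the keying step concrete, here is a minimal plain-Java sketch of building one string key from several fields. It has no Beam or Avro dependency: a Map stands in for a GenericRecord, and the class name KeySketch, the method compositeKey, the field names, and the "|" separator are all assumptions made for illustration. In a pipeline, a function like this would be the body of the ParDo that produces the KVs (Beam's WithKeys transform is another way to attach keys from a function).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeySketch {
  // Joins the named fields of a record into a single string key.
  // The Map is a stand-in for an Avro GenericRecord (assumption).
  // Note: "|" must not occur inside field values, or keys could collide.
  public static String compositeKey(Map<String, Object> record, List<String> keyFields) {
    return keyFields.stream()
        .map(field -> String.valueOf(record.get(field)))
        .collect(Collectors.joining("|"));
  }

  public static void main(String[] args) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("account", "A1");
    record.put("region", 7);
    record.put("amount", 12.5);
    // Only the key fields contribute to the key.
    System.out.println(compositeKey(record, Arrays.asList("account", "region")));
  }
}
```

The key is a plain String here only because the pseudocode above uses KV<String, GenericRecord>; any key type with a deterministic encoding would serve the same purpose.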

Thanks Thomas.

From: Thomas Groh [mailto:[email protected]]
Sent: Wednesday, March 22, 2017 1:53 PM
To: [email protected]
Subject: Re: Apache Beam cogroup help

This would be implemented via a CoGroupByKey 
(https://beam.apache.org/documentation/sdks/javadoc/0.6.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html)

Your transform logic will be mostly the same; after applying the extraction 
(the right side of k1 and k2 in your example), you should have two PCollections 
of KVs -

PCollection<KV<K, Data1>> k1;
PCollection<KV<K, Data2>> k2;

You can construct a KeyedPCollectionTuple containing the two PCollections:

final TupleTag<Data1> data1Tag = new TupleTag<>();
final TupleTag<Data2> data2Tag = new TupleTag<>();
KeyedPCollectionTuple<K> coGroupTuple = KeyedPCollectionTuple.of(data1Tag, 
k1).and(data2Tag, k2);

Then apply the CoGroupByKey:

PCollection<KV<K, CoGbkResult>> coGrouped = 
coGroupTuple.apply(CoGroupByKey.<K>create());

Then you can run an arbitrary ParDo to combine the elements as appropriate. 
You'll need to reuse the TupleTags created above to extract out the 
per-PCollection outputs. As a simple example where the elements have a shared 
supertype CombinedData, and you'd like to add them to a single output list:

PCollection<KV<K, List<CombinedData>>> combined = coGrouped.apply(ParDo.of(
    new DoFn<KV<K, CoGbkResult>, KV<K, List<CombinedData>>>() {
  @ProcessElement
  public void process(ProcessContext context) {
    KV<K, CoGbkResult> element = context.element();
    List<CombinedData> all = new ArrayList<>();
    // getAll returns every value for this key from the tagged input.
    for (Data1 d1 : element.getValue().getAll(data1Tag)) {
      all.add(d1);
    }
    for (Data2 d2 : element.getValue().getAll(data2Tag)) {
      all.add(d2);
    }
    // Emit the key together with the combined list.
    context.output(KV.of(element.getKey(), all));
  }
}));
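
For intuition about what the grouped result holds per key, the following is a plain-Java model of the co-grouping step. It is only a sketch and does not use the Beam API: the class CoGroupSketch, the Grouped holder, and the sample values are hypothetical names introduced here. Each key maps to the values from the first input and the values from the second, and a key present in only one input still appears, with an empty list on the other side.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CoGroupSketch {
  // Per-key result: values from the first input and values from the second.
  public static final class Grouped<A, B> {
    public final List<A> left = new ArrayList<>();
    public final List<B> right = new ArrayList<>();
  }

  // In-memory model of co-grouping two keyed collections by key.
  public static <K extends Comparable<K>, A, B> Map<K, Grouped<A, B>> coGroup(
      List<Map.Entry<K, A>> k1, List<Map.Entry<K, B>> k2) {
    Map<K, Grouped<A, B>> out = new TreeMap<>();
    for (Map.Entry<K, A> e : k1) {
      out.computeIfAbsent(e.getKey(), k -> new Grouped<A, B>()).left.add(e.getValue());
    }
    for (Map.Entry<K, B> e : k2) {
      out.computeIfAbsent(e.getKey(), k -> new Grouped<A, B>()).right.add(e.getValue());
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> k1 = new ArrayList<>();
    k1.add(new SimpleEntry<>("a", "d1-a"));
    k1.add(new SimpleEntry<>("b", "d1-b"));
    List<Map.Entry<String, String>> k2 = new ArrayList<>();
    k2.add(new SimpleEntry<>("a", "d2-a"));
    k2.add(new SimpleEntry<>("c", "d2-c"));

    // Keys "b" and "c" each appear with one empty side.
    for (Map.Entry<String, Grouped<String, String>> e : coGroup(k1, k2).entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue().left + " / " + e.getValue().right);
    }
  }
}
```

In the Beam version, the two lists correspond to what getAll(data1Tag) and getAll(data2Tag) return for a given key.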

On Wed, Mar 22, 2017 at 10:35 AM, Newport, Billy 
<[email protected]<mailto:[email protected]>> wrote:
I'm trying to port Flink code to Apache Beam, but I’m having trouble decoding the 
documentation.

I have Flink code which looks like:

DataSet<GenericRecord> d1 = Read parquet
DataSet<GenericRecord> d2 = Read Avro
KeyExtractor<GenericRecord> k1 = … (extracts an object containing the key 
fields from d1 records)
KeyExtractor<GenericRecord> k2 = … (extracts an object containing the key 
fields from d2 records)

CoGroup<GenericRecord,GenericRecord,GenericRecord> grouper = (combines values 
for equal keys into a combined list for that key)

DataSet<GenericRecord> combined = 
d1.coGroup(d2).where(k1).equalTo(k2).with(grouper)

What's the Beam equivalent?

Thanks



