Hello everyone,

As I continue replacing my usage of Row with POJOs, I'm following the
documentation for the CoGroup transform [1].

As per the documentation, I have created a JoinedKey and a JoinedValue,
exactly like the examples given in the documentation, except that the key
has propA, propB and propC.

I then execute this code:
PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
    .apply(Cogroupby...)
    .apply(Convert.to(Joined.class))

And I get this:
Exception in thread "main" java.lang.RuntimeException: Cannot convert between types that don't have equivalent schemas. input schema: Fields:
Field{name=key, description=, type=ROW<propA STRING, propB STRING, propC STRING> NOT NULL, options={{}}}
Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
Encoding positions:
{lhs=1, rhs=2, key=0}
Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
Fields:
Field{name=key, description=, type=ROW<propA STRING, propB STRING, propC STRING> NOT NULL, options={{}}}
Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
Encoding positions:
{lhs=1, rhs=2, key=0}

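This is probably because lhs and rhs are Iterables. The two schemas print
identically because ITERABLE fields are printed without their element type,
so I suspect the check compares the element schemas of the CoGroup output
rows for lhs and rhs against the element types of the Iterable properties in
the Joined POJO, and those differ. If so, we should update the documentation,
as it doesn't reflect how the code actually behaves (unless I missed
something).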
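A toy model of the suspected mismatch: two ITERABLE field types that print the same but differ in element type. ToyType is hypothetical and only mimics what the log shows (ITERABLE printed without its element type); it is not Beam's Schema.FieldType, and whether Beam's equivalence check fails for exactly this reason is an assumption.

```java
// Toy model, NOT Beam's Schema.FieldType: an ITERABLE type is printed without
// its element type (as in the log), but equivalence still compares it.
final class ToyType {
    final String name;     // e.g. "STRING", "ROW<a STRING>", "ITERABLE"
    final ToyType element; // element type for ITERABLE, null otherwise

    ToyType(String name, ToyType element) {
        this.name = name;
        this.element = element;
    }

    static ToyType iterableOf(ToyType element) {
        return new ToyType("ITERABLE", element);
    }

    // Mirrors the log: only the type name and nullability are printed.
    @Override
    public String toString() {
        return name + " NOT NULL";
    }

    // Recursively compares names and element types.
    boolean equivalent(ToyType other) {
        if (!name.equals(other.name)) {
            return false;
        }
        if (element == null || other.element == null) {
            return element == other.element;
        }
        return element.equivalent(other.element);
    }
}

class ToyTypeDemo {
    public static void main(String[] args) {
        ToyType fromCoGroup = ToyType.iterableOf(new ToyType("ROW<a STRING>", null));
        ToyType fromPojo = ToyType.iterableOf(new ToyType("ROW<a STRING, b STRING>", null));
        System.out.println(fromCoGroup + " | " + fromPojo);   // ITERABLE NOT NULL | ITERABLE NOT NULL
        System.out.println(fromCoGroup.equivalent(fromPojo)); // false
    }
}
```

The point is only that an identical printout does not imply equivalent schemas when the element type is hidden.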

My next step was to try to make the Joined POJO generic, like this:
@DefaultSchema(JavaFieldSchema.class)
public class Joined<Lhs, Rhs> {
    public JoinedKey key;
    public Iterable<Lhs> lhs;
    public Iterable<Rhs> rhs;
}
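One consequence of making Joined generic, shown with plain Java reflection: on the raw class, the lhs field's element type is only the type variable Lhs, so the concrete element type (MyLhsPojo) can only come from the full parameterized type. That is presumably why a generic POJO needs a TypeDescriptor rather than Joined.class. The classes here are a self-contained stand-in, with JoinedKey replaced by Object and the Beam annotation dropped; ErasureDemo is a hypothetical helper.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Plain-Java stand-in for the generic Joined POJO (Beam's @DefaultSchema omitted).
class Joined<Lhs, Rhs> {
    public Object key; // JoinedKey in the real code; Object keeps this sketch self-contained
    public Iterable<Lhs> lhs;
    public Iterable<Rhs> rhs;
}

class ErasureDemo {
    // Returns the declared element type of an Iterable<...> field, as seen on the raw class.
    static Type iterableElementType(Class<?> clazz, String fieldName) {
        try {
            Field field = clazz.getField(fieldName);
            ParameterizedType iterableType = (ParameterizedType) field.getGenericType();
            return iterableType.getActualTypeArguments()[0];
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No public field " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        Type lhsElement = iterableElementType(Joined.class, "lhs");
        // The raw class only knows the type variable, not MyLhsPojo.
        System.out.println(lhsElement.getTypeName());           // Lhs
        System.out.println(lhsElement instanceof TypeVariable); // true
    }
}
```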

And then execute this code:

var joinedTypeDescriptor = new TypeDescriptor<Joined<MyLhsPojo, MyRhsPojo>>(){};

var keyCoder = SchemaCoder.of(keySchema,
        TypeDescriptor.of(JoinedKey.class),
        new JavaFieldSchema().toRowFunction(TypeDescriptor.of(JoinedKey.class)),
        new JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)));
var valueCoder = SchemaCoder.of(keySchema, joinedTypeDescriptor,
        new JavaFieldSchema().toRowFunction(joinedTypeDescriptor),
        new JavaFieldSchema().fromRowFunction(joinedTypeDescriptor));
var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
        .apply(Cogroupby...)
        .apply(Convert.to(joinedTypeDescriptor));

But this gives me:
Exception in thread "main" java.lang.ClassCastException: class org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl cannot be cast to class java.lang.Class (org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl is in unnamed module of loader 'app'; java.lang.Class is in module java.base of loader 'bootstrap')
at org.apache.beam.sdk.schemas.GetterBasedSchemaProvider.fromRowFunction(GetterBasedSchemaProvider.java:105)
(irrelevant stacktrace omitted for brevity)

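It looks like GetterBasedSchemaProvider.fromRowFunction casts the type
descriptor's underlying Type straight to Class, but for a generic type such
as Joined<MyLhsPojo, MyRhsPojo> that Type is a ParameterizedType (here Beam's
vendored Guava implementation), not a Class, so the cast fails.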
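To illustrate the failure mode outside Beam: the anonymous-subclass trick behind new TypeDescriptor<...>(){} captures a ParameterizedType, which a blind Class cast cannot handle, while taking the raw type can. TypeRef is a hypothetical "super type token" class, not Beam's TypeDescriptor. Beam's TypeDescriptor does expose getRawType(), so a fix could plausibly use that instead of the cast; that is an untested assumption.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical minimal "super type token": like new TypeDescriptor<...>(){},
// an anonymous subclass records the full generic type argument.
abstract class TypeRef<T> {
    final Type type;

    protected TypeRef() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    // Safe alternative to a blind (Class<T>) cast: unwrap parameterized types to their raw class.
    Class<?> rawType() {
        if (type instanceof Class) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            return (Class<?>) ((ParameterizedType) type).getRawType();
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }
}

class TypeRefDemo {
    public static void main(String[] args) {
        TypeRef<List<String>> ref = new TypeRef<List<String>>() {};
        System.out.println(ref.type instanceof Class);             // false
        System.out.println(ref.type instanceof ParameterizedType); // true
        System.out.println(ref.rawType() == List.class);           // true
    }
}
```

Casting ref.type directly to Class here throws the same kind of ClassCastException as in the trace above.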

So my workaround for now, as elegant as roadkill, is to do the conversion
manually, as below (actual class names replaced with RhsPojo):

var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
        .apply(Cogroupby...)
//        .apply(Convert.to(joinedTypeDescriptor))
        .apply(MapElements.into(joinedTypeDescriptor).via(x -> {
            // Convert the key Row back into a JoinedKey POJO.
            var key = new JavaFieldSchema()
                    .fromRowFunction(TypeDescriptor.of(JoinedKey.class))
                    .apply(x.getRow("key"));

            // Convert each lhs Row into an LhsType POJO.
            var lhsList = new ArrayList<LhsType>();
            var rowJoinedSerializableFunction = new JavaFieldSchema().fromRowFunction(lhsTypeDescriptor);
            for (var item : x.<Row>getIterable("lhs")) {
                var lhsItem = rowJoinedSerializableFunction.apply(item);
                lhsList.add(lhsItem);
            }

            // Convert each rhs Row into an RhsPojo.
            var rhsList = new ArrayList<RhsPojo>();
            var rhsFromRowFn = new JavaFieldSchema().fromRowFunction(TypeDescriptor.of(RhsPojo.class));
            for (var item : x.<Row>getIterable("rhs")) {
                var rhsItem = rhsFromRowFn.apply(item);
                rhsList.add(rhsItem);
            }

            // Assemble the generic Joined POJO by hand.
            var joined = new Joined<LhsType, RhsPojo>();
            joined.key = key;
            joined.lhs = lhsList;
            joined.rhs = rhsList;
            return joined;
        }));
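Both loops in the workaround do the same thing: apply a Row-to-POJO function to every element of an Iterable and collect the results into a list. A small generic helper could factor that out. This is a plain-Java sketch; IterableConversions and convertAll are hypothetical names, not Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class IterableConversions {
    private IterableConversions() {}

    // Applies `convert` to every element of `items` and collects the results, in order, into a new list.
    static <A, B> List<B> convertAll(Iterable<A> items, Function<? super A, ? extends B> convert) {
        List<B> out = new ArrayList<>();
        for (A item : items) {
            out.add(convert.apply(item));
        }
        return out;
    }
}

class ConvertAllDemo {
    public static void main(String[] args) {
        List<Integer> lengths = IterableConversions.convertAll(Arrays.asList("a", "bb", "ccc"), String::length);
        System.out.println(lengths); // [1, 2, 3]
    }
}
```

In the workaround this would read roughly `var rhsList = IterableConversions.convertAll(x.<Row>getIterable("rhs"), rhsFromRowFn::apply);`, since the SerializableFunction returned by fromRowFunction has an apply method. Creating the fromRowFunctions once outside the lambda, rather than per element, would probably also be cheaper, though that is not measured here.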

Please let me know how this whole thing should behave. As with the other
email I sent earlier, I'll be happy to fix things (if they need fixing)
once I deliver some stuff for work. BTW, this is using Beam 2.31.

Thank you for taking the time to read these walls of text, and for any advice!

Cheers,
Cristian

[1]
http://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/schemas/transforms/CoGroup.html
