Re: Possible bug in GetterBasedSchemaProvider.fromRowFunction

2021-09-15 Thread Cristian Constantinescu
I'll give it another shot and let you know if it works. I could update the
documentation accordingly afterwards.

On Wed, Sep 15, 2021 at 6:08 PM Reuven Lax  wrote:

> Yes you would - that's the rationale for adding support for generic types
> in schema inference.
>
> On Wed, Sep 15, 2021 at 3:06 PM Cristian Constantinescu 
> wrote:
>
>> I think I tried that, but can't remember for sure (I'm like 80% sure,
>> sorry for the uncertainty, I've been trying many things for various
>> problems). And it didn't work. However, if I understand this solution
>> correctly, that means that I would have to create these join classes for
>> every type I want to join. Is that right?
>>
>> On Wed, Sep 15, 2021 at 5:56 PM Reuven Lax  wrote:
>>
>>> Could you actually fill in the generic type for Iterable? e.g.
>>> Iterable lhs; I think without that, the schema won't match.
>>>
>>> On Wed, Sep 15, 2021 at 2:52 PM Cristian Constantinescu <
>>> zei...@gmail.com> wrote:
>>>
 Hi Reuven,

 Thanks for getting back to me.

 To answer your question, my initial Joined POJO is:

 @DefaultSchema(JavaFieldSchema.class)
 public class JoinedValue {
   public JoinedKey key;

   public Iterable lhs;
   public Iterable rhs;
 }


 This is exactly the same as on the documentation page, minus the field
 names. That is my main concern: following the steps in the documentation
 does not work when running the pipeline. I'll try to set up a sample
 project to illustrate this if you think it would be helpful.

 On Wed, Sep 15, 2021 at 1:08 PM Reuven Lax  wrote:

>
>
> On Sun, Sep 12, 2021 at 7:01 PM Cristian Constantinescu <
> zei...@gmail.com> wrote:
>
>> Hello everyone,
>>
>> As I'm continuing to remove my usage of Row and replacing it with
>> Pojos, I'm following the documentation for the CoGroup transform [1].
>>
>> As per the documentation, I have created a JoinedKey and a
>> JoinedValue, exactly like the examples in the documentation, except that
>> the key has propA, B and C.
>>
>> I then execute this code:
>> PCollectionTuple.of("lhs", lhs).and("rhs",
>> rhs).apply(Cogroupby...).apply(Convert.to(Joined.class))
>>
>> And I get this:
>> Exception in thread "main" java.lang.RuntimeException: Cannot convert
>> between types that don't have equivalent schemas. input schema: Fields:
>> Field{name=key, description=, type=ROW> propC STRING> NOT NULL, options={{}}}
>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Encoding positions:
>> {lhs=1, rhs=2, key=0}
>> Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
>> Fields:
>> Field{name=key, description=, type=ROW> propC STRING> NOT NULL, options={{}}}
>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Encoding positions:
>> {lhs=1, rhs=2, key=0}
>>
>> This is probably because lhs and rhs are Iterable and it's trying to
>> compare the schemas of the CoGroup Rows for lhs and rhs with the
>> Iterable properties of the Joined POJO. We should update the documentation,
>> as it doesn't reflect how the code actually behaves (unless I missed
>> something).
>>
>
> I'm not sure I understand the issue here. What exactly does your
> Joined pojo look like?
>
>>
>> My next step was to try to make the Joined POJO generic, like this:
>> @DefaultSchema(JavaFieldSchema.class)
>> public class Joined {
>> public JoinedKey key;
>> public Iterable lhs;
>> public Iterable rhs;
>> }
>>
>
> Unfortunately schema inference doesn't work today with generic
> classes. I believe that it's possible to fix this (e.g. we do support
> Coder inference in such cases), but today this won't work.
>
>
>>
>> And then execute this code:
>>
>> var joinedTypeDescriptor = new TypeDescriptor> MyRhsPojo>>(){};
>>
>> var keyCoder = SchemaCoder.of(keySchema,
>> TypeDescriptor.of(JoinedKey.class), new
>> JavaFieldSchema().toRowFunction(TypeDescriptor.of(JoinedKey.class)), new
>> JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)));
>> var valueCoder = SchemaCoder.of(keySchema,
>> joinedTypeDescriptor, new
>> JavaFieldSchema().toRowFunction(joinedTypeDescriptor), new
>> JavaFieldSchema().fromRowFunction(joinedTypeDescriptor));
>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>> .apply(Cogroupby...)
>> .apply(Convert.to(joinedTypeDescriptor))
>>
>> But this gives me:
>> Exception in thread "main" java.lang.ClassCastExcept

Re: Possible bug in GetterBasedSchemaProvider.fromRowFunction

2021-09-15 Thread Cristian Constantinescu
Hi Reuven,

Thanks for getting back to me.

To answer your question, my initial Joined POJO is:

@DefaultSchema(JavaFieldSchema.class)
public class JoinedValue {
  public JoinedKey key;

  public Iterable lhs;
  public Iterable rhs;
}


This is exactly the same as on the documentation page, minus the field names.
That is my main concern: following the steps in the documentation does not
work when running the pipeline. I'll try to set up a sample project to
illustrate this if you think it would be helpful.
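To see why the raw `Iterable` fields matter (the point Reuven raises about filling in the generic type), here is a plain-Java sketch with no Beam dependency. It assumes, as a simplification, that schema inference learns a field's element type the way ordinary reflection does; `LhsPojo`, `RawJoined` and `TypedJoined` are hypothetical stand-ins for the POJOs in this thread. A raw `Iterable` field carries no element type at all, while `Iterable<LhsPojo>` does.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class RawIterableDemo {

  // Hypothetical stand-in for the real left-hand-side POJO.
  static class LhsPojo {
    public String propA;
  }

  // Mirrors the POJO from the thread: raw Iterable, no element type recorded.
  @SuppressWarnings("rawtypes")
  static class RawJoined {
    public Iterable lhs;
  }

  // Same field with the generic type filled in, as Reuven suggests.
  static class TypedJoined {
    public Iterable<LhsPojo> lhs;
  }

  /** Returns the declared element type of a public Iterable field, or null if the field is raw. */
  static Type elementTypeOf(Class<?> owner, String fieldName) {
    try {
      Type generic = owner.getField(fieldName).getGenericType();
      if (generic instanceof ParameterizedType) {
        return ((ParameterizedType) generic).getActualTypeArguments()[0];
      }
      return null; // raw type: reflection has no element type to offer
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException("No public field " + fieldName, e);
    }
  }

  public static void main(String[] args) {
    System.out.println("raw lhs element type:   " + elementTypeOf(RawJoined.class, "lhs"));
    System.out.println("typed lhs element type: " + elementTypeOf(TypedJoined.class, "lhs"));
  }
}
```

With a raw field there is nothing to derive a nested ROW schema from, which is consistent with the inferred POJO schema and the CoGroup output schema not lining up.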

On Wed, Sep 15, 2021 at 1:08 PM Reuven Lax  wrote:

>
>
> On Sun, Sep 12, 2021 at 7:01 PM Cristian Constantinescu 
> wrote:
>
>> Hello everyone,
>>
>> As I'm continuing to remove my usage of Row and replacing it with Pojos,
>> I'm following the documentation for the CoGroup transform [1].
>>
>> As per the documentation, I have created a JoinedKey and a JoinedValue,
>> exactly like the examples in the documentation, except that the key has
>> propA, B and C.
>>
>> I then execute this code:
>> PCollectionTuple.of("lhs", lhs).and("rhs",
>> rhs).apply(Cogroupby...).apply(Convert.to(Joined.class))
>>
>> And I get this:
>> Exception in thread "main" java.lang.RuntimeException: Cannot convert
>> between types that don't have equivalent schemas. input schema: Fields:
>> Field{name=key, description=, type=ROW> STRING> NOT NULL, options={{}}}
>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Encoding positions:
>> {lhs=1, rhs=2, key=0}
>> Options:{{}}UUID: 40cef697-152b-4f3b-9110-e32a921915ca output schema:
>> Fields:
>> Field{name=key, description=, type=ROW> propC STRING> NOT NULL, options={{}}}
>> Field{name=lhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Field{name=rhs, description=, type=ITERABLE NOT NULL, options={{}}}
>> Encoding positions:
>> {lhs=1, rhs=2, key=0}
>>
>> This is probably because lhs and rhs are Iterable and it's trying to
>> compare the schemas of the CoGroup Rows for lhs and rhs with the Iterable
>> properties of the Joined POJO. We should update the documentation, as it
>> doesn't reflect how the code actually behaves (unless I missed something).
>>
>
> I'm not sure I understand the issue here. What exactly does your Joined
> pojo look like?
>
>>
>> My next step was to try to make the Joined POJO generic, like this:
>> @DefaultSchema(JavaFieldSchema.class)
>> public class Joined {
>> public JoinedKey key;
>> public Iterable lhs;
>> public Iterable rhs;
>> }
>>
>
> Unfortunately schema inference doesn't work today with generic classes. I
> believe that it's possible to fix this (e.g. we do support Coder inference
> in such cases), but today this won't work.
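Reuven's point about generic classes can be illustrated without Beam. In a generic class, a field such as `Iterable<L> lhs` only exposes the type variable `L`; a concrete element type appears only after substituting the type arguments captured from a fully specified type, which is what an anonymous subclass like `new TypeDescriptor<...>() {}` records. The sketch below is plain Java with hypothetical names (`Joined`, `TypeCapture`) and is not Beam's implementation; it only shows the substitution step that inference for generic classes would need.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class TypeVariableDemo {

  // Hypothetical generic join container, mirroring the generic Joined idea from the thread.
  static class Joined<L, R> {
    public Iterable<L> lhs;
    public Iterable<R> rhs;
  }

  // Hypothetical helper: captures a full generic type through an anonymous subclass.
  abstract static class TypeCapture<T> {
    Type captured() {
      return ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }
  }

  /** The element type as declared on Joined itself: a type variable such as L. */
  static Type declaredElementType(String fieldName) {
    try {
      Type generic = Joined.class.getField(fieldName).getGenericType();
      return ((ParameterizedType) generic).getActualTypeArguments()[0];
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException("No public field " + fieldName, e);
    }
  }

  /** Substitutes Joined's type parameters with the arguments of a concrete Joined<...> type. */
  static Type resolvedElementType(ParameterizedType concreteJoined, String fieldName) {
    Type declared = declaredElementType(fieldName);
    TypeVariable<?>[] params = Joined.class.getTypeParameters();
    for (int i = 0; i < params.length; i++) {
      if (params[i].equals(declared)) {
        return concreteJoined.getActualTypeArguments()[i];
      }
    }
    return declared; // not a type parameter of Joined, nothing to substitute
  }

  public static void main(String[] args) {
    ParameterizedType concrete =
        (ParameterizedType) new TypeCapture<Joined<String, Integer>>() {}.captured();
    System.out.println("declared lhs element: " + declaredElementType("lhs"));
    System.out.println("resolved lhs element: " + resolvedElementType(concrete, "lhs"));
    System.out.println("resolved rhs element: " + resolvedElementType(concrete, "rhs"));
  }
}
```

Looking only at `Joined.class` yields the bare variable `L`; the concrete `String` is recoverable only when the full `Joined<String, Integer>` type travels along with the class.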
>
>
>>
>> And then execute this code:
>>
>> var joinedTypeDescriptor = new TypeDescriptor> MyRhsPojo>>(){};
>>
>> var keyCoder = SchemaCoder.of(keySchema,
>> TypeDescriptor.of(JoinedKey.class), new
>> JavaFieldSchema().toRowFunction(TypeDescriptor.of(JoinedKey.class)), new
>> JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)));
>> var valueCoder = SchemaCoder.of(keySchema, joinedTypeDescriptor,
>> new JavaFieldSchema().toRowFunction(joinedTypeDescriptor), new
>> JavaFieldSchema().fromRowFunction(joinedTypeDescriptor));
>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>> .apply(Cogroupby...)
>> .apply(Convert.to(joinedTypeDescriptor))
>>
>> But this gives me:
>> Exception in thread "main" java.lang.ClassCastException: class
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>> cannot be cast to class java.lang.Class
>> (org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.Types$ParameterizedTypeImpl
>> is in unnamed module of loader 'app'; java.lang.Class is in module
>> java.base of loader 'bootstrap')
>> at
>> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider.fromRowFunction(GetterBasedSchemaProvider.java:105)
>> (irrelevant stacktrace omitted for brevity)
>>
>> It looks like GetterBasedSchemaProvider.fromRowFunction has an explicit
>> cast to "Class", but there are cases where a Guava type (here,
>> Types$ParameterizedTypeImpl) is passed in instead.
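The `ClassCastException` above is the general Java failure of casting a `java.lang.reflect.Type` to `Class` when it is actually a parameterized type. The trace shows Guava's `Types$ParameterizedTypeImpl` (vendored by Beam), but the JDK's own implementation fails the same way. This plain-Java sketch, with hypothetical helper names (`castToClass`, `rawClass`, `TypeCapture`), contrasts an unconditional cast with erasing to the raw class first. In Beam, `TypeDescriptor.getRawType()` returns the erased class; whether the actual fix in `GetterBasedSchemaProvider` should go that way is an assumption, not something confirmed in this thread.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class RawClassDemo {

  // Hypothetical helper: captures a full generic type through an anonymous subclass.
  abstract static class TypeCapture<T> {
    Type captured() {
      return ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }
  }

  /** Hypothetical helper mirroring an unconditional (Class) cast; fails for parameterized types. */
  static Class<?> castToClass(Type type) {
    return (Class<?>) type;
  }

  /** Hypothetical helper that erases a parameterized type to its raw class before using it. */
  static Class<?> rawClass(Type type) {
    if (type instanceof Class) {
      return (Class<?>) type;
    }
    if (type instanceof ParameterizedType) {
      return rawClass(((ParameterizedType) type).getRawType());
    }
    throw new IllegalArgumentException("Unsupported type: " + type);
  }

  public static void main(String[] args) {
    Type listOfString = new TypeCapture<List<String>>() {}.captured();
    System.out.println("raw class: " + rawClass(listOfString));
    try {
      castToClass(listOfString);
    } catch (ClassCastException e) {
      System.out.println("direct cast failed: " + e.getMessage());
    }
  }
}
```

Erasing avoids the crash, but it also discards the type arguments, so on its own it would not give schema inference the element types of a generic class.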
>>
>> So my workaround for now, as elegant as roadkill, is to do the conversion
>> manually, as below (actual class names replaced with RhsPojo):
>>
>> var mixed = PCollectionTuple.of("lhs", lhs).and("rhs", rhs)
>> .apply(Cogroupby...)
>> //.apply(Convert.to(joinedTypeDescriptor))
>> .apply(MapElements.into(joinedTypeDescriptor).via(x -> {
>> var key = new
>> JavaFieldSchema().fromRowFunction(TypeDescriptor.of(JoinedKey.class)).apply(x.getRow("key"));
>>
>> var lhsList = new ArrayList();
>> var rowJoinedSerializableFunction = new
>> JavaFieldSchema().fromRowFunction(lhsTypeDescriptor);
>> for (var item : x.getIterable("lhs")) {
>>