Re: KV with AutoValueSchema

2024-04-09 Thread Ruben Vargas
Awesome, thanks for the info! It worked like a charm!

On Thu, Apr 4, 2024 at 9:49 PM Reuven Lax  wrote:
>
> There are ways you can manually force the coder here. However, I would first
> try to split the KV creation into two operations. Have ProcessEvents just
> create a PCollection<SharedCoreEvent>, and then use a following operation to
> create the KV. Something like this:
>
> input.apply(ParDo.of(new ProcessEvents()))
> .apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
> ExtractKeyFunction).withKeyType(TypeDescriptors.longs()));
>
> I suspect this will let Beam's coder inference work out the final Coder.
> If that doesn't work, you could always brute-force it like this:
>
> PCollection<SharedCoreEvent> coreEvents = input.apply(ParDo.of(new
> ProcessEvents()));
> coreEvents.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
> ExtractKeyFunction).withKeyType(TypeDescriptors.longs()))
>  .setCoder(KvCoder.of(VarLongCoder.of(), coreEvents.getCoder()))
>  .apply(Reshuffle.of())
>  ... etc.
>
>
> On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas  wrote:
>>
>> ProcessEvents receives a Session object as input and creates a KV with a SharedCoreEvent value as output.
>>
>> On Thu, Apr 4, 2024 at 8:52 PM, Reuven Lax via user
>> wrote:
>>>
>>> There are some sharp edges unfortunately around auto-inference of KV coders 
>>> and schemas. Is there a previous PCollection of type SharedCoreEvent, or is 
>>> the SharedCoreEvent created in ProcessEvents?
>>>
>>> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas  wrote:

 Hello guys

 I have a question: is it possible to use KV along with AutoValueSchema
 objects? I'm having trouble when I try to use them together.

 I have an object like the following:

 @AutoValue
 @DefaultSchema(AutoValueSchema.class)
 public abstract class SharedCoreEvent {

 @JsonProperty("subscriptionId")
 public abstract String getSubscription();

 
 }

 Then I have a pipeline like the following:

  input.apply(ParDo.of(new ProcessEvents()))
 .apply(Reshuffle.of()).apply(Values.create()).apply(output);

 My input is a single object, and ProcessEvents produces many events in a
 fan-out fashion; that is why I use Reshuffle here.

 But when I run this pipeline it throws the following error:

 lang.IllegalStateException: Unable to return a default Coder for
 MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
 [PCollection@2131266396]. Correct one of the following root causes:
   No Coder has been manually specified;  you may do so using .setCoder().

   Inferring a Coder from the CoderRegistry failed: Cannot provide
 coder for parameterized type
 org.apache.beam.sdk.values.KV:
 Unable to provide a Coder for events.SharedCoreEvent
   Building a Coder using a registered CoderProvider failed.


 Something similar happens with my source when I use KafkaIO and the
 source produces a PCollection of KVs.

 Is there any reason for this? Maybe I'm misusing the schemas?

 Really appreciate your help

 Thanks
 Ruben
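A minimal, runnable sketch of the split-then-key approach from the reply above, assuming beam-sdks-java-core and the Direct Runner are on the classpath. To avoid needing the AutoValue annotation processor, it swaps in a hypothetical `CoreEvent` class registered with `JavaFieldSchema` in place of the AutoValue `SharedCoreEvent`; `ProcessEvents` here is a hypothetical fan-out DoFn and the key function is illustrative. Passing `true` to `runPipeline` also applies the explicit `setCoder(KvCoder.of(...))` fallback.

```java
import java.util.Objects;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KeyAfterSchemaSketch {
  // Hypothetical stand-in for SharedCoreEvent: a schema type via public fields
  // and a no-arg constructor, so no AutoValue code generation is required.
  @DefaultSchema(JavaFieldSchema.class)
  public static class CoreEvent {
    public long subscriptionId;
    public String payload;

    public CoreEvent() {}

    static CoreEvent of(long subscriptionId, String payload) {
      CoreEvent e = new CoreEvent();
      e.subscriptionId = subscriptionId;
      e.payload = payload;
      return e;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof CoreEvent)) {
        return false;
      }
      CoreEvent other = (CoreEvent) o;
      return subscriptionId == other.subscriptionId && Objects.equals(payload, other.payload);
    }

    @Override
    public int hashCode() {
      return Objects.hash(subscriptionId, payload);
    }
  }

  // Hypothetical fan-out DoFn: for input n, emits n plain events (no KV).
  static class ProcessEvents extends DoFn<Integer, CoreEvent> {
    @ProcessElement
    public void processElement(@Element Integer n, OutputReceiver<CoreEvent> out) {
      for (int i = 0; i < n; i++) {
        out.output(CoreEvent.of(i % 2, "event-" + i));
      }
    }
  }

  static PipelineResult.State runPipeline(boolean pinKvCoder) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    // Step 1: the DoFn outputs the schema type itself, so a schema coder is inferred.
    PCollection<CoreEvent> coreEvents =
        p.apply(Create.of(3)).apply("ProcessEvents", ParDo.of(new ProcessEvents()));
    // Step 2: a separate transform adds the Long key, with the key type declared explicitly.
    PCollection<KV<Long, CoreEvent>> keyed =
        coreEvents.apply(
            "AddKeys",
            WithKeys.of((SerializableFunction<CoreEvent, Long>) e -> e.subscriptionId)
                .withKeyType(TypeDescriptors.longs()));
    if (pinKvCoder) {
      // Brute-force fallback: build the KV coder by hand from the upstream schema coder.
      keyed.setCoder(KvCoder.of(VarLongCoder.of(), coreEvents.getCoder()));
    }
    PCollection<CoreEvent> values = keyed.apply(Reshuffle.of()).apply(Values.create());
    PAssert.that(values)
        .containsInAnyOrder(
            CoreEvent.of(0, "event-0"), CoreEvent.of(1, "event-1"), CoreEvent.of(0, "event-2"));
    // PAssert failures surface as an exception from run().
    return p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    System.out.println(runPipeline(false));
    System.out.println(runPipeline(true));
  }
}
```

`WithKeys` builds its output `KvCoder` from the input PCollection's coder (here the schema coder) plus a coder for the declared key type, which is likely why splitting the steps lets inference succeed where a DoFn that emits `KV<Long, SharedCoreEvent>` directly does not.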


Re: How to handle Inheritance with AutoValueSchema

2024-04-09 Thread Reuven Lax via user
I don't see any unit tests for inherited AutoValue accessors, so I suspect
it simply does not work today with AutoValueSchema. This is something
that's probably fixable (though such a fix does risk breaking some users).

On Mon, Apr 8, 2024 at 11:21 PM Ruben Vargas 
wrote:

> Hello Guys
>
> I have a PCollection of "Session" objects; each Session contains a
> list of events.
>
> The events come in different types (EventA, EventB, EventC, and so
> on), all of which extend Event, which holds the common fields.
>
> According to the AutoValue documentation, inheriting from another
> AutoValue class is not supported, but extending an abstract supertype
> to inherit its abstract accessor methods is.
> (
> https://github.com/google/auto/blob/main/value/userguide/howto.md#-have-autovalue-also-implement-abstract-methods-from-my-supertypes
> )
>
> But when I run my pipeline, it fails with an NPE.
>
> Caused by: java.lang.NullPointerException
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.createGetter(JavaBeanUtils.java:153)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$1(JavaBeanUtils.java:143)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> ~[?:?]
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
> ~[?:?]
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> ~[?:?]
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> ~[?:?]
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> ~[?:?]
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> ~[?:?]
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> ~[?:?]
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$2(JavaBeanUtils.java:144)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
> ~[?:?]
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.getGetters(JavaBeanUtils.java:138)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.AutoValueSchema.fieldValueGetters(AutoValueSchema.java:93)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:145)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:130)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
> ~[beam-sdks-java-core-2.55.0.jar:?]
>
>
> Is this not supported, or is it a bug? Should I file an issue on GitHub?
>
> Thanks
>


Is Pulsar IO Connector Officially Supported?

2024-04-09 Thread Vince Castello via user
I see that a Pulsar connector was made available as of the Beam 2.38.0
release, but I don't see Pulsar listed as an official connector on the page
below. Is the Pulsar IO connector official or not? If it is, could someone
please update the page, since it currently gives the impression that no
Pulsar IO connector is available.


https://beam.apache.org/documentation/io/connectors/


Thanks


--Vince