Re: KV with AutoValueSchema
Awesome, thanks for the info! It worked like a charm!

On Thu, Apr 4, 2024 at 9:49 PM Reuven Lax wrote:
>
> There are ways you can manually force the coder here. However, I would first
> try to split up the KV creation into two operations. Have ProcessEvents just
> create a PCollection<SharedCoreEvent>, and then a following operation to
> create the KV. Something like this:
>
>   input.apply(ParDo.of(new ProcessEvents()))
>       .apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
>           ExtractKeyFunction).withKeyType(TypeDescriptors.longs()));
>
> I suspect that this will allow the mechanism to better infer the final Coder.
> If that doesn't work, you could always brute force it like this:
>
>   PCollection<SharedCoreEvent> coreEvents =
>       input.apply(ParDo.of(new ProcessEvents()));
>   coreEvents.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
>           ExtractKeyFunction).withKeyType(TypeDescriptors.longs()))
>       .setCoder(KvCoder.of(LongCoder.of(), coreEvents.getCoder()))
>       .apply(Reshuffle.of())
>       ... etc.
>
> On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas wrote:
>>
>> ProcessEvents receives a Session object as input and creates a
>> KV<Long, SharedCoreEvent> as output.
>>
>> On Thu, Apr 4, 2024 at 8:52 PM Reuven Lax via user wrote:
>>>
>>> There are some sharp edges, unfortunately, around auto-inference of KV
>>> coders and schemas. Is there a previous PCollection of type
>>> SharedCoreEvent, or is the SharedCoreEvent created in ProcessEvents?
>>>
>>> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas wrote:

Hello guys,

I have a question: is it possible to use KV along with AutoValueSchema
objects? I'm having trouble when I try to use them together.
I have an object like the following:

  @AutoValue
  @DefaultSchema(AutoValueSchema.class)
  public abstract class SharedCoreEvent {

      @JsonProperty("subscriptionId")
      public abstract String getSubscription();
  }

Then I have a pipeline like the following:

  input.apply(ParDo.of(new ProcessEvents()))
      .apply(Reshuffle.of())
      .apply(Values.create())
      .apply(output);

My input is a single object, and my ProcessEvents will produce tons of events
in a fan-out fashion; that is why I use Reshuffle here. But when I run this
pipeline, it throws the following error:

  java.lang.IllegalStateException: Unable to return a default Coder for
  MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
  [PCollection@2131266396]. Correct one of the following root causes:
    No Coder has been manually specified; you may do so using .setCoder().
    Inferring a Coder from the CoderRegistry failed: Cannot provide coder for
    parameterized type org.apache.beam.sdk.values.KV: Unable to provide a
    Coder for events.SharedCoreEvent
    Building a Coder using a registered CoderProvider failed.

Something similar happens with my source when I use KafkaIO and the source
produces a KV PCollection.

Is there any reason for this? Maybe I'm misusing the schemas?

Really appreciate your help.

Thanks
Ruben
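[Editor's note] The split Reuven suggests, producing the events first and attaching the key in a separate step, can be sketched in plain Java without a Beam dependency. The `SharedCoreEvent` and `KV` classes below are simplified stand-ins for the real Beam/AutoValue types (an assumption for illustration only); the point is that separating "produce events" from "attach keys" gives each step a single, well-defined element type, which is what lets Beam's coder inference work on the first PCollection before the KV is built.

```java
import java.util.List;
import java.util.stream.Collectors;

// Stand-in for the user's @AutoValue event type (hypothetical, simplified).
class SharedCoreEvent {
    private final String subscriptionId;
    SharedCoreEvent(String subscriptionId) { this.subscriptionId = subscriptionId; }
    public String getSubscription() { return subscriptionId; }
}

// Minimal key/value pair mirroring the shape of org.apache.beam.sdk.values.KV.
class KV<K, V> {
    final K key;
    final V value;
    KV(K key, V value) { this.key = key; this.value = value; }
    static <K, V> KV<K, V> of(K key, V value) { return new KV<>(key, value); }
}

public class WithKeysSketch {
    // Step 2 of the suggested split: key each already-produced event,
    // analogous to WithKeys.of(extractor) in the Beam pipeline.
    static List<KV<String, SharedCoreEvent>> withKeys(List<SharedCoreEvent> events) {
        return events.stream()
                .map(e -> KV.of(e.getSubscription(), e))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SharedCoreEvent> events =
                List.of(new SharedCoreEvent("sub-1"), new SharedCoreEvent("sub-2"));
        List<KV<String, SharedCoreEvent>> keyed = withKeys(events);
        System.out.println(keyed.get(0).key); // sub-1
    }
}
```

In the real pipeline the key type is Long (per `TypeDescriptors.longs()` above); String is used here only to keep the stand-in short.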
Re: How to handle Inheritance with AutoValueSchema
I don't see any unit tests for inherited AutoValue accessors, so I suspect it
simply does not work today with AutoValueSchema. This is something that's
probably fixable (though such a fix does risk breaking some users).

On Mon, Apr 8, 2024 at 11:21 PM Ruben Vargas wrote:
> Hello Guys,
>
> I have a PCollection with a "Session" object; inside that object I
> have a list of events.
>
> Each event has a different type: EventA, EventB, EventC, and so on. All
> of them extend Event, which contains the common fields.
>
> According to the AutoValue documentation, inheriting from another
> AutoValue class is not supported, but extending a supertype to pick up
> its fields is:
> https://github.com/google/auto/blob/main/value/userguide/howto.md#-have-autovalue-also-implement-abstract-methods-from-my-supertypes
>
> But when I run my pipeline, it fails with an NPE:
>
> Caused by: java.lang.NullPointerException
>     at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.createGetter(JavaBeanUtils.java:153) ~[beam-sdks-java-core-2.55.0.jar:?]
>     at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$1(JavaBeanUtils.java:143) ~[beam-sdks-java-core-2.55.0.jar:?]
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[?:?]
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) ~[?:?]
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) ~[?:?]
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
>     at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$2(JavaBeanUtils.java:144) ~[beam-sdks-java-core-2.55.0.jar:?]
>     at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[?:?]
>     at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.getGetters(JavaBeanUtils.java:138) ~[beam-sdks-java-core-2.55.0.jar:?]
>     at org.apache.beam.sdk.schemas.AutoValueSchema.fieldValueGetters(AutoValueSchema.java:93) ~[beam-sdks-java-core-2.55.0.jar:?]
>     at org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:145) ~[beam-sdks-java-core-2.55.0.jar:?]
>     at org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:130) ~[beam-sdks-java-core-2.55.0.jar:?]
>     at org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56) ~[beam-sdks-java-core-2.55.0.jar:?]
>
> Is this not supported? Or is it a bug? Should I file an issue on GitHub?
>
> Thanks
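[Editor's note] The diagnosis above (inherited accessors not being discovered) matches a common Java reflection pitfall: `Class.getDeclaredMethods()` returns only methods declared directly on the class, while `Class.getMethods()` also returns inherited public methods. Whether Beam's getter discovery relies on the former is an assumption here, not verified against the Beam source; the plain-Java sketch below (no Beam or AutoValue dependency, with hypothetical `Event`/`EventA` types modeled on the thread) only illustrates the distinction:

```java
import java.util.Arrays;

// Hypothetical stand-ins for the user's types: a supertype declaring a
// common accessor, and a subtype that would be the @AutoValue class.
abstract class Event {
    public abstract String getEventId(); // common field accessor
}

abstract class EventA extends Event {
    public abstract String getPayloadA(); // subtype-specific accessor
}

public class InheritedGetterDemo {
    // True if the class *directly* declares a method with this name.
    static boolean declaresDirectly(Class<?> clazz, String name) {
        return Arrays.stream(clazz.getDeclaredMethods())
                .anyMatch(m -> m.getName().equals(name));
    }

    // True if the method is visible at all, including inherited public ones.
    static boolean visibleViaGetMethods(Class<?> clazz, String name) {
        return Arrays.stream(clazz.getMethods())
                .anyMatch(m -> m.getName().equals(name));
    }

    public static void main(String[] args) {
        // getDeclaredMethods() on EventA misses the inherited getEventId()...
        System.out.println(declaresDirectly(EventA.class, "getEventId"));  // false
        System.out.println(declaresDirectly(EventA.class, "getPayloadA")); // true
        // ...while getMethods() sees it, since it is public and inherited.
        System.out.println(visibleViaGetMethods(EventA.class, "getEventId")); // true
    }
}
```

A scan keyed to `getDeclaredMethods()` would build a getter list from the schema's field names but find no matching method for the inherited field, which is consistent with the NPE surfacing inside `JavaBeanUtils.createGetter`.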
Is Pulsar IO Connector Officially Supported?
I see that a Pulsar connector was made available as of the Beam 2.38.0 release,
but I don't see Pulsar listed as an official connector on the page below. Is
the Pulsar IO connector official or not? If it is official, can someone please
update the page, since it currently gives the impression that a Pulsar IO
connector is not available?

https://beam.apache.org/documentation/io/connectors/

Thanks
--Vince