Hi Aljoscha,

this is probably the same problem I am facing.

I execute multiple pipelines on the same Flink cluster - all launched at
the same time...

I guess I can try to switch to SerializableCoder and see how that works?
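
If I understand the workaround correctly, on my side that would mean making the
POJO Serializable and setting the coder explicitly, roughly like this (the field
names below are just placeholders):

import java.io.Serializable;

import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.values.PCollection;

// Placeholder sketch: the real test.MyClass would implement Serializable
// instead of relying on @DefaultCoder(AvroCoder.class).
public class MyClass implements Serializable {

  private String name;   // illustrative fields only
  private long value;

  public MyClass() {}    // no-arg constructor

  // Wherever the PCollection<MyClass> is produced (e.g. right after the
  // Kafka read), the coder would then be set explicitly:
  public static PCollection<MyClass> useSerializableCoder(PCollection<MyClass> records) {
    return records.setCoder(SerializableCoder.of(MyClass.class));
  }
}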

thanks



On Fri, 28 Apr 2017 at 16:20 Aljoscha Krettek <[email protected]> wrote:

> Hi,
> There is this open issue: https://issues.apache.org/jira/browse/BEAM-1970.
> Could this also be what is affecting you? Are you running several pipelines
> on the same Flink cluster, either one after another or at the same time?
>
> Best,
> Aljoscha
>
> On 28. Apr 2017, at 12:45, Borisa Zivkovic <[email protected]>
> wrote:
>
> Hi,
>
> I have a small pipeline that writes data to Kafka (using AvroCoder), and then
> another job reads the same data from Kafka, does a few transformations and
> writes the data back to a different Kafka topic (AvroCoder again).
>
> The first pipeline is very simple: read data from a text file, create a POJO,
> and use AvroCoder to write the POJO to Kafka.
>
> The second pipeline is also simple: read the POJO from Kafka, do a few
> transformations, create a new POJO, and write the data to Kafka using
> AvroCoder again.
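>
> Roughly, the second pipeline looks something like this (broker and topic names
> are placeholders, and the KafkaIO builder calls are written from memory for
> 0.6.0, so treat them as approximate):
>
> // (imports from org.apache.beam.sdk.* and java.util omitted)
> Pipeline p = Pipeline.create(options);
>
> PCollection<MyClass> fromKafka = p
>     .apply(KafkaIO.<String, MyClass>read()
>         .withBootstrapServers("broker:9092")
>         .withTopics(Collections.singletonList("input-topic"))
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(AvroCoder.of(MyClass.class))
>         .withoutMetadata())
>     .apply(Values.<MyClass>create());
>
> fromKafka.apply(MapElements.via(new SimpleFunction<MyClass, MyClass>() {
>   @Override
>   public MyClass apply(MyClass in) {
>     // first transformation after the Kafka read -- this is the step that
>     // fails on the Flink runner (see the exception below)
>     return in;
>   }
> }));
> // ... a few more transforms, then a KafkaIO write back to a different topic,
> // again with AvroCoder ...
>
> p.run();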
>
> When I use the direct runner, everything is OK.
>
> When I switch to the Flink runner (a small remote Flink cluster), I get this
> exception in the second pipeline:
>
> Caused by: java.lang.ClassCastException: test.MyClass cannot be cast to
> test.MyClass
>
> This happens in the first MapFunction immediately after reading data
> from Kafka.
>
> I found a description of this problem in Flink and how they resolved it there,
> but I am not sure how to fix it when using Beam:
>
> https://issues.apache.org/jira/browse/FLINK-1390
>
> test.MyClass has the annotation @DefaultCoder(AvroCoder.class) and is a very
> simple POJO.
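>
> For reference, it is basically just this (the field names here are
> placeholders):
>
> import org.apache.beam.sdk.coders.AvroCoder;
> import org.apache.beam.sdk.coders.DefaultCoder;
>
> @DefaultCoder(AvroCoder.class)
> public class MyClass {
>
>   private String name;   // illustrative fields only
>   private long value;
>
>   public MyClass() {}    // AvroCoder needs a no-arg constructor
>
>   // getters and setters omitted
> }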
>
> Not sure how to fix this and still continue using AvroCoder.
>
> My Beam version is 0.6.0 and my Flink version is 1.2.0.
>
> Has anyone experienced something similar, or has an idea how to fix or work
> around this?
>
> thanks
>
>
>
