I’ve replicated this on the non-Portable FlinkRunner and DirectRunner. Which runner are you using? The bytecode generated in DoFnInvoker (ByteBuddyDoFnInvokerFactory) does generate casts to make sure that the elements match. I'm not entirely sure offhand why the same DoFnInvoker is being used; it seems like something might be going wrong with DoFn caching.
Reuven
Hi all,
I ran into sort of an interesting issue last night. Consider the code below. If you try to run it, you'll get a ClassCastException on the second Filter.by. What appears to be happening is that the Filter.by DoFnInvoker is being reused, which should be fine since it should be working with Object. What I can't find is where the casting is happening, because it seems like a) the cast isn't actually needed, and b) it's doing the wrong cast. Any clues?
Best, B
@Test
public void testReusedLambda() {
  p.apply(Create.of(new SimpleElement1()))
      .apply("First", Filter.by(Objects::nonNull))
      .apply(ParDo.of(new VerySimpleDoFn<>()))
      .apply("Second", Filter.by(Objects::nonNull));
  p.run().waitUntilFinish();
}

static class SimpleElement1 implements Serializable {}

static class SimpleElement2 implements Serializable {}

static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(new SimpleElement2());
  }
}
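
For anyone following along, here is a plain-Java sketch of how the caching hypothesis from the reply could produce this symptom. It is not Beam code: `CastingInvoker`, `invokerFor`, `NonNullFilter`, and the cache keyed only by the function's class are all hypothetical names made up for illustration, and whether ByteBuddyDoFnInvokerFactory actually behaves this way is an assumption, not something confirmed in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class InvokerCacheSketch {
    // Hypothetical stand-in for a generated invoker: it casts every element
    // to the element type that was known when the invoker was first built.
    static final class CastingInvoker {
        private final Class<?> elementType;

        CastingInvoker(Class<?> elementType) {
            this.elementType = elementType;
        }

        boolean invoke(Predicate<Object> fn, Object element) {
            // Class.cast throws ClassCastException on a type mismatch.
            return fn.test(elementType.cast(element));
        }
    }

    // Hypothetical cache keyed only by the function's class (assumption):
    // the element type is not part of the key.
    static final Map<Class<?>, CastingInvoker> CACHE = new HashMap<>();

    static CastingInvoker invokerFor(Object fn, Class<?> elementType) {
        return CACHE.computeIfAbsent(fn.getClass(), k -> new CastingInvoker(elementType));
    }

    static class Element1 {}

    static class Element2 {}

    // One class shared by both uses, standing in for a single wrapper class
    // that is instantiated at two places in a pipeline.
    static final class NonNullFilter implements Predicate<Object> {
        @Override
        public boolean test(Object o) {
            return o != null;
        }
    }

    public static void main(String[] args) {
        NonNullFilter first = new NonNullFilter();
        NonNullFilter second = new NonNullFilter();

        CastingInvoker a = invokerFor(first, Element1.class);
        // Same function class, so the cached invoker bound to Element1 comes back.
        CastingInvoker b = invokerFor(second, Element2.class);

        System.out.println(a.invoke(first, new Element1())); // prints "true"
        try {
            b.invoke(second, new Element2());
        } catch (ClassCastException e) {
            System.out.println("ClassCastException on second use");
        }
    }
}
```

In this sketch the predicate itself only ever needs Object, so the cast is unnecessary for correctness; it fails only because the cached invoker remembers the first element type. Adding the element type to the cache key makes the sketch pass, though that says nothing about what the right fix in Beam would be.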