I’ve replicated this on the non-Portable FlinkRunner and DirectRunner. 

On Dec 18, 2025, at 2:15 PM, Reuven Lax via dev <[email protected]> wrote:


Which runner are you using?

On Thu, Dec 18, 2025 at 2:14 PM Reuven Lax <[email protected]> wrote:
The bytecode generated in DoFnInvoker (ByteBuddyDoFnInvokerFactory) does generate casts to make sure that the elements match. I'm not entirely sure offhand why the same DoFnInvoker is being used 0 seems like something might be going wrong with DoFn caching.

Reuven

On Thu, Dec 18, 2025 at 10:19 AM Byron Ellis <[email protected]> wrote:
Hi all,

I ran into sort of an interesting issue last night. Consider the code below. If you try to run it what will happen is you'll get a ClassCastException on the second Filter.by. What appears to be happening is that the Filter.by DoFnInvoker is being reused... which should be fine since that should be working with Object... but what I can't find is where the casting is happening because it seems like a) the cast isn't actually needed? and b) it's doing the wrong cast. Any clues?

Best,
B

@Test
public void testReusedLambda() {
p.apply(Create.of(new SimpleElement1()))
.apply("First", Filter.by(Objects::nonNull))
.apply(ParDo.of(new VerySimpleDoFn<>()))
.apply("Second", Filter.by(Objects::nonNull));
p.run().waitUntilFinish();
}

static class SimpleElement1 implements Serializable {}

static class SimpleElement2 implements Serializable {}

static class VerySimpleDoFn<I> extends DoFn<I, SimpleElement2> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new SimpleElement2());
}
}


Reply via email to