Hi Frank,

This sounds like an interesting issue. Can you share a minimal working example?

Best regards,
Niklas

> On 9. Feb 2022, at 23:11, Frank Dekervel <fr...@kapernikov.com> wrote:
> 
> Hello,
> 
> When trying to reproduce a bug, we made a DeserializationSchema that throws 
> an exception when a malformed message comes in.
> Then, we sent a malformed message together with a number of well-formed 
> messages to see what happens.
> 
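> Sketched, BuggySchema is along these lines (a simplified sketch rather than 
> the exact code; parseOurMessage is just a placeholder for our real JSON 
> parsing):
> 
>     import java.io.IOException
>     import java.nio.charset.StandardCharsets
>     import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
> 
>     class BuggySchema(tolerateInvalidIncomingJSON: Boolean)
>         extends AbstractDeserializationSchema[OurMessage] {
> 
>       override def deserialize(bytes: Array[Byte]): OurMessage =
>         try {
>           // placeholder for the real JSON parsing
>           parseOurMessage(new String(bytes, StandardCharsets.UTF_8))
>         } catch {
>           case e: Exception =>
>             // returning null makes the source skip the record;
>             // otherwise the exception fails the task
>             if (tolerateInvalidIncomingJSON) null
>             else throw new IOException("malformed message", e)
>         }
>     }
> 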
>     val source = KafkaSource.builder[OurMessage]()
>       .setValueOnlyDeserializer(new BuggySchema(tolerateInvalidIncomingJSON))
>       .setBootstrapServers(bootstrap_servers)
>       .setTopics("mqtt")
>       .setGroupId("flink")
>       .setProperty("isolation.level","read_committed")
>       .setStartingOffsets(OffsetsInitializer.latest())
>       .setProperty("transaction.timeout.ms", "900000")
>       .setProperty("partition.discovery.interval.ms", "60000")
>       .build()
> To simulate our slow API, we did this:
> 
>     // artificial 8 s delay per record to stand in for the slow API
>     val pusher = alarms.map(x => { Thread.sleep(8000); x.toString() })
>     pusher.sinkTo(buildAtLeastOnceStringSink(bootstrap_servers, p, "alm_log"))
>       .uid("almlog").name("almlog")
> 
> Then, we injected a lot of messages, plus one invalid message, and repeated 
> this test multiple times. This is where things got weird: I would expect the 
> job to fail, restart, and fail again (since, upon restarting, it should 
> reprocess the same invalid message).
> Sometimes this is indeed what happens, but sometimes we get the exception 
> only once and then the application remains in the "running" state without 
> continuously crashing and recovering. I think this is somehow related to the 
> earlier issue we saw with the duplicated messages.
> 
> However, that's not always what happens: sometimes the job fails, restarts, 
> and then keeps running, and sometimes it goes into my (expected) restart 
> loop. Until I understand what's going on, I have disabled Flink's task 
> failure recovery and rely on the Flink Kubernetes operator to restart the 
> job on failure. But I'd like to understand what's happening.
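> 
> (By "disabled Flink task failure recovery" I mean a no-restart strategy, 
> roughly the sketch below; equivalently, restart-strategy: none in the Flink 
> configuration.)
> 
>     import org.apache.flink.api.common.restartstrategy.RestartStrategies
>     import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> 
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // Any task failure now fails the whole job; restarting it is then left
>     // to the Flink Kubernetes operator.
>     env.setRestartStrategy(RestartStrategies.noRestart())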
> 
> As a side note: in our pipeline we switch a couple of times between the 
> Table API and the DataStream API. Could that influence how the failover 
> regions are determined?
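> 
> (By "switch" I mean conversions of this kind; tableEnv, someStream and 
> resultTable are placeholders:)
> 
>     // assuming a StreamTableEnvironment built on the same execution environment
>     val table  = tableEnv.fromDataStream(someStream)   // DataStream -> Table
>     val stream = tableEnv.toDataStream(resultTable)    // Table -> DataStream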
> 
> Thanks!
> 
> Greetings,
> Frank
> 
