Hi Frank,

This sounds like an interesting issue. Can you share a minimal working example?
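Even a stripped-down version would help, something along the lines of the sketch below. To be clear, this is only my guess at what BuggySchema roughly looks like, assuming it simply throws when a message cannot be parsed; the OurMessage type and the validation check are placeholders, not your actual code:

    import java.io.IOException
    import java.nio.charset.StandardCharsets
    import org.apache.flink.api.common.serialization.DeserializationSchema
    import org.apache.flink.api.common.typeinfo.TypeInformation

    // Placeholder message type, standing in for your OurMessage.
    case class OurMessage(payload: String)

    // Sketch of a deserializer that fails hard on malformed input.
    class BuggySchema(tolerateInvalid: Boolean) extends DeserializationSchema[OurMessage] {

      override def deserialize(message: Array[Byte]): OurMessage = {
        val raw = new String(message, StandardCharsets.UTF_8)
        // Stand-in for real JSON validation: anything not starting with '{' counts as malformed.
        if (!raw.trim.startsWith("{")) {
          if (tolerateInvalid) {
            // Returning null lets the source skip the record (nulls are not collected).
            null
          } else {
            throw new IOException(s"Malformed message: $raw")
          }
        } else {
          OurMessage(raw)
        }
      }

      override def isEndOfStream(nextElement: OurMessage): Boolean = false

      override def getProducedType: TypeInformation[OurMessage] =
        TypeInformation.of(classOf[OurMessage])
    }

If you can share something like that plus the source/sink wiring, I can try to reproduce it locally.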
Best regards,
Niklas

> On 9. Feb 2022, at 23:11, Frank Dekervel <fr...@kapernikov.com> wrote:
>
> Hello,
>
> When trying to reproduce a bug, we made a DeserializationSchema that throws
> an exception when a malformed message comes in. Then we sent a malformed
> message together with a number of well-formed messages to see what happens.
>
> val source = KafkaSource.builder[OurMessage]()
>   .setValueOnlyDeserializer(new BuggySchema(tolerateInvalidIncomingJSON))
>   .setBootstrapServers(bootstrap_servers)
>   .setTopics("mqtt")
>   .setGroupId("flink")
>   .setProperty("isolation.level", "read_committed")
>   .setStartingOffsets(OffsetsInitializer.latest())
>   .setProperty("transaction.timeout.ms", "900000")
>   .setProperty("partition.discovery.interval.ms", "60000")
>   .build
>
> To simulate our slow API we did this:
>
> val pusher = alarms.map(x => { Thread.sleep(8000); x.toString() })
> pusher.sinkTo(buildAtLeastOnceStringSink(bootstrap_servers, p, "alm_log"))
>   .uid("almlog").name("almlog")
>
> Then we injected a lot of messages, including one invalid message, and
> repeated this test multiple times. This is where things got weird: I would
> expect the job to fail, restart, and fail again, since upon restarting it
> should reprocess the same invalid message.
>
> Sometimes this indeed happens, but sometimes we get the exception only once
> and the application then remains in "running" state without continuous
> crashing and recovery. I think this is somehow related to the earlier issue
> we saw with the duplicated messages.
>
> So the behaviour is not consistent: sometimes the job fails, restarts and
> then keeps running, and sometimes it goes into my (expected) restart loop.
> Until I understand what is going on, I have disabled Flink task failure
> recovery and rely on the Flink Kubernetes operator to restart the job on
> failure. But I would like to understand what is happening.
>
> As a side note: in our pipeline we switch a couple of times between the
> Table API and the DataStream API. Could that influence the failover region
> determination?
>
> Thanks!
>
> Greetings,
> Frank
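One more thought on the restart behaviour you describe: while testing it may help to pin the restart strategy explicitly in the job instead of relying on the cluster default, so the expected behaviour after a poison-pill message is unambiguous. A small sketch, assuming you configure it in code rather than in flink-conf.yaml; the attempt count and delay are arbitrary:

    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Allow at most 3 restart attempts, 10 seconds apart; once they are
    // exhausted, the job transitions to FAILED instead of restarting further.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))

That should at least make it easier to tell an intentional restart loop apart from the "stays running" case you observed.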