Hi Tim,
It is unfortunate that the error message was so minimal, we'll definitely
improve that (FLINK-22809).

Skipping NULL keys is a bit problematic, although technically possible, I'm
not sure that this is how we should handle this.
Let me follow up on that.

The way you can customize the behaviour of that connector without having to
fork StateFun, is to define an ingress with a different deserializer.
You would have to use the StatefulFunctionModule [1][2] and bind an
ingress, you can use the KafkaIngressBuilder [3] and set
KafkaIngressBuilde::withDeserializer()
You would also have to define a router to route these messages to target
functions.

I've prepared a minimal example for you here: [4]

I hope this helps,
Igal.


[1]
https://github.com/apache/flink-statefun/blob/release-2.2/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/spi/StatefulFunctionModule.java
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/index.html
[3]
https://github.com/apache/flink-statefun/blob/release-2.2/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
[4] https://github.com/igalshilman/custom-ingress





On Fri, May 28, 2021 at 8:19 PM Timothy Bess <tdbga...@gmail.com> wrote:

> Ok so after digging into it a bit it seems that the exception was thrown
> here:
>
> https://github.com/apache/flink-statefun/blob/release-2.2/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java#L48
>
> I think it'd be useful to have a configuration to prevent null keys from
> halting processing.
> It looks like we are occasionally publishing with a key string that is
> sometimes empty, and that is interpreted by Kafka as null. Then when it's
> read back in, the ingress chokes on the null value.
>
> I'm trying to keep from having to edit statefun and use my own jar, any
> thoughts?
>
> Thanks,
>
> Tim
>
> On Fri, May 28, 2021 at 10:33 AM Timothy Bess <tdbga...@gmail.com> wrote:
>
>> Oh wow that Harness looks cool, I'll have to take a look at that.
>>
>> Unfortunately the JobManager UI seems to just show this:
>> [image: image.png]
>>
>> Though it does seem that maybe the source function is where the failure
>> is happening according to this?
>> [image: image.png]
>>
>> Still investigating, but I do see a lot of these logs:
>> 2021-05-28 14:25:09,199 WARN
>>  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>> [] - Transaction KafkaTransactionState [transactionalId=feedback-union ->
>> functions -> Sink:
>> bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-39,
>> producerId=2062, epoch=2684] has been open for 55399128 ms. This is close
>> to or even exceeding the transaction timeout of 900000 ms.
>>
>> Seems like it's restoring some old kafka transaction? Not sure. I like
>> Arvid's idea of attaching a debugger, I'll definitely give that a try.
>>
>> On Fri, May 28, 2021 at 7:49 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> If logs are not helping, I think the remaining option is to attach a
>>> debugger [1]. I'd probably add a breakpoint to
>>> LegacySourceFunctionThread#run and see what happens. If the issue is in
>>> recovery, you should add a breakpoint to StreamTask#beforeInvoke.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
>>>
>>> On Fri, May 28, 2021 at 1:11 PM Igal Shilman <i...@ververica.com> wrote:
>>>
>>>> Hi Tim,
>>>> Any additional logs from before are highly appreciated, this would help
>>>> us to trace this issue.
>>>> By the way, do you see something in the JobManager's UI?
>>>>
>>>> On Fri, May 28, 2021 at 9:06 AM Tzu-Li (Gordon) Tai <
>>>> tzuli...@apache.org> wrote:
>>>>
>>>>> Hi Timothy,
>>>>>
>>>>> It would indeed be hard to figure this out without any stack traces.
>>>>>
>>>>> Have you tried changing to debug level logs? Maybe you can also try
>>>>> using the StateFun Harness to restore and run your job in the IDE - in 
>>>>> that
>>>>> case you should be able to see which code exactly is throwing this
>>>>> exception.
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>> On Fri, May 28, 2021 at 12:39 PM Timothy Bess <tdbga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Just checking to see if anyone has experienced this error. Might just
>>>>>> be a Flink thing that's irrelevant to statefun, but my job keeps failing
>>>>>> over and over with this message:
>>>>>>
>>>>>> 2021-05-28 03:51:13,001 INFO
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] -
>>>>>> Starting FlinkKafkaInternalProducer (10/10) to produce into default
>>>>>> topic __stateful_functions_random_topic_lNVlkW9SkYrtZ1oK
>>>>>> 2021-05-28 03:51:13,001 INFO
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.
>>>>>> FlinkKafkaInternalProducer [] - Attempting to resume transaction
>>>>>> feedback-union -> functions -> Sink:
>>>>>> bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-45
>>>>>> with producerId 31 and epoch 3088
>>>>>> 2021-05-28 03:51:13,017 WARN org.apache.flink.runtime.taskmanager.
>>>>>> Task [] - Source: lead-leads-ingress -> router (leads) (10/10)
>>>>>> (ff51aacdb850c6196c61425b82718862) switched from RUNNING to FAILED.
>>>>>> java.lang.NullPointerException: null
>>>>>>
>>>>>> The null pointer doesn't come with any stack traces or anything. It's
>>>>>> really mystifying. Seems to just fail while restoring continuously.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Tim
>>>>>>
>>>>>

Reply via email to