Hi Siddhesh,

how to use a ProcessFunction is documented here:

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/

.process() is similar to .map() but with more Flink specific methods available. Anyway, a simple map() should also do the job. But the base architecture mentioned in Stackoverflow is valid.

Here are some resources that might help you during the development:

https://github.com/twalthr/flink-api-examples

https://github.com/ververica/flink-training (this has Scala examples)

I hope this helps.

Regards,
Timo




On 30.12.21 18:09, Siddhesh Kalgaonkar wrote:
Hi Team,

Dominik has answered the question but I am trying to debug the code but since I am new I am not able to understand the code. I think something still needs to be changed in his answer. Can somebody help me to understand that snippet? The user who answered is not much active it seems. I tried testing his answer but it didn't work as expected.

Thanks,
Sid

On Wed, Dec 29, 2021 at 10:58 PM Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com <mailto:kalgaonkarsiddh...@gmail.com>> wrote:

    Hi David,

    Yes, I already mentioned that I am a newbie to Flink and Scala. I am
    making progress as the day progresses. I have modified my question
    again. But I am not sure how to use it. Could you please correct it?
    or add something if I missed something?

    On Wed, Dec 29, 2021 at 10:53 PM David Morávek <d...@apache.org
    <mailto:d...@apache.org>> wrote:

        Hi Siddhesh,

        You can not change the method signature when you're implementing
        an interface.

        I'm not really sure this belongs to the ML anymore as this is
        getting more into Scala / Java fundamentals. There are some
        great learning resources online for Scala [1], I'd recommend
        starting from there. Also if you're are not familiar with Scala
        I'd highly recommend starting with Java API first as it's way
        more intuitive to use with Flink as you don't have to deal with
        Scala / Java interoperability.

        [1]
        
https://docs.scala-lang.org/scala3/book/interacting-with-java.html#extending-java-interfaces-in-scala
        
<https://docs.scala-lang.org/scala3/book/interacting-with-java.html#extending-java-interfaces-in-scala>

        On Wed, Dec 29, 2021 at 5:59 PM Siddhesh Kalgaonkar
        <kalgaonkarsiddh...@gmail.com
        <mailto:kalgaonkarsiddh...@gmail.com>> wrote:

            I have modified my question based on Dominik's inputs. Can
            somebody help to take it forward?

            Thanks,
            Siddhesh

            On Wed, Dec 29, 2021 at 3:32 PM David Morávek
            <d...@apache.org <mailto:d...@apache.org>> wrote:

                Please always try to include user@f.a.o in your reply,
                so other can participate in the discussion and learn
                from your findings.

                I think Dominik has already given you pretty good hint.
                The JSON parsing in this case is not any different as
                with any other java application (with jackson / gson /
                ...). You can then simply split the parsed elements into
                good and bad records.

                D.

                On Wed, Dec 29, 2021 at 10:53 AM Siddhesh Kalgaonkar
                <kalgaonkarsiddh...@gmail.com
                <mailto:kalgaonkarsiddh...@gmail.com>> wrote:

                    Hi David,

                    Thanks for the clarification. I will check the link
                    you shared. Also, as mentioned by Dominik, can you
                    help me with the process functions. How can I use it
                    for my use case?

                    Thanks,
                    Siddhesh

                    On Wed, Dec 29, 2021 at 2:50 PM David Morávek
                    <d...@apache.org <mailto:d...@apache.org>> wrote:

                        Hi Siddhesh,

                        it seems that the question is already being
                        answered in the SO thread, so let's keep the
                        discussion focused there.

                        Looking at the original question, I think it's
                        important to understand, that the
                        TypeInformation is not meant to be used for
                        "runtime" matching, but to address the type
                        erasure [1] limitation for the UDFs (user
                        defined functions), so Flink can pick the
                        correct serializer / deserializer.

                        [1]
                        
https://docs.oracle.com/javase/tutorial/java/generics/erasure.html
                        
<https://docs.oracle.com/javase/tutorial/java/generics/erasure.html>

                        Best,
                        D.

                        On Tue, Dec 28, 2021 at 9:21 PM Siddhesh
                        Kalgaonkar <kalgaonkarsiddh...@gmail.com
                        <mailto:kalgaonkarsiddh...@gmail.com>> wrote:

                            Hi Team,

                            I am a newbie to Flink and Scala and trying
                            my best to learn everything I can. I doing a
                            practice where  I am getting incoming JSON
                            data from the Kafka topic and want to
                            perform a data type check on it.
                            For that, I came across TypeInformation of
                            Flink. Please read my problem in detail from
                            the below link:

                            Flink Problem
                            
<https://stackoverflow.com/questions/70500023/typeinformation-in-flink-to-compare-the-datatypes-dynamically>

                            I went through the documentation but didn't
                            come across any relevant examples. Any
                            suggestions would help.

                            Looking forward to hearing from you.


                            Thanks,
                            Siddhesh


Reply via email to