Hi Siddhesh,
how to use a ProcessFunction is documented here:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/process_function/
.process() is similar to .map() but with more Flink-specific methods
available. That said, a simple map() should also do the job. Either
way, the base architecture mentioned on Stack Overflow is valid.
Here are some resources that might help you during the development:
https://github.com/twalthr/flink-api-examples
https://github.com/ververica/flink-training (this has Scala examples)
I hope this helps.
Regards,
Timo
On 30.12.21 18:09, Siddhesh Kalgaonkar wrote:
Hi Team,
Dominik has answered the question, and I am trying to debug the code,
but since I am new I am not able to understand it. I think something
still needs to be changed in his answer. Can somebody help me
understand that snippet? The user who answered does not seem to be
very active. I tried testing his answer, but it didn't work as expected.
Thanks,
Sid
On Wed, Dec 29, 2021 at 10:58 PM Siddhesh Kalgaonkar
<kalgaonkarsiddh...@gmail.com> wrote:
Hi David,
Yes, I already mentioned that I am a newbie to Flink and Scala. I am
making progress day by day. I have modified my question again, but I
am not sure how to use it. Could you please correct it, or add
anything I missed?
On Wed, Dec 29, 2021 at 10:53 PM David Morávek <d...@apache.org> wrote:
Hi Siddhesh,
You cannot change the method signature when you're implementing
an interface.
I'm not really sure this belongs on the ML anymore, as this is
getting more into Scala / Java fundamentals. There are some
great learning resources online for Scala [1]; I'd recommend
starting from there. Also, if you're not familiar with Scala,
I'd highly recommend starting with the Java API first, as it's way
more intuitive to use with Flink because you don't have to deal with
Scala / Java interoperability.
[1] https://docs.scala-lang.org/scala3/book/interacting-with-java.html#extending-java-interfaces-in-scala
On Wed, Dec 29, 2021 at 5:59 PM Siddhesh Kalgaonkar
<kalgaonkarsiddh...@gmail.com> wrote:
I have modified my question based on Dominik's inputs. Can
somebody help me take it forward?
Thanks,
Siddhesh
On Wed, Dec 29, 2021 at 3:32 PM David Morávek <d...@apache.org> wrote:
Please always try to include user@f.a.o in your reply,
so others can participate in the discussion and learn
from your findings.
I think Dominik has already given you a pretty good hint.
The JSON parsing in this case is no different from
any other Java application (with Jackson / Gson /
...). You can then simply split the parsed elements into
good and bad records.
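A minimal sketch of that split, in plain Java with no Flink or JSON library on the classpath. Everything named here is an assumption made for illustration: the class SplitRecords, the methods parseId and split, and the record shape {"id": <integer>}. The regex only stands in for a real parser such as Jackson or Gson. In a Flink job the same try/catch branching would sit inside a map() or process() function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SplitRecords {
    // Hypothetical stand-in for a real JSON library (Jackson, Gson, ...):
    // it accepts only flat objects of the form {"id": <integer>}.
    private static final Pattern ID_RECORD =
            Pattern.compile("\\s*\\{\\s*\"id\"\\s*:\\s*(-?\\d+)\\s*\\}\\s*");

    // Parses one record, or throws IllegalArgumentException if it is malformed.
    static int parseId(String json) {
        Matcher m = ID_RECORD.matcher(json);
        if (!m.matches()) {
            throw new IllegalArgumentException("Malformed record: " + json);
        }
        // NumberFormatException (value out of int range) is also an IllegalArgumentException.
        return Integer.parseInt(m.group(1));
    }

    // Routes each raw string to the good list (parsed value) or the bad list (raw input).
    static void split(List<String> raw, List<Integer> good, List<String> bad) {
        for (String json : raw) {
            try {
                good.add(parseId(json));
            } catch (IllegalArgumentException e) {
                bad.add(json);
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> good = new ArrayList<>();
        List<String> bad = new ArrayList<>();
        split(List.of("{\"id\": 1}", "{\"id\": \"abc\"}", "not json"), good, bad);
        System.out.println("good=" + good + " bad=" + bad.size()); // prints: good=[1] bad=2
    }
}
```

Keeping the raw string for bad records, rather than dropping them, makes it possible to send them somewhere for later inspection.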
D.
On Wed, Dec 29, 2021 at 10:53 AM Siddhesh Kalgaonkar
<kalgaonkarsiddh...@gmail.com> wrote:
Hi David,
Thanks for the clarification. I will check the link
you shared. Also, as mentioned by Dominik, can you
help me with process functions? How can I use them
for my use case?
Thanks,
Siddhesh
On Wed, Dec 29, 2021 at 2:50 PM David Morávek <d...@apache.org> wrote:
Hi Siddhesh,
it seems that the question is already being
answered in the SO thread, so let's keep the
discussion focused there.
Looking at the original question, I think it's
important to understand that
TypeInformation is not meant to be used for
"runtime" matching, but to address the type
erasure [1] limitation for UDFs (user
defined functions), so that Flink can pick the
correct serializer / deserializer.
[1] https://docs.oracle.com/javase/tutorial/java/generics/erasure.html
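To illustrate the type erasure limitation, here is a small sketch in plain Java with no Flink involved. The class TypeErasureDemo and the helper sameRuntimeClass are hypothetical names chosen for this example. It shows that two lists with different element types are indistinguishable at runtime, which is the gap a framework has to fill with explicitly supplied type information.

```java
import java.util.ArrayList;
import java.util.List;

class TypeErasureDemo {
    // Returns true when both lists share the same runtime class.
    static boolean sameRuntimeClass(List<?> a, List<?> b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> numbers = new ArrayList<>();
        // The element types differ only at compile time; after erasure
        // both objects are plain java.util.ArrayList instances.
        System.out.println(sameRuntimeClass(strings, numbers)); // prints: true
        System.out.println(strings.getClass().getName());       // prints: java.util.ArrayList
    }
}
```

Because the element type is gone at runtime, inspecting the object cannot tell a list of strings from a list of integers, so a data type check on incoming records has to look at the parsed values themselves.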
Best,
D.
On Tue, Dec 28, 2021 at 9:21 PM Siddhesh Kalgaonkar
<kalgaonkarsiddh...@gmail.com> wrote:
Hi Team,
I am a newbie to Flink and Scala and trying
my best to learn everything I can. I am doing a
practice exercise in which I receive incoming JSON
data from a Kafka topic and want to
perform a data type check on it.
For that, I came across Flink's
TypeInformation. Please read my problem in detail at
the link below:
Flink Problem
<https://stackoverflow.com/questions/70500023/typeinformation-in-flink-to-compare-the-datatypes-dynamically>
I went through the documentation but didn't
come across any relevant examples. Any
suggestions would help.
Looking forward to hearing from you.
Thanks,
Siddhesh