Thanks,
Sid
On Wed, Jan 26, 2022 at 1:52 PM Yun Tang wrote:
> Hi Siddhesh,
>
> The root cause is that the group.id configuration is missing from the
> Flink program. The restart strategy configuration has no relationship
> to this.
>
> I think you should pay attention
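To make the missing-group.id fix concrete, here is a minimal sketch of the consumer properties; the broker address and group name are placeholder values, not from the original program:

```scala
import java.util.Properties

// Kafka consumer properties for the Flink Kafka connector.
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker
props.setProperty("group.id", "my-flink-group")          // the setting Yun Tang notes is missing

// The properties would then be passed to the consumer, e.g.:
// new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), props)
```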
I have Flink Kafka Consumer in place which works fine until I add the below
lines:
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3,                            // max failures per interval
  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
))
It
> which line causes the error; as is I can
> only guess.
>
> Please make sure you use the scala variant of the AsyncDataStream
> (org.apache.flink.streaming.api.scala.AsyncDataStream).
>
>
> On 11/01/2022 21:32, Siddhesh Kalgaonkar wrote:
> > I am using below code to get
I am using the below code to get the data from the side output, which has the
filtered records.
So, it goes like this:
val filterRecords: DataStream[String] = src
  .process(new ProcessFunction())
  .getSideOutput(filteredOutputTag)
It has filtered records in it.
Now, I want to add these records to the DB
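Wiring the side output into a database sink can be sketched as follows, assuming a hypothetical CassandraDBSink class and a String-typed output tag (all names here are placeholders, not the original code):

```scala
import org.apache.flink.streaming.api.scala._

// The tag must be the same instance the ProcessFunction writes to via ctx.output(...)
val filteredOutputTag = OutputTag[String]("filtered-records")

val mainStream = src.process(new MyProcessFunction()) // placeholder function
val filterRecords: DataStream[String] = mainStream.getSideOutput(filteredOutputTag)

// Attach the sink; nothing runs until env.execute() is called.
filterRecords.addSink(new CassandraDBSink())
env.execute("side-output-to-db")
```

Note that the sink's methods are only invoked once the job actually runs via env.execute().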
> Exception {
> out.collect(value.f0);
> out.collect(value.f1);
> ...;
> out.collect(value.fN);
> }
>
> Best,
> Piotrek
>
> On Fri, 7 Jan 2022 at 07:05, Siddhesh Kalgaonkar
> wrote:
>
>> Hi Francis,
>>
>> What I am trying to do is you can
Hey Team,
I have a flow like Kafka Sink Datastream -> Process Function (Separate
Class) -> DBSink(Separate Class).
Process Function returns me the output as a string and now I want to create
a DataStream out of the string variable so that I can call something like
ds.addSink(new DBSink()). For
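For the question itself: a stream is created from the environment, so a single string value can be turned into a DataStream with fromElements. A minimal sketch follows (DBSink is the class referenced in the question; the rest are placeholders), although the more idiomatic route is to keep the value inside the pipeline by emitting it from the ProcessFunction with out.collect and calling addSink on the resulting stream:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val resultString: String = "..." // placeholder for the process function's output
val ds: DataStream[String] = env.fromElements(resultString)
ds.addSink(new DBSink())
env.execute()
```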
> Stackoverflow and the mailing list is to solve a
> question or a problem; the mailing list is not for getting attention. It
> amounts to crossposting, which we would rather avoid. As David mentioned, time
> is limited and we all try to spend it as best we can.
>
> Best regards,
>
> Martijn
>
> On Fri, 7 Jan 2022 at 16:04, Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gma
> Could you please focus your questions on one channel only? (either SO or the
> ML)
>
> this could lead to unnecessary work duplication (which would be a shame,
> because the community has limited resources) as people answering on SO
> might not be aware of the ML thread
>
> D.
>
I am trying to achieve exactly-once semantics using Flink and Kafka. I have
explained my scenario thoroughly in this post:
https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer
Any help is much appreciated!
Thanks,
Sid
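For reference, exactly-once with Flink's Kafka connector hinges on checkpointing plus a transactional producer. A hedged sketch of the usual setup follows; the broker address, checkpoint interval, and timeout are placeholder values, and the exact FlinkKafkaProducer constructor overload depends on the Flink version in use:

```scala
import java.util.Properties
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Exactly-once sinks require checkpointing to be enabled.
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
// Must not exceed the broker's transaction.max.timeout.ms (15 min by default).
props.setProperty("transaction.timeout.ms", "600000")

// The producer is then constructed with
// FlinkKafkaProducer.Semantic.EXACTLY_ONCE passed to its constructor.
```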
m directly, it's used in the
> context of the StreamExecutionEnvironment to create a stream i.e.
> DataStream.map([YourRichMapFunction]); this implies that you already need a
> DataStream to transform a DataStream using a map function
> (MapFunction/RichMapFunction)
> Francis
>
> On Fri
Hi,
As I am new, I came across RichMapFunction while facing an issue. How can I
use RichMapFunction to convert a tuple of strings to a DataStream? If not,
how can I do it apart from using StreamExecutionEnvironment?
Thanks,
Sid
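A stream is obtained from the StreamExecutionEnvironment rather than from a map function; a tuple of strings can be turned into a DataStream with fromElements. A minimal sketch, with placeholder values:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val pair: (String, String) = ("good-record", "bad-record") // placeholder values

// fromElements creates the stream; a (Rich)MapFunction only transforms
// a stream that already exists.
val ds: DataStream[(String, String)] = env.fromElements(pair)
```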
age
      msg
    }).toString())
  } else {
    (parsedJson.toString(), "Good Record...")
  }
} match {
  case Success(x)   => Right(x)
  case Failure(err) => Left(json)
}
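The Try-to-Either shape above can be sketched as a self-contained function; the validation here is a stub standing in for the real JSON check, and all names are illustrative:

```scala
import scala.util.{Failure, Success, Try}

// Stub "parser": accepts strings that look like JSON objects, rejects the rest.
def parseJson(json: String): Either[String, (String, String)] =
  Try {
    val t = json.trim
    if (t.startsWith("{") && t.endsWith("}")) (t, "Good Record...")
    else throw new IllegalArgumentException("malformed record")
  } match {
    case Success(x)   => Right(x)   // good record plus status message
    case Failure(err) => Left(json) // raw record goes to the failure path
  }
```

The Right branch carries the parsed payload while Left keeps the raw record, so good and bad records can be routed separately downstream.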
On Thu, Jan 6, 2022 at 1:43 PM
type. However, your parseJson should return Either[String, String], thus
> causing this issue.
>
>
> Siddhesh Kalgaonkar wrote on Wed, 5 Jan 2022 at 19:04:
>
>> I have written a process function where I am parsing the JSON and if it
>> is not according to the expected format it passes as
lement. For example you can deal with the
> records fed to the sink in the invoke method or clean up the resources in
> the finish method.
>
> Siddhesh Kalgaonkar wrote on Thu, 6 Jan 2022 at 03:11:
>
>> I have implemented a Cassandra sink and when I am trying to call it from
I have implemented a Cassandra sink, and when I try to call it from another
class via DataStream it is not calling any of the methods. I tried extending
other interfaces like ProcessFunction, and they force me to implement their
methods, whereas when it comes to RichSinkFunction it doesn't
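A RichSinkFunction's open/invoke/close are only triggered after the sink is attached with addSink and the job runs. A minimal sketch, assuming Flink 1.13-era APIs and placeholder connection logic:

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class CassandraDBSink extends RichSinkFunction[String] {
  override def open(parameters: Configuration): Unit = {
    // acquire the Cassandra session here (placeholder)
  }

  // Called once per element, but only after
  // stream.addSink(new CassandraDBSink()) and env.execute() run.
  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    // write the record (placeholder)
  }

  override def close(): Unit = {
    // release the session here (placeholder)
  }
}
```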
I have written a process function where I parse the JSON, and if it is not in
the expected format it is passed as a Failure to the process function, where I
print the records; that works fine. Now, I was trying to print the message and
the record in both the Success and Failure cases. I
After a lot of struggle with the plain Jackson library, which doesn't have a
strict mode, I wasn't able to validate the JSON schema. I finally found one
way of doing it, but now I am not able to map the correct Success and
Failure messages in order to call the Process
tried
testing his answer but it didn't work as expected.
Thanks,
Sid
On Wed, Dec 29, 2021 at 10:58 PM Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com> wrote:
> Hi David,
>
> Yes, I already mentioned that I am a newbie to Flink and Scala. I am
> making progress as the day
ocs.scala-lang.org/scala3/book/interacting-with-java.html#extending-java-interfaces-in-scala
>
> On Wed, Dec 29, 2021 at 5:59 PM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
>
>> I have modified my question based on Dominik's inputs. Can somebody help
>> to ta
On Wed, Dec 29, 2021 at 10:53 AM Siddhesh Kalgaonkar <
> kalgaonkarsiddh...@gmail.com> wrote:
>
>> Hi David,
>>
>> Thanks for the clarification. I will check the link you shared. Also, as
>> mentioned by Dominik, can you help me with the process functions. How ca
ty good hint. The JSON parsing
> in this case is not any different as with any other java application (with
> jackson / gson / ...). You can then simply split the parsed elements into
> good and bad records.
>
> D.
>
> On Wed, Dec 29, 2021 at 10:53 AM Siddhesh Kalgaonkar <
&
Hi David,
Thanks for the clarification. I will check the link you shared. Also, as
mentioned by Dominik, can you help me with the process functions. How can I
use it for my use case?
Thanks,
Siddhesh
On Wed, Dec 29, 2021 at 3:22 PM Siddhesh Kalgaonkar <
kalgaonkarsiddh...@gmail.com>
Hi Team,
I am a newbie to Flink and Scala and am trying my best to learn everything I
can. I am doing a practice exercise where I am getting incoming JSON data from
a Kafka topic and want to perform a data type check on it.
For that, I came across Flink's TypeInformation. Please read my problem in
detail
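For the TypeInformation part, a minimal sketch of how the Scala API derives it (the tuple type is just an example):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

// createTypeInformation is a macro brought in by the api.scala._ import;
// it derives TypeInformation for a given type at compile time.
val info: TypeInformation[(String, Int)] = createTypeInformation[(String, Int)]
```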