Re: Failure Restart Strategy leads to error

2022-01-26 Thread Siddhesh Kalgaonkar
Thanks, Sid. On Wed, Jan 26, 2022 at 1:52 PM Yun Tang wrote: Hi Siddhesh, The root cause is that the group.id configuration is missing for the Flink program. The restart-strategy configuration has no relationship with this. I think you should pay your attention…
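
For context, a minimal sketch of supplying group.id to the consumer (assuming the universal Kafka connector of that era; broker, topic, and group names are placeholders):

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker
    props.setProperty("group.id", "my-consumer-group")       // the missing setting

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val src = env.addSource(
      new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), props))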

Failure Restart Strategy leads to error

2022-01-25 Thread Siddhesh Kalgaonkar
I have a Flink Kafka consumer in place which works fine until I add the lines below: env.setRestartStrategy(RestartStrategies.failureRateRestart(3, /* max failures per unit */ Time.of(5, TimeUnit.MINUTES), /* time interval for measuring failure rate */ Time.of(10, TimeUnit.SECONDS) /* delay */)) It…
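
Reassembled with line breaks and the imports it needs, the snippet from the message reads:

    import java.util.concurrent.TimeUnit
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.time.Time

    env.setRestartStrategy(RestartStrategies.failureRateRestart(
      3,                              // max failures per unit
      Time.of(5, TimeUnit.MINUTES),   // time interval for measuring failure rate
      Time.of(10, TimeUnit.SECONDS))) // delay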

Re: Async IO code not working

2022-01-12 Thread Siddhesh Kalgaonkar
…which line causes the error; as is, I can only guess. Please make sure you use the Scala variant of the AsyncDataStream (org.apache.flink.streaming.api.scala.AsyncDataStream). On 11/01/2022 21:32, Siddhesh Kalgaonkar wrote: I am using the below code to get…
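
A minimal sketch of the Scala variant in use (filterRecords is the stream from the question below; the lookup body is a placeholder):

    import java.util.concurrent.TimeUnit
    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.{Failure, Success}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

    class DbLookup extends AsyncFunction[String, String] {
      implicit lazy val ec: ExecutionContext = ExecutionContext.global
      override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit =
        Future(lookup(input)).onComplete {
          case Success(v) => resultFuture.complete(Iterable(v))
          case Failure(e) => resultFuture.completeExceptionally(e)
        }
      private def lookup(key: String): String = key // placeholder for the real async call
    }

    val enriched = AsyncDataStream.unorderedWait(
      filterRecords, new DbLookup, 5, TimeUnit.SECONDS, 100)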

Async IO code not working

2022-01-11 Thread Siddhesh Kalgaonkar
I am using the below code to get the data from the side output, which has the filtered records. So, it goes like this: val filterRecords: DataStream[String] = src.process(new ProcessFunction()).getSideOutput(filteredOutputTag) It has the filtered records in it. Now, I want to add these records to the db…
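
A condensed sketch of that flow (src, MyProcessFunction, and MyDbSink stand in for the pieces from the thread):

    import org.apache.flink.streaming.api.scala._

    val filteredOutputTag = OutputTag[String]("filtered")

    val filterRecords: DataStream[String] =
      src.process(new MyProcessFunction(filteredOutputTag))
         .getSideOutput(filteredOutputTag)

    filterRecords.addSink(new MyDbSink()) // hypothetical RichSinkFunction[String]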

Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-10 Thread Siddhesh Kalgaonkar
…throws Exception { out.collect(value.f0); out.collect(value.f1); …; out.collect(value.fN); } Best, Piotrek. On Fri, Jan 7, 2022 at 07:05, Siddhesh Kalgaonkar wrote: Hi Francis, What I am trying to do is you can…
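
Piotrek's suggestion, recast as a self-contained Scala flatMap over a two-field tuple (the Java snippet above reads value.f0 / value.f1; Scala tuples use _1 / _2):

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.util.Collector

    class TupleSplitter extends FlatMapFunction[(String, String), String] {
      override def flatMap(value: (String, String), out: Collector[String]): Unit = {
        out.collect(value._1) // emit each tuple field as its own record
        out.collect(value._2)
      }
    }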

Creating DS in low level conversion operators i.e ProcessFunctions

2022-01-08 Thread Siddhesh Kalgaonkar
Hey Team, I have a flow like Kafka source DataStream -> ProcessFunction (separate class) -> DBSink (separate class). The ProcessFunction returns the output as a string, and now I want to create a DataStream out of the string variable so that I can call something like ds.addSink(new DBSink()). For…
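
Note that calling process(...) on a stream already yields a DataStream, so the sink can usually be chained directly instead of building a new stream from the string; a sketch under that assumption (kafkaStream and the class names are placeholders following the message):

    val processed: DataStream[String] = kafkaStream.process(new MyProcessFunction())
    processed.addSink(new DBSink()) // DBSink assumed to extend RichSinkFunction[String]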

Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
…Stack Overflow and the mailing list is to solve a question or a problem; the mailing list is not for getting attention. It is equivalent to crossposting, which we would rather avoid. As David mentioned, time is limited and we all try to spend it the best we can. Best regards, Martijn…

Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
…for getting attention. It is equivalent to crossposting, which we would rather avoid. As David mentioned, time is limited and we all try to spend it the best we can. Best regards, Martijn. On Fri, Jan 7, 2022 at 16:04, Siddhesh Kalgaonkar <kalgaonkarsiddh...@gma…

Re: Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
…could you please focus your questions on one channel only (either SO or the ML)? This could lead to unnecessary work duplication (which would be a shame, because the community has limited resources), as people answering on SO might not be aware of the ML thread. D.…

Exactly Once Semantics

2022-01-07 Thread Siddhesh Kalgaonkar
I am trying to achieve exactly-once semantics using Flink and Kafka. I have explained my scenario thoroughly in this post: https://stackoverflow.com/questions/70622321/exactly-once-in-flink-kafka-producer-and-consumer Any help is much appreciated! Thanks, Sid
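
The usual ingredients on the producer side are checkpointing plus a transactional Kafka sink; a minimal sketch against the Flink 1.14-era FlinkKafkaProducer (topic, broker, and timeout values are placeholders):

    import java.lang.{Long => JLong}
    import java.util.Properties
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
    import org.apache.kafka.clients.producer.ProducerRecord

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE) // transactions commit on checkpoints

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("transaction.timeout.ms", "900000") // must not exceed the broker's maximum

    val schema = new KafkaSerializationSchema[String] {
      override def serialize(element: String, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
        new ProducerRecord[Array[Byte], Array[Byte]]("my-topic", element.getBytes("UTF-8"))
    }

    val producer = new FlinkKafkaProducer[String](
      "my-topic", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)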

Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-06 Thread Siddhesh Kalgaonkar
…them directly; it's used in the context of the StreamExecutionEnvironment to create a stream, i.e. DataStream.map([YourRichMapFunction]). This implies that you already need a DataStream to transform a DataStream using a map function (MapFunction/RichMapFunction). Francis. On Fri…

RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-06 Thread Siddhesh Kalgaonkar
Hi, as I am new I am facing one issue, and so I came across RichMapFunction. How can I use RichMapFunction to convert a tuple of strings to a DataStream? If not, how can I do it apart from using StreamExecutionEnvironment? Thanks, Sid
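
A RichMapFunction transforms an existing stream rather than creating one, so the usual way to turn an in-memory tuple into a DataStream is the environment's fromElements; a minimal sketch:

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds: DataStream[(String, String)] = env.fromElements(("key", "value"))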

Re: Passing msg and record to the process function

2022-01-06 Thread Siddhesh Kalgaonkar
…age msg }).toString())
      } else {
        (parsedJson.toString(), "Good Record...")
      }
    } match {
      case Success(x)   => Right(x)
      case Failure(err) => Left(json)
    }
On Thu, Jan 6, 2022 at 1:43 PM…

Re: Passing msg and record to the process function

2022-01-06 Thread Siddhesh Kalgaonkar
…type. However, your parseJson should return Either[String, String], thus causing this issue. Siddhesh Kalgaonkar wrote on Wednesday, January 5, 2022 at 19:04: I have written a process function where I am parsing the JSON, and if it is not according to the expected format it passes as…
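
A sketch of a parseJson with that return type, wrapping the parse in Try (mapper is assumed to be a Jackson ObjectMapper; Left carries the raw bad record, Right the good one):

    import scala.util.{Failure, Success, Try}

    def parseJson(json: String): Either[String, String] =
      Try(mapper.readTree(json)) match {
        case Success(parsed) => Right(parsed.toString) // good record
        case Failure(_)      => Left(json)             // bad record, keep the raw input
      }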

Re: extending RichSinkFunction doesn't force to implement any of its methods

2022-01-06 Thread Siddhesh Kalgaonkar
…implement. For example, you can deal with the records fed to the sink in the invoke method, or clean up the resources in the finish method. Siddhesh Kalgaonkar wrote on Thursday, January 6, 2022 at 03:11: I have implemented a Cassandra sink and when I am trying to call it from…
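
A skeleton along those lines (the Cassandra-specific calls are left as comments; signatures match the Flink 1.14 RichSinkFunction):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

    class CassandraSink extends RichSinkFunction[String] {
      override def open(parameters: Configuration): Unit = {
        // open the Cassandra session once per parallel instance
      }
      override def invoke(value: String, context: SinkFunction.Context): Unit = {
        // write one record; called for every element of the stream
      }
      override def close(): Unit = {
        // release the session and any other resources
      }
    }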

extending RichSinkFunction doesn't force to implement any of its methods

2022-01-05 Thread Siddhesh Kalgaonkar
I have implemented a Cassandra sink, and when I try to call it from another class via a DataStream it does not call any of its methods. I tried extending other interfaces like ProcessFunction, and it forces me to implement its methods, whereas when it comes to RichSinkFunction it doesn't…

Passing msg and record to the process function

2022-01-05 Thread Siddhesh Kalgaonkar
I have written a process function where I am parsing the JSON, and if it is not according to the expected format it is passed as a Failure to the process function, and I print the records, which works fine. Now, I was trying to print the message and the record in the case of Success and Failure. I…

Error while writing process functions

2022-01-04 Thread Siddhesh Kalgaonkar
After a lot of struggle with the pure Jackson library, which doesn't have a strict mode, I wasn't able to validate the JSON schema. I finally found one way of doing it, but now I am not able to map the correct *Success* and *Failure* messages in order to call the Process…
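
One workable pattern, hedged as a sketch: bind to an expected case class with jackson-module-scala, enable the stricter deserialization features, and wrap the parse in Try so Success and Failure fall out naturally (Event and its fields are hypothetical):

    import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import scala.util.{Failure, Success, Try}

    case class Event(id: Int, name: String) // hypothetical expected schema

    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    mapper.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES)
    // FAIL_ON_UNKNOWN_PROPERTIES is already on by default in Jackson 2.x

    def classify(json: String): Either[String, Event] =
      Try(mapper.readValue(json, classOf[Event])) match {
        case Success(event) => Right(event) // the "Success" path
        case Failure(_)     => Left(json)   // the "Failure" path
      }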

Re: TypeInformation | Flink

2021-12-30 Thread Siddhesh Kalgaonkar
…tried testing his answer, but it didn't work as expected. Thanks, Sid. On Wed, Dec 29, 2021 at 10:58 PM Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com> wrote: Hi David, Yes, I already mentioned that I am a newbie to Flink and Scala. I am making progress as the day…

Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
…docs.scala-lang.org/scala3/book/interacting-with-java.html#extending-java-interfaces-in-scala On Wed, Dec 29, 2021 at 5:59 PM Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com> wrote: I have modified my question based on Dominik's inputs. Can somebody help to ta…

Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
On Wed, Dec 29, 2021 at 10:53 AM Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com> wrote: Hi David, Thanks for the clarification. I will check the link you shared. Also, as mentioned by Dominik, can you help me with the process functions? How ca…

Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
…pretty good hint. The JSON parsing in this case is no different than in any other Java application (with Jackson / Gson / ...). You can then simply split the parsed elements into good and bad records. D. On Wed, Dec 29, 2021 at 10:53 AM Siddhesh Kalgaonkar <…

Re: TypeInformation | Flink

2021-12-29 Thread Siddhesh Kalgaonkar
Hi David, Thanks for the clarification. I will check the link you shared. Also, as mentioned by Dominik, can you help me with the process functions? How can I use them for my use case? Thanks, Siddhesh. On Wed, Dec 29, 2021 at 3:22 PM Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com>…
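
The good/bad split David describes maps naturally onto a ProcessFunction with a side output; a sketch (the validation body is a naive stand-in for a real check):

    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    val badRecordsTag = OutputTag[String]("bad-records")

    class JsonSplitter extends ProcessFunction[String, String] {
      override def processElement(value: String,
                                  ctx: ProcessFunction[String, String]#Context,
                                  out: Collector[String]): Unit =
        if (isValid(value)) out.collect(value) // good records: main output
        else ctx.output(badRecordsTag, value)  // bad records: side output

      private def isValid(json: String): Boolean =
        json.trim.startsWith("{") // naive stand-in for real JSON validation
    }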

TypeInformation | Flink

2021-12-28 Thread Siddhesh Kalgaonkar
Hi Team, I am a newbie to Flink and Scala and trying my best to learn everything I can. I am doing a practice exercise where I am getting incoming JSON data from a Kafka topic and want to perform a data type check on it. For that, I came across Flink's TypeInformation. Please read my problem in detail…