I am using below code to get the data from the side output which has
filtered records.
So, it goes like this:

val filterRecords: DataStream[String] = src.process(new
ProcessFunction()).getSideOutput(filteredOutputTag)

It has filtered records in it.

Now, I want to add these records to the db asynchronously. Therefore, I
wrote below code using documentation reference:

val asyncFunction:AsyncFunction[String,String]=new DBAsyncSink() //SO
reference
AsyncDataStream.unorderedWait(goodRecords,new DBAsyncSink(), 1000,
TimeUnit.SECONDS, 100) //Documentation Reference

and the class for the "DBAsyncSink" is as follows:

class DBAsyncSink extends RichAsyncFunction[String,String] {

  override def open(parameters: Configuration): Unit = {

  }

  override def asyncInvoke(input:String, resultFuture:
ResultFuture[String]): Unit = {

  }

  override def close(): Unit = {
    session.close()
  }

}

I am getting below error:

type mismatch;
 found   : org.apache.flink.streaming.api.scala.DataStream[String]
 required: org.apache.flink.streaming.api.datastream.DataStream[?]

What am I missing over here? I tried a couple of examples but it didn't
work.

Thanks,
Sid

Reply via email to