Yes, even the delimiter can be replaced. I still have to test what happens if the data itself contains a comma.
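Until that comma case is tested, one rough sketch of an alternative (untested; CsvQuoting, csvField and csvLine are hypothetical helper names, not part of the repo) is to quote any field that contains the delimiter, RFC 4180 style, and build the line from the Row columns instead of relying on toString:

import org.apache.flink.types.Row

object CsvQuoting {  // hypothetical helper, not in the repo

  // Quote a field (RFC 4180 style) when it contains the delimiter, a double
  // quote or a newline, doubling any embedded quotes.
  def csvField(value: String, delimiter: String = ","): String =
    if (value.contains(delimiter) || value.contains("\"") || value.contains("\n"))
      "\"" + value.replace("\"", "\"\"") + "\""
    else value

  // Join every column of a Row into one CSV line, writing null fields as empty strings.
  def csvLine(row: Row, delimiter: String = ","): String =
    (0 until row.getArity())
      .map(i => csvField(Option(row.getField(i)).map(_.toString).getOrElse(""), delimiter))
      .mkString(delimiter)
}

// e.g., instead of .map(_._2.toString.replaceAll(",", "~")), something like:
//   table.toRetractStream(TypeInformation.of(classOf[Row])).map(t => CsvQuoting.csvLine(t._2))

That would keep the output a normal comma-delimited file that any quote-aware CSV reader can parse, instead of hoping the data never contains the replacement character. The current version below simply swaps the delimiter to "~":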
table.toRetractStream(TypeInformation.of(classOf[Row]))
  .map(_._2.toString.replaceAll(",", "~"))
  .writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",
    FileSystem.WriteMode.OVERWRITE)

On Wed, Jul 17, 2019 at 6:47 PM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

> Amazing, all issues resolved in one go, thanks Cheng. One issue though: I
> can't write map(_._2) to CSV; it looks like that isn't supported right now,
> so it has to be a text file.
>
> Below is the full code in Scala, if anyone wants it.
>
> Git code is here:
> https://github.com/kali786516/FlinkStreamAndSql
>
> package com.aws.examples.kinesis.consumer.transactionExampleScala
>
> import java.util.Properties
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import com.aws.examples.kinesis.consumer.TransactionExample.TransactionJsonClass
> import com.google.gson.Gson
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import java.sql.{DriverManager, Time}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.core.fs.{FileSystem, Path}
>
> object TransactionScalaTest {
>
>   /*
>   extends RetractStreamTableSink[Row]
>
>   override def configure(strings: Array[String], typeInformations: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
>
>   override def getFieldNames: Array[String] = ???
>
>   override def getFieldTypes: Array[TypeInformation[_]] = ???
>
>   override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
>
>   override def getOutputType: TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] = super.getOutputType
>
>   override def getRecordType: TypeInformation[Row] = ???
>   */
>
>   def main(args: Array[String]): Unit = {
>
>     // set up the streaming execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     //env.enableCheckpointing(10000)
>
>     val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
>
>     // Get AWS credentials
>     val credentialsProvider = new DefaultAWSCredentialsProviderChain
>     val credentials = credentialsProvider.getCredentials
>
>     // Configure Flink Kinesis consumer
>     val consumerConfig = new Properties
>     consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>     consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
>     consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
>     consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>
>     // Create Kinesis stream
>     val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(), consumerConfig))
>
>     val mapFunction: MapFunction[String, (String, String, String, String, String, String, String, String, String, String)] =
>       new MapFunction[String, (String, String, String, String, String, String, String, String, String, String)]() {
>
>         override def map(s: String): (String, String, String, String, String, String, String, String, String, String) = {
>
>           val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>           val csvData = data.getCc_num + "," +
>             data.getFirst + "," +
>             data.getLast + "," +
>             data.getTrans_num + "," +
>             data.getTrans_time + "," +
>             data.getCategory + "," +
>             data.getMerchant + "," +
>             data.getAmt + "," +
>             data.getMerch_lat + "," +
>             data.getMerch_long
>
>           //println(csvData)
>
>           val p: Array[String] = csvData.split(",")
>           var cc_num: String = p(0)
>           var first: String = p(1)
>           var last: String = p(2)
>           var trans_num: String = p(3)
>           var trans_time: String = p(4)
>           var category: String = p(5)
>           var merchant: String = p(6)
>           var amt: String = p(7)
>           var merch_lat: String = p(8)
>           var merch_long: String = p(9)
>
>           val creationDate: Time = new Time(System.currentTimeMillis())
>           return (cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
>         }
>       }
>
>     val data = kinesis.map(mapFunction)
>
>     tEnv.registerDataStream("transactions", data,
>       'cc_num, 'first_column, 'last_column, 'trans_num,
>       'trans_time, 'category_column, 'merchant_column, 'amt_column, 'merch_lat, 'merch_long)
>     //tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>
>     val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num') "
>     val table = tEnv.sqlQuery(query)
>
>     table
>       .toRetractStream(TypeInformation.of(classOf[Row]))
>       .map(_._2)
>       .writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125", FileSystem.WriteMode.OVERWRITE)
>
>     table.printSchema()
>
>     table.toRetractStream(TypeInformation.of(classOf[Row])).print()
>
>     env.execute()
>
>     /*
>     table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
>       FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>     val test = table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2)
>
>     test.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut123", FileSystem.WriteMode.OVERWRITE)
>
>     test.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
>       FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>     import org.apache.flink.streaming.api.scala._
>     import org.apache.flink.api.common.typeinfo.TypeInformation
>     implicit val typeInfo = TypeInformation.of(classOf[Row])
>
>     val ds = table.toRetractStream(TypeInformation.of(classOf[Row]))
>
>     ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
>       FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>     tEnv.toRetractStream(table, TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
>       FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")
>
>     table.distinct().writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8", "\n", "|")
>
>     import org.apache.flink.api.common.time.Time
>     import org.apache.flink.streaming.api.TimeCharacteristic
>     import org.apache.flink.api.common.typeinfo.TypeInformation
>
>     implicit val typeInfo = TypeInformation.of(classOf[Row])
>
>     table.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>       FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>     table.toRetractStream(TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>       FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")
>
>     ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>       FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")
>
>     tEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1), Time.minutes(6))
>
>     tEnv.toRetractStream(table)
>       .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>         FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>     tEnv.toRetractStream(table, classOf[T])
>     */
>
>   }
>
> }
>
> On Wed, Jul 17, 2019 at 10:11 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>
>> Question 1:
>>
>> I did try a map function and ended up with an issue
>> (https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i).
>>
>> I am trying to convert a Tuple2[Boolean, Row] to Row using a map function,
>> and I am getting an error asking me for InferedR. What is InferedR in Flink?
>>
>> val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row], AnyVal] =
>>   new MapFunction[tuple.Tuple2[Boolean, Row], AnyVal]() {
>>     override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
>>       t.f1
>>     }
>>     /*override def map(t: tuple.Tuple2[Boolean, Row], collector: Collector[Object]): Unit = {
>>       collector.collect(t.f1)
>>     }
>>     */
>>   }
>>
>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
>>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>     FileSystem.WriteMode.OVERWRITE, "\n", "|")
>>
>> And when I try that, I get a different type of error:
>>
>> Error:(143, 74) type mismatch;
>>  found   : org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
>>  required: org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
>>
>> Question 2:
>> I don't have a source data issue; regenerating this issue for testing is simple.
>>
>> Create a Kinesis stream, then run the producer:
>> https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala
>>
>> Then run the consumer:
>> https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala
>>
>> Thanks
>> Sri
>>
>> On Wed, Jul 17, 2019 at 10:03 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Sri,
>>>
>>> Question 1:
>>> You can use a map to filter out the "true", i.e., ds.map(_._2).
>>> Note that it is OK to remove the "true" flag for distinct, as it does not
>>> generate updates. For other queries that contain updates, such as a
>>> non-window group by, we should not filter out the flag or the result will
>>> not be correct.
>>>
>>> Question 2:
>>> I can't reproduce this problem in my local environment. Maybe there is
>>> something wrong with the source data?
>>>
>>> Best, Hequn
>>>
>>> On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>>>
>>>> Windows for question 1, question 2, or both?
>>>>
>>>> Thanks
>>>> Sri
>>>>
>>>> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala <taher...@gmail.com> wrote:
>>>>
>>>>> Looks like you need a window
>>>>>
>>>>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am trying to write a toRetractStream to CSV, which is kind of working
>>>>>> OK, but I get extra values like "true" before my output data values.
>>>>>>
>>>>>> Question 1:
>>>>>> I don't want "true" in my output data; how do I achieve this?
>>>>>>
>>>>>> Screenshot attached.
>>>>>>
>>>>>> Question 2:
>>>>>> In the output file (CSV) I am missing data on the last line. Is the
>>>>>> toRetractStream closing before writing to the file?
>>>>>>
>>>>>> Screenshot attached.
>>>>>>
>>>>>> Code:
>>>>>>
>>>>>> val data = kinesis.map(mapFunction)
>>>>>> tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>>>>> val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
>>>>>> val table = tEnv.sqlQuery(query)
>>>>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>>>>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>>>>>     FileSystem.WriteMode.OVERWRITE, "\n", "|")
>>>>>>
>>>>>> --
>>>>>> Thanks & Regards
>>>>>> Sri Tummala
>>>>>>
>>>>
>>>> --
>>>> Thanks & Regards
>>>> Sri Tummala
>>>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>
> --
> Thanks & Regards
> Sri Tummala
>

--
Thanks & Regards
Sri Tummala