Yes, even the delimiter can be replaced. I still have to test what happens if
the data itself contains a comma.

table.toRetractStream(TypeInformation.of(classOf[Row]))
  .map(_._2.toString.replaceAll(",", "~"))
  .writeAsText(
    "/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",
    FileSystem.WriteMode.OVERWRITE)
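
If the data itself can contain the delimiter, a global replaceAll will also
rewrite commas inside field values. A sketch of one alternative, quoting each
field individually (the quoteCsv/rowToCsv helpers below are hypothetical, not
part of the code above):

import org.apache.flink.types.Row

// Quote a field if it contains the delimiter, a quote, or a newline,
// doubling any embedded quotes (RFC 4180 style).
def quoteCsv(field: String): String =
  if (field.exists(c => c == ',' || c == '"' || c == '\n'))
    "\"" + field.replace("\"", "\"\"") + "\""
  else field

// Render one Row as a CSV line, quoting each field separately.
def rowToCsv(row: Row): String =
  (0 until row.getArity)
    .map(i => quoteCsv(String.valueOf(row.getField(i))))
    .mkString(",")

// Usage against the retract stream above (outputPath is a placeholder):
// table.toRetractStream(TypeInformation.of(classOf[Row]))
//   .map(t => rowToCsv(t._2))
//   .writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE)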


On Wed, Jul 17, 2019 at 6:47 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Amazing, all issues resolved in one go, thanks Cheng. One issue though: I
> can't write map(_._2) to CSV; it looks like that isn't supported right now,
> so it has to be a text file.
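>
> A sketch of why this happens and one possible workaround: after map(_._2)
> the stream element type is Row, and writeAsCsv only accepts tuple or
> case-class element types, so converting each Row to a tuple first should
> make the stream CSV-writable (outputPath is a placeholder; arity 10 matches
> the transactions schema below):
>
> table.toRetractStream(TypeInformation.of(classOf[Row]))
>   .map(_._2)
>   .map { row =>
>     // Stringify each of the ten fields and pack them into a Tuple10,
>     // which writeAsCsv knows how to serialize.
>     def f(i: Int) = String.valueOf(row.getField(i))
>     (f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7), f(8), f(9))
>   }
>   .writeAsCsv(outputPath, FileSystem.WriteMode.OVERWRITE)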
>
> Below is the full code in Scala, in case someone wants it.
>
> The Git code is here:
> https://github.com/kali786516/FlinkStreamAndSql
>
> package com.aws.examples.kinesis.consumer.transactionExampleScala
>
> import java.util.Properties
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import com.aws.examples.kinesis.consumer.TransactionExample.TransactionJsonClass
> import com.google.gson.Gson
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import java.sql.{DriverManager, Time}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.core.fs.{FileSystem, Path}
>
> object TransactionScalaTest {
>
>   /*
>   extends RetractStreamTableSink[Row]
>
>   override def configure(strings: Array[String], typeInformations: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
>
>   override def getFieldNames: Array[String] = ???
>
>   override def getFieldTypes: Array[TypeInformation[_]] = ???
>
>   override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
>
>   override def getOutputType: TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] = super.getOutputType
>
>   override def getRecordType: TypeInformation[Row] = ???
>   */
>
>   def main(args: Array[String]): Unit = {
>
>
>
>     // set up the streaming execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     //env.enableCheckpointing(10000)
>
>     val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
>
>     // Get AWS credentials
>     val credentialsProvider = new DefaultAWSCredentialsProviderChain
>     val credentials = credentialsProvider.getCredentials
>
>     // Configure Flink Kinesis consumer
>     val consumerConfig = new Properties
>     consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>     consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
>     consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
>     consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>
>     // Create Kinesis stream
>     val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(), consumerConfig))
>
>     val mapFunction: MapFunction[String, (String, String, String, String, String, String, String, String, String, String)] =
>       new MapFunction[String, (String, String, String, String, String, String, String, String, String, String)]() {
>
>         override def map(s: String): (String, String, String, String, String, String, String, String, String, String) = {
>
>           // Parse the JSON record with Gson and build the output tuple
>           // directly from its fields; String.valueOf keeps every field a
>           // String regardless of the getter's type. (The original
>           // comma-join-then-split(",") round trip would shift fields if
>           // any value contained a comma.)
>           val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>           (String.valueOf(data.getCc_num), String.valueOf(data.getFirst),
>             String.valueOf(data.getLast), String.valueOf(data.getTrans_num),
>             String.valueOf(data.getTrans_time), String.valueOf(data.getCategory),
>             String.valueOf(data.getMerchant), String.valueOf(data.getAmt),
>             String.valueOf(data.getMerch_lat), String.valueOf(data.getMerch_long))
>         }
>       }
>
>
>     val data = kinesis.map(mapFunction)
>
>     tEnv.registerDataStream("transactions", data,
>       'cc_num, 'first_column, 'last_column, 'trans_num, 'trans_time,
>       'category_column, 'merchant_column, 'amt_column, 'merch_lat, 'merch_long)
>     //tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>
>     // DISTINCT the stream and drop any record whose cc_num is the literal
>     // string 'cc_num' (e.g. an echoed header row).
>     val query = "SELECT DISTINCT cc_num, first_column, last_column, trans_num, trans_time, category_column, merchant_column, amt_column, merch_lat, merch_long FROM transactions WHERE cc_num NOT IN ('cc_num')"
>     val table = tEnv.sqlQuery(query)
>
>     table
>       .toRetractStream(TypeInformation.of(classOf[Row]))
>       .map(_._2)
>       .writeAsText(
>         "/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",
>         FileSystem.WriteMode.OVERWRITE)
>
>     table.printSchema()
>
>     table.toRetractStream(TypeInformation.of(classOf[Row])).print()
>
>     env.execute()
>
>     /*
>     table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2)
>       .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
>         FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>     val test = table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2)
>
>     test.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut123", FileSystem.WriteMode.OVERWRITE)
>
>     test.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
>       FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>     import org.apache.flink.streaming.api.scala._
>     import org.apache.flink.api.common.typeinfo.TypeInformation
>     implicit val typeInfo = TypeInformation.of(classOf[Row])
>
>     val ds = table.toRetractStream(TypeInformation.of(classOf[Row]))
>
>     ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
>       FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>     tEnv.toRetractStream(table, TypeInformation.of(classOf[Row]))
>       .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
>         FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")
>
>     table.distinct().writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>       "\n", "|")
>
>     import org.apache.flink.api.common.time.Time
>     import org.apache.flink.streaming.api.TimeCharacteristic
>     import org.apache.flink.api.common.typeinfo.TypeInformation
>
>     implicit val typeInfo = TypeInformation.of(classOf[Row])
>
>     table.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>       FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>     table.toRetractStream(TypeInformation.of(classOf[Row]))
>       .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>         FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")
>
>     ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>       FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")
>
>     tEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1), Time.minutes(6))
>
>     tEnv.toRetractStream(table)
>       .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>         FileSystem.WriteMode.OVERWRITE, "\n", "|")
>
>     tEnv.toRetractStream(table, classOf[T])
>     */
>
>   }
>
> }
>
> On Wed, Jul 17, 2019 at 10:11 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Question 1:
>>
>> I did try the map function but ended up with an issue (
>> https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i
>> )
>>
>> I am trying to convert a Tuple[Boolean, Row] to Row using a map function and
>> I am getting this error asking me for InferedR. What is InferedR in Flink?
>>
>>   val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row], AnyVal] =
>>     new MapFunction[tuple.Tuple2[Boolean, Row], AnyVal]() {
>>       override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
>>         t.f1
>>       }
>>       /*override def map(t: tuple.Tuple2[Boolean, Row], collector: Collector[Object]): Unit = {
>>         collector.collect(t.f1)
>>       }
>>       */
>> }
>>
>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
>>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>     FileSystem.WriteMode.OVERWRITE, "\n", "|")
>>
>> And when I try that, I get a different type of error:
>>
>> Error:(143, 74) type mismatch;
>>  found   : org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
>>  required: org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
>>   tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
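>>
>> A minimal sketch of one way to align the signature, based on what the error
>> says is required (java.lang.Boolean rather than scala.Boolean in the tuple,
>> and a concrete Row output type instead of AnyVal):
>>
>> import java.lang.{Boolean => JBool}
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.java.tuple
>> import org.apache.flink.types.Row
>>
>> // toRetractStream yields a Java Tuple2 whose flag is java.lang.Boolean,
>> // so the MapFunction must be declared against that exact type.
>> val mymapFunction: MapFunction[tuple.Tuple2[JBool, Row], Row] =
>>   new MapFunction[tuple.Tuple2[JBool, Row], Row]() {
>>     override def map(t: tuple.Tuple2[JBool, Row]): Row = t.f1
>>   }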
>>
>> Question 2:
>> I don't have any source data issue; regenerating this for testing is simple:
>>
>> create a Kinesis stream
>> run the producer:
>>
>> https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala
>>
>> then run the consumer:
>>
>> https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala
>>
>> Thanks
>> Sri
>>
>>
>> On Wed, Jul 17, 2019 at 10:03 AM Hequn Cheng <chenghe...@gmail.com>
>> wrote:
>>
>>> Hi Sri,
>>>
>>> Question 1:
>>> You can use a map to filter out the "true" flag, i.e., ds.map(_._2).
>>> Note that it is OK to remove the "true" flag for distinct, as distinct does
>>> not generate updates. For queries that contain updates, such as a non-window
>>> group by, we should not filter out the flag, or the result will not be correct.
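>>>
>>> To illustrate the point (hypothetical rows for an updating query, unlike
>>> the DISTINCT one in this thread):
>>>
>>> // SELECT user, COUNT(*) AS cnt FROM clicks GROUP BY user
>>> // The retract stream interleaves adds and retractions as counts change:
>>> //   (true,  (bob, 1))   // first result for bob
>>> //   (false, (bob, 1))   // retract the old result
>>> //   (true,  (bob, 2))   // emit the updated result
>>> // ds.map(_._2) would keep both (bob, 1) and (bob, 2) in an append-only
>>> // file, leaving a stale row; even ds.filter(_._1).map(_._2) cannot undo
>>> // lines already written. That is why the flag can only be dropped for
>>> // queries that never retract, like this DISTINCT.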
>>>
>>> Question 2:
>>> I can't reproduce this problem in my local environment. Maybe there is
>>> something wrong with the source data?
>>>
>>> Best, Hequn
>>>
>>> On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
>>>> A window for question 1, question 2, or both?
>>>>
>>>> Thanks
>>>> Sri
>>>>
>>>> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala <taher...@gmail.com>
>>>> wrote:
>>>>
>>>>> Looks like you need a window
>>>>>
>>>>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
>>>>> kali.tumm...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am trying to write toRetractStream to CSV, which is kind of working
>>>>>> OK, but I get extra values like "true" before my output data values.
>>>>>>
>>>>>> Question 1:
>>>>>> I don't want "true" in my output data; how do I achieve this?
>>>>>>
>>>>>> Screen shot attached.
>>>>>>
>>>>>> Question 2:
>>>>>> In the output file (CSV) I am missing data in the last line; is the
>>>>>> toRetractStream closing before writing to the file?
>>>>>>
>>>>>> Screen Shot attached
>>>>>>
>>>>>> Code:-
>>>>>>
>>>>>> val data = kinesis.map(mapFunction)
>>>>>> tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>>>>> val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
>>>>>> val table = tEnv.sqlQuery(query)
>>>>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>>>>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>>>>>     FileSystem.WriteMode.OVERWRITE, "\n", "|")
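>>>>>>
>>>>>> (On the missing last line: one likely cause is that writeAsCsv buffers
>>>>>> its output and only flushes when the sink closes, so trailing records
>>>>>> can be lost if the job is stopped first. A sketch of a checkpoint-aware
>>>>>> alternative using StreamingFileSink; the output path is a placeholder:)
>>>>>>
>>>>>> import org.apache.flink.api.common.serialization.SimpleStringEncoder
>>>>>> import org.apache.flink.core.fs.Path
>>>>>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
>>>>>>
>>>>>> // Requires env.enableCheckpointing(...) so in-progress part files are
>>>>>> // finalized; "/tmp/csvOut" is a placeholder path.
>>>>>> val sink: StreamingFileSink[String] = StreamingFileSink
>>>>>>   .forRowFormat(new Path("/tmp/csvOut"), new SimpleStringEncoder[String]("UTF-8"))
>>>>>>   .build()
>>>>>>
>>>>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>>>>   .map(_.f1.toString)
>>>>>>   .addSink(sink)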
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks & Regards
>>>>>> Sri Tummala

-- 
Thanks & Regards
Sri Tummala
