Hi Lee,

it writes only after the job is killed and also I dont see all the records
? is there a workaround?

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
    FileSystem.WriteMode.NO_OVERWRITE,"~","|")


On Mon, Jul 15, 2019 at 10:03 PM JingsongLee <lzljs3620...@aliyun.com>
wrote:

> Hi caizhi and kali:
>
> I think this table should use toRetractStream instead of toAppendStream,
> and you should handle the retract messages. (If you just use distinct, the
> message should always be accumulate message)
>
> Best, JingsongLee
>
> ------------------------------------------------------------------
> From:Caizhi Weng <tsreape...@gmail.com>
> Send Time:2019年7月16日(星期二) 09:52
> To:sri hari kali charan Tummala <kali.tumm...@gmail.com>
> Cc:user <user@flink.apache.org>
> Subject:Re: Stream to CSV Sink with SQL Distinct Values
>
> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
>
> However, you can use `toAppendStream` method to change the retractable
> stream to an append only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
> an append only stream. You can then add csv sink to this stream.
>
> sri hari kali charan Tummala <kali.tumm...@gmail.com> 于2019年7月16日周二
> 上午3:32写道:
> Hi All,
>
> I am trying to read data from kinesis stream and applying SQL
> transformation (distinct) and then tryting to write to CSV sink which is
> failinf due to this issue (org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.) , full
> code is here (
> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
> ).
>
> can anyone help me moveforward on this issue?
>
> Full Code:-
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.createLocalEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", 
> new SimpleStringSchema(), consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]] =
>   new MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]]() {
>
>     override def map(s: String): Tuple10[String, String, 
> String,String,String,String,String,String,String,String] = {
>       val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>       val csvData = data.getCc_num+","+
>         data.getFirst+","+
>         data.getLast+","+
>         data.getTrans_num+","+
>         data.getTrans_time+","+
>         data.getCategory+","+
>         data.getMerchant+","+
>         data.getAmt+","+
>         data.getMerch_lat+","+
>         data.getMerch_long
>
>       //println(csvData)
>
>       val p:Array[String] = csvData.split(",")
>       var cc_num:String = p(0)
>       var first:String = p(1)
>       var last:String = p(2)
>       var trans_num:String = p(3)
>       var trans_time:String = p(4)
>       var category:String = p(5)
>       var merchant:String = p(6)
>       var amt:String = p(7)
>       var merch_lat:String = p(8)
>       var merch_long:String = p(9)
>
>       val creationDate: Time = new Time(System.currentTimeMillis())
>       return new Tuple10(cc_num, first, 
> last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
>     }
>   }
>
> val data = kinesis.map(mapFunction)
>
> //data.print()
>
> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>
> val query = "SELECT distinct 
> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>  FROM transactions where cc_num not in ('cc_num')"
> val table = tEnv.sqlQuery(query)
>
> val table1 = table.distinct()
>
> tEnv.registerTable("fromAnotherTable",table1)
>
> table.printSchema()
>
> val csvSink:TableSink[Row]  = new 
> CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput","~");
> val fieldNames:Array[String]              = 
> Array("cc_num","first_column","last_column","trans_num","trans_time","category_column","merchant_column","amt_column","merch_lat","merch_long")
> val fieldTypes:Array[TypeInformation[_]]  = Array(
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING,
>   org.apache.flink.api.common.typeinfo.Types.STRING
> )
>
> tEnv.registerTableSink("s3csvTargetTransaction", fieldNames, fieldTypes, 
> csvSink)
>
> tEnv.sqlUpdate("INSERT INTO s3csvTargetTransaction SELECT 
> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>  from fromAnotherTable")
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>

-- 
Thanks & Regards
Sri Tummala

Reply via email to