Hi Kali,

What exception or error message do you get when executing the failing
step? Please paste the full stack trace here so that we can investigate
the problem.

sri hari kali charan Tummala <kali.tumm...@gmail.com> 于2019年7月16日周二
上午4:49写道:

> Hi ,
>
> I am trying to write a Flink table to a streaming sink, but it fails when
> casting between Java and Scala types. It fails at the step below; can anyone
> help me with this error?
>
>
>     val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new 
> Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
>       new SimpleStringEncoder[Row]("UTF-8")).build()
>
>     table.addSink(sink2)
>
>
> package com.aws.examples.kinesis.consumer.TransactionExample
>
> import java.util.Properties
>
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, 
> SimpleStringSchema}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import 
> org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, 
> ConsumerConfigConstants}
> import org.apache.flink.table.api.{Table, TableEnvironment}
> import com.google.gson.{Gson, JsonObject}
> import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
> import java.sql.{DriverManager, Time}
>
> import com.aws.SchemaJavaClasses.Row1
> import org.apache.flink.types.Row
> import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks.CsvTableSink
> import org.apache.flink.api.java.io.jdbc
> import 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
> import org.apache.flink.table.api.java._
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.table.sinks.TableSink
> import com.aws.customSinks.CsvCustomSink
> import org.apache.flink.core.fs.Path
>
> import scala.collection.JavaConversions._
> import org.apache.flink.table.sources.CsvTableSource
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.java.StreamTableEnvironment
> import org.apache.flink.streaming.api.datastream.DataStream
> import 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
> import com.aws.customSinks.CsvCustomSink
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>
> object KinesisConsumer {
>
>   def main(args: Array[String]): Unit = {
>
>     // set up the streaming execution environment
>     val env = StreamExecutionEnvironment.createLocalEnvironment
>     //env.enableCheckpointing(10)
>
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>
>     // Get AWS credentials
>     val credentialsProvider = new DefaultAWSCredentialsProviderChain
>     val credentials = credentialsProvider.getCredentials
>
>     // Configure Flink Kinesis consumer
>     val consumerConfig = new Properties
>     consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>     consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
>     consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
>     consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
>     // Create Kinesis stream
>     val kinesis = env.addSource(new 
> FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), 
> consumerConfig))
>
>     val mapFunction: MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]] =
>       new MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]]() {
>
>         override def map(s: String): Tuple10[String, String, 
> String,String,String,String,String,String,String,String] = {
>           val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>           val csvData = data.getCc_num+","+
>             data.getFirst+","+
>             data.getLast+","+
>             data.getTrans_num+","+
>             data.getTrans_time+","+
>             data.getCategory+","+
>             data.getMerchant+","+
>             data.getAmt+","+
>             data.getMerch_lat+","+
>             data.getMerch_long
>
>           //println(csvData)
>
>           val p:Array[String] = csvData.split(",")
>           var cc_num:String = p(0)
>           var first:String = p(1)
>           var last:String = p(2)
>           var trans_num:String = p(3)
>           var trans_time:String = p(4)
>           var category:String = p(5)
>           var merchant:String = p(6)
>           var amt:String = p(7)
>           var merch_lat:String = p(8)
>           var merch_long:String = p(9)
>
>           val creationDate: Time = new Time(System.currentTimeMillis())
>           return new Tuple10(cc_num, first, 
> last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
>         }
>       }
>
>     val data = kinesis.map(mapFunction)
>
>     //data.print()
>
>     
> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>
>     val query = "SELECT distinct 
> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>  FROM transactions where cc_num not in ('cc_num')"
>     val table = tEnv.sqlQuery(query)
>
>     //println(table.toString())
>
>     //val test = new CsvCustomSink("")
>
>     val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new 
> Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
>       new SimpleStringEncoder[Row]("UTF-8")).build()
>
>     table.addSink(sink2)
>
>     env.execute()
>
>   }
>
> }
>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>

Reply via email to