Hi Kali, what exception or error message do you get when the failing step is executed? Please post it here so that we can investigate the problem.
sri hari kali charan Tummala <kali.tumm...@gmail.com> 于2019年7月16日周二 上午4:49写道: > Hi , > > I am trying to write flink table to streaming Sink it fails at casting > Java to Scala or Scala to Java, it fails at below step can anyone help me > out ? about this error. > > > val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new > Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"), > new SimpleStringEncoder[Row]("UTF-8")).build() > > table.addSink(sink2) > > > package com.aws.examples.kinesis.consumer.TransactionExample > > import java.util.Properties > > import com.amazonaws.auth.DefaultAWSCredentialsProviderChain > import org.apache.flink.api.common.functions.MapFunction > import org.apache.flink.api.common.serialization.{SimpleStringEncoder, > SimpleStringSchema} > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment > import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer > import > org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, > ConsumerConfigConstants} > import org.apache.flink.table.api.{Table, TableEnvironment} > import com.google.gson.{Gson, JsonObject} > import org.apache.flink.api.java.tuple.{Tuple10, Tuple3} > import java.sql.{DriverManager, Time} > > import com.aws.SchemaJavaClasses.Row1 > import org.apache.flink.types.Row > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat} > import org.apache.flink.table.api.scala._ > import org.apache.flink.table.sinks.CsvTableSink > import org.apache.flink.api.java.io.jdbc > import > org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder > import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat > import org.apache.flink.table.api.java._ > import org.apache.flink.api.common.typeinfo.TypeInformation > import org.apache.flink.api.java.DataSet > import org.apache.flink.table.sinks.TableSink > import com.aws.customSinks.CsvCustomSink > import org.apache.flink.core.fs.Path > > 
import scala.collection.JavaConversions._ > import org.apache.flink.table.sources.CsvTableSource > import org.apache.flink.table.api.Table > import org.apache.flink.table.api.TableEnvironment > import org.apache.flink.table.api.java.StreamTableEnvironment > import org.apache.flink.streaming.api.datastream.DataStream > import > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink > import com.aws.customSinks.CsvCustomSink > import org.apache.flink.streaming.api.functions.sink.SinkFunction > > object KinesisConsumer { > > def main(args: Array[String]): Unit = { > > // set up the streaming execution environment > val env = StreamExecutionEnvironment.createLocalEnvironment > //env.enableCheckpointing(10) > > val tEnv = TableEnvironment.getTableEnvironment(env) > > // Get AWS credentials > val credentialsProvider = new DefaultAWSCredentialsProviderChain > val credentials = credentialsProvider.getCredentials > > // Configure Flink Kinesis consumer > val consumerConfig = new Properties > consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") > consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, > credentials.getAWSAccessKeyId) > consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, > credentials.getAWSSecretKey) > consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, > "TRIM_HORIZON") > > // Create Kinesis stream > val kinesis = env.addSource(new > FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), > consumerConfig)) > > val mapFunction: MapFunction[String, Tuple10[String, String, > String,String,String,String,String,String,String,String]] = > new MapFunction[String, Tuple10[String, String, > String,String,String,String,String,String,String,String]]() { > > override def map(s: String): Tuple10[String, String, > String,String,String,String,String,String,String,String] = { > val data = new Gson().fromJson(s, classOf[TransactionJsonClass]) > > val csvData = data.getCc_num+","+ > data.getFirst+","+ > 
data.getLast+","+ > data.getTrans_num+","+ > data.getTrans_time+","+ > data.getCategory+","+ > data.getMerchant+","+ > data.getAmt+","+ > data.getMerch_lat+","+ > data.getMerch_long > > //println(csvData) > > val p:Array[String] = csvData.split(",") > var cc_num:String = p(0) > var first:String = p(1) > var last:String = p(2) > var trans_num:String = p(3) > var trans_time:String = p(4) > var category:String = p(5) > var merchant:String = p(6) > var amt:String = p(7) > var merch_lat:String = p(8) > var merch_long:String = p(9) > > val creationDate: Time = new Time(System.currentTimeMillis()) > return new Tuple10(cc_num, first, > last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long) > } > } > > val data = kinesis.map(mapFunction) > > //data.print() > > > tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long") > > val query = "SELECT distinct > cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long > FROM transactions where cc_num not in ('cc_num')" > val table = tEnv.sqlQuery(query) > > //println(table.toString()) > > //val test = new CsvCustomSink("") > > val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new > Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"), > new SimpleStringEncoder[Row]("UTF-8")).build() > > table.addSink(sink2) > > env.execute() > > } > > } > > > > -- > Thanks & Regards > Sri Tummala > > > > -- > Thanks & Regards > Sri Tummala > >