This is the error:

org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to
org.apache.flink.table.api.scala.StreamTableEnvironment
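
This ClassCastException typically means the Java and Scala Table APIs are
mixed in one job: the code below imports both org.apache.flink.table.api.scala._
and org.apache.flink.table.api.java._, and it builds the table environment from
the Java StreamExecutionEnvironment, so TableEnvironment.getTableEnvironment
returns the Java StreamTableEnvironment while the Scala implicit
Table -> DataStream[Row] conversion casts it to the Scala one. Here is a
minimal all-Scala sketch (assuming Flink 1.8-era APIs; the object name, sample
data, and the /tmp/flink-out path are illustrative, not taken from the job
below):

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object ScalaTableSinkSketch {

  def main(args: Array[String]): Unit = {

    // Scala StreamExecutionEnvironment (org.apache.flink.streaming.api.scala),
    // not the Java one from org.apache.flink.streaming.api.environment
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    // StreamingFileSink only finalizes part files on checkpoints
    env.enableCheckpointing(10000)

    // With a Scala environment this returns the Scala StreamTableEnvironment,
    // the class the implicits in org.apache.flink.table.api.scala._ cast to
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val data = env.fromElements(("cc1", "alice"), ("cc2", "bob"))
    tEnv.registerDataStream("transactions", data, 'cc_num, 'first_column)
    val table = tEnv.sqlQuery("SELECT cc_num, first_column FROM transactions")

    // Convert explicitly instead of relying on the implicit
    // Table -> DataStream[Row] conversion that performs the failing cast
    val rows: DataStream[Row] = table.toAppendStream[Row]

    val sink: StreamingFileSink[Row] = StreamingFileSink
      .forRowFormat(new Path("/tmp/flink-out"), new SimpleStringEncoder[Row]("UTF-8"))
      .build()
    rows.addSink(sink)

    env.execute("scala-table-sink-sketch")
  }
}

The explicit toAppendStream[Row] call makes the Java/Scala boundary visible,
instead of leaving it to the implicit conversion that fails above.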


On Mon, Jul 15, 2019 at 9:54 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi Kali,
>
> What's the exception or error message thrown when executing the erroneous
> step? Please paste it here so that we can investigate the problem.
>
> On Tue, Jul 16, 2019 at 4:49 AM, sri hari kali charan Tummala
> <kali.tumm...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to write a Flink Table to a streaming sink, but it fails with
>> a cast between the Java and Scala APIs. It fails at the step below; can
>> anyone help me with this error?
>>
>>
>>     val sink2: SinkFunction[Row] = StreamingFileSink.forRowFormat(
>>       new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
>>       new SimpleStringEncoder[Row]("UTF-8")).build()
>>
>>     table.addSink(sink2)
>>
>>
>> package com.aws.examples.kinesis.consumer.TransactionExample
>>
>> import java.util.Properties
>>
>> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
>> import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
>> import org.apache.flink.table.api.{Table, TableEnvironment}
>> import com.google.gson.{Gson, JsonObject}
>> import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
>> import java.sql.{DriverManager, Time}
>>
>> import com.aws.SchemaJavaClasses.Row1
>> import org.apache.flink.types.Row
>> import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
>> import org.apache.flink.table.api.scala._
>> import org.apache.flink.table.sinks.CsvTableSink
>> import org.apache.flink.api.java.io.jdbc
>> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
>> import org.apache.flink.table.api.java._
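>> // NOTE: both the Scala and the Java Table API wildcard imports are in scope
>> // here; mixing the two APIs in one job is what sets up the runtime cast error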
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.api.java.DataSet
>> import org.apache.flink.table.sinks.TableSink
>> import com.aws.customSinks.CsvCustomSink
>> import org.apache.flink.core.fs.Path
>>
>> import scala.collection.JavaConversions._
>> import org.apache.flink.table.sources.CsvTableSource
>> import org.apache.flink.table.api.java.StreamTableEnvironment
>> import org.apache.flink.streaming.api.datastream.DataStream
>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>>
>> object KinesisConsumer {
>>
>>   def main(args: Array[String]): Unit = {
>>
>>     // set up the streaming execution environment
>>     val env = StreamExecutionEnvironment.createLocalEnvironment
>>     //env.enableCheckpointing(10)
>>
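>>     // env is the Java StreamExecutionEnvironment (imported above), so this
>>     // call returns the Java org.apache.flink.table.api.java.StreamTableEnvironment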
>>     val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>>     // Get AWS credentials
>>     val credentialsProvider = new DefaultAWSCredentialsProviderChain
>>     val credentials = credentialsProvider.getCredentials
>>
>>     // Configure Flink Kinesis consumer
>>     val consumerConfig = new Properties
>>     consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>>     consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
>>     consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
>>     consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>>
>>     // Create Kinesis stream
>>     val kinesis = env.addSource(
>>       new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))
>>
>>     val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
>>       new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {
>>
>>         override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {
>>           val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>>
>>           val csvData = data.getCc_num+","+
>>             data.getFirst+","+
>>             data.getLast+","+
>>             data.getTrans_num+","+
>>             data.getTrans_time+","+
>>             data.getCategory+","+
>>             data.getMerchant+","+
>>             data.getAmt+","+
>>             data.getMerch_lat+","+
>>             data.getMerch_long
>>
>>           //println(csvData)
>>
>>           val p:Array[String] = csvData.split(",")
>>           var cc_num:String = p(0)
>>           var first:String = p(1)
>>           var last:String = p(2)
>>           var trans_num:String = p(3)
>>           var trans_time:String = p(4)
>>           var category:String = p(5)
>>           var merchant:String = p(6)
>>           var amt:String = p(7)
>>           var merch_lat:String = p(8)
>>           var merch_long:String = p(9)
>>
>>           val creationDate: Time = new Time(System.currentTimeMillis())
>>           return new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
>>         }
>>       }
>>
>>     val data = kinesis.map(mapFunction)
>>
>>     //data.print()
>>
>>     tEnv.registerDataStream("transactions", data,
>>       "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>
>>     val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
>>     val table = tEnv.sqlQuery(query)
>>
>>     //println(table.toString())
>>
>>     //val test = new CsvCustomSink("")
>>
>>     val sink2: SinkFunction[Row] = StreamingFileSink.forRowFormat(
>>       new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
>>       new SimpleStringEncoder[Row]("UTF-8")).build()
>>
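>>     // addSink is not a Table method: this line compiles only through the
>>     // implicit Table -> DataStream[Row] conversion from
>>     // org.apache.flink.table.api.scala._, which casts the table environment
>>     // to the Scala StreamTableEnvironment and throws the cast error at runtime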
>>     table.addSink(sink2)
>>
>>     env.execute()
>>
>>   }
>>
>> }
>>
>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>

-- 
Thanks & Regards
Sri Tummala
