How does your build.sbt looks especially dependencies?
> On 2. Aug 2018, at 00:44, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> 
> Changed as suggested
> 
>    val  streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>      val dataStream =  streamExecEnv
>        .addSource(new FlinkKafkaConsumer011[String](topicsValue, new 
> SimpleStringSchema(), properties))
>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>   tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 
> 'timeissued, 'price)
> 
> Still the same error
> 
> [info] Compiling 1 Scala source to 
> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
> [error] 
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139:
>  overloaded method value registerDataStream with alternatives:
> [error]   [T](name: String, dataStream: 
> org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)Unit 
> <and>
> [error]   [T](name: String, dataStream: 
> org.apache.flink.streaming.api.datastream.DataStream[T])Unit
> [error]  cannot be applied to (String, 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, 
> Symbol, Symbol, Symbol, Symbol)
> [error]   tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 
> 'timeissued, 'price)
> [error]            ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
> [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM
> 
> Thanks anyway.
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Wed, 1 Aug 2018 at 23:34, Fabian Hueske <fhue...@gmail.com> wrote:
>> Hi,
>> 
>> You have to pass the StreamExecutionEnvironment to the getTableEnvironment() 
>> method, not the DataStream (or DataStreamSource).
>> Change 
>> 
>> val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>> 
>> to
>> 
>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>> 
>> Best,
>> Fabian
>> 
>> 2018-08-02 0:10 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>> Hi,
>>> 
>>> FYI, these are my imports
>>> 
>>> import java.util.Properties
>>> import java.util.Arrays
>>> import org.apache.flink.api.common.functions.MapFunction
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> import org.apache.flink.streaming.api.scala
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import org.apache.flink.table.api.TableEnvironment
>>> import org.apache.flink.table.api.scala._
>>> import org.apache.flink.api.scala._
>>> import org.apache.kafka.clients.consumer.ConsumerConfig
>>> import org.apache.kafka.clients.consumer.ConsumerRecord
>>> import org.apache.kafka.clients.consumer.KafkaConsumer
>>> import org.apache.flink.core.fs.FileSystem
>>> import org.apache.flink.streaming.api.TimeCharacteristic
>>> import org.slf4j.LoggerFactory
>>> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, 
>>> FlinkKafkaProducer011}
>>> import java.util.Calendar
>>> import java.util.Date
>>> import java.text.DateFormat
>>> import java.text.SimpleDateFormat
>>> import org.apache.log4j.Logger
>>> import org.apache.log4j.Level
>>> import sys.process.stringSeqToProcess
>>> import java.io.File
>>> 
>>> And this is the simple code
>>> 
>>>     val properties = new Properties()
>>>     properties.setProperty("bootstrap.servers", bootstrapServers)
>>>     properties.setProperty("zookeeper.connect", zookeeperConnect)
>>>     properties.setProperty("group.id", flinkAppName)
>>>     properties.setProperty("auto.offset.reset", "latest")
>>>     val  streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>     val dataStream =  streamExecEnv
>>>        .addSource(new FlinkKafkaConsumer011[String](topicsValue, new 
>>> SimpleStringSchema(), properties))
>>>   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>>>   tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, 
>>> 'timeissued, 'price)
>>> 
>>> And this is the compilation error
>>> 
>>> info] Compiling 1 Scala source to 
>>> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>> [error] 
>>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138:
>>>  overloaded method value getTableEnvironment with alternatives:
>>> [error]   (executionEnvironment: 
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment
>>>  <and>
>>> [error]   (executionEnvironment: 
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment
>>>  <and>
>>> [error]   (executionEnvironment: 
>>> org.apache.flink.api.scala.ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment
>>>  <and>
>>> [error]   (executionEnvironment: 
>>> org.apache.flink.api.java.ExecutionEnvironment)org.apache.flink.table.api.java.BatchTableEnvironment
>>> [error]  cannot be applied to 
>>> (org.apache.flink.streaming.api.datastream.DataStreamSource[String])
>>> [error]   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>>> [error]                                   ^
>>> [error] one error found
>>> [error] (compile:compileIncremental) Compilation failed
>>> [error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM
>>> Completed compiling
>>> 
>>> which is really strange
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> http://talebzadehmich.wordpress.com
>>> 
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  
>>> 
>>> 
>>>> On Wed, 1 Aug 2018 at 13:42, Fabian Hueske <fhue...@gmail.com> wrote:
>>>> Hi I think you are mixing Java and Scala dependencies.
>>>> 
>>>> org.apache.flink.streaming.api.datastream.DataStream is the DataStream of 
>>>> the Java DataStream API.
>>>> You should use the DataStream of the Scala DataStream API.
>>>> 
>>>> Best, Fabian
>>>> 
>>>> 2018-08-01 14:01 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>>>> Hi,
>>>>> 
>>>>> I believed I tried Hequn's suggestion and tried again
>>>>> 
>>>>> import org.apache.flink.table.api.Table
>>>>> import org.apache.flink.table.api.TableEnvironment
>>>>> import org.apache.flink.table.api.scala._
>>>>> 
>>>>> Unfortunately I am still getting the same error!
>>>>> 
>>>>> [info] Compiling 1 Scala source to 
>>>>> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>>>> [error] 
>>>>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151:
>>>>>  overloaded method value fromDataStream with alternatives:
>>>>> [error]   [T](dataStream: 
>>>>> org.apache.flink.streaming.api.datastream.DataStream[T], fields: 
>>>>> String)org.apache.flink.table.api.Table <and>
>>>>> [error]   [T](dataStream: 
>>>>> org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>> [error]  cannot be applied to 
>>>>> (org.apache.flink.streaming.api.datastream.DataStreamSource[String], 
>>>>> Symbol, Symbol, Symbol, Symbol)
>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 
>>>>> 'ticker, 'timeissued, 'price)
>>>>> [error]                                ^
>>>>> [error] one error found
>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>> [error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM
>>>>> Completed compiling
>>>>> 
>>>>> 
>>>>> Dr Mich Talebzadeh
>>>>>  
>>>>> LinkedIn  
>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>  
>>>>> http://talebzadehmich.wordpress.com
>>>>> 
>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>>>> loss, damage or destruction of data or any other property which may arise 
>>>>> from relying on this email's technical content is explicitly disclaimed. 
>>>>> The author will in no case be liable for any monetary damages arising 
>>>>> from such loss, damage or destruction.
>>>>>  
>>>>> 
>>>>> 
>>>>>> On Wed, 1 Aug 2018 at 10:03, Timo Walther <twal...@apache.org> wrote:
>>>>>> If these two imports are the only imports that you added, then you did 
>>>>>> not follow Hequn's advice or the link that I sent you.
>>>>>> 
>>>>>> You need to add the underscore imports to let Scala do its magic.
>>>>>> 
>>>>>> Timo
>>>>>> 
>>>>>> 
>>>>>>> Am 01.08.18 um 10:28 schrieb Mich Talebzadeh:
>>>>>>> Hi Timo,
>>>>>>> 
>>>>>>> These are my two flink table related imports
>>>>>>> 
>>>>>>> import org.apache.flink.table.api.Table
>>>>>>> import org.apache.flink.table.api.TableEnvironment
>>>>>>> 
>>>>>>> And these are my dependencies building with SBT
>>>>>>> 
>>>>>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
>>>>>>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
>>>>>>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
>>>>>>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
>>>>>>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
>>>>>>> libraryDependencies += "org.apache.flink" %% 
>>>>>>> "flink-connector-kafka-0.11" % "1.5.0"
>>>>>>> libraryDependencies += "org.apache.flink" %% 
>>>>>>> "flink-connector-kafka-base" % "1.5.0"
>>>>>>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>>>>>>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
>>>>>>> libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % 
>>>>>>> "1.5.0" % "provided"
>>>>>>> libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % 
>>>>>>> "provided"
>>>>>>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>>>>>>> 
>>>>>>> There appears to be conflict somewhere that cause this error
>>>>>>> 
>>>>>>> [info] Compiling 1 Scala source to 
>>>>>>> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>>>>>> [error] 
>>>>>>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
>>>>>>>  overloaded method value fromDataStream with alternatives:
>>>>>>> [error]   [T](dataStream: 
>>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T], fields: 
>>>>>>> String)org.apache.flink.table.api.Table <and>
>>>>>>> [error]   [T](dataStream: 
>>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>>>> [error]  cannot be applied to             
>>>>>>> (org.apache.flink.streaming.api.datastream.DataStreamSource[String], 
>>>>>>> Symbol, Symbol, Symbol, Symbol)
>>>>>>> [error]   val table1: Table =             
>>>>>>> tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
>>>>>>> [error]                                ^
>>>>>>> [error] one error found
>>>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>>> 
>>>>>>> Thanks
>>>>>>> 
>>>>>>> 
>>>>>>> Dr Mich Talebzadeh
>>>>>>>  
>>>>>>> LinkedIn  
>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>  
>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>> 
>>>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>>>>>> loss, damage or destruction of data or any other property which may 
>>>>>>> arise from relying on this email's technical content is explicitly 
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages arising from such loss, damage or destruction.
>>>>>>>  
>>>>>>> 
>>>>>>> 
>>>>>>>> On Wed, 1 Aug 2018 at 09:17, Timo Walther <twal...@apache.org> wrote:
>>>>>>>> Hi Mich,
>>>>>>>> 
>>>>>>>> I would check you imports again [1]. This is a pure compiler issue 
>>>>>>>> that is unrelated to your actual data stream. Also check your project 
>>>>>>>> dependencies.
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>> 
>>>>>>>> [1] 
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala
>>>>>>>> 
>>>>>>>>> Am 01.08.18 um 09:30 schrieb Mich Talebzadeh:
>>>>>>>>> 
>>>>>>>>> Hi both,
>>>>>>>>> 
>>>>>>>>> I added the import as Hequn suggested.
>>>>>>>>> 
>>>>>>>>> My stream is very simple and consists of 4 values separated by "," as 
>>>>>>>>> below 
>>>>>>>>> 
>>>>>>>>> 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
>>>>>>>>> 
>>>>>>>>> So this is what I have been trying to do
>>>>>>>>> 
>>>>>>>>> Code
>>>>>>>>> 
>>>>>>>>>     val dataStream =  streamExecEnv
>>>>>>>>>        .addSource(new FlinkKafkaConsumer011[String](topicsValue, new 
>>>>>>>>> SimpleStringSchema(), properties))
>>>>>>>>>  //
>>>>>>>>>  //
>>>>>>>>>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>>>>>>>>   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 
>>>>>>>>> 'ticker, 'timeissued, 'price)
>>>>>>>>> 
>>>>>>>>> note those four columns in Table1 definition
>>>>>>>>> 
>>>>>>>>> And this is the error being thrown
>>>>>>>>> 
>>>>>>>>> [info] Compiling 1 Scala source to 
>>>>>>>>> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>>>>>>>> [error] 
>>>>>>>>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
>>>>>>>>>  overloaded method value fromDataStream with alternatives:
>>>>>>>>> [error]   [T](dataStream: 
>>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T], fields: 
>>>>>>>>> String)org.apache.flink.table.api.Table <and>
>>>>>>>>> [error]   [T](dataStream: 
>>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>>>>>> [error]  cannot be applied to 
>>>>>>>>> (org.apache.flink.streaming.api.datastream.DataStreamSource[String], 
>>>>>>>>> Symbol, Symbol, Symbol, Symbol)
>>>>>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 
>>>>>>>>> 'key, 'ticker, 'timeissued, 'price)
>>>>>>>>> [error]                                ^
>>>>>>>>> [error] one error found
>>>>>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>>>>> 
>>>>>>>>> I suspect dataStream may not be compatible with this operation?
>>>>>>>>> 
>>>>>>>>> Regards,
>>>>>>>>> 
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>  
>>>>>>>>> LinkedIn  
>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>  
>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>> 
>>>>>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for 
>>>>>>>>> any loss, damage or destruction of data or any other property which 
>>>>>>>>> may arise from relying on this email's technical content is 
>>>>>>>>> explicitly disclaimed. The author will in no case be                  
>>>>>>>>>                          liable for any monetary damages arising from 
>>>>>>>>> such loss, damage or destruction.
>>>>>>>>>  
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Wed, 1 Aug 2018 at 04:51, Hequn Cheng <chenghe...@gmail.com> 
>>>>>>>>>> wrote:
>>>>>>>>>> Hi, Mich
>>>>>>>>>> 
>>>>>>>>>> You can try adding "import org.apache.flink.table.api.scala._", so 
>>>>>>>>>> that the Symbol can be recognized as an Expression.
>>>>>>>>>> 
>>>>>>>>>> Best, Hequn 
>>>>>>>>>> 
>>>>>>>>>> On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh 
>>>>>>>>>> <mich.talebza...@gmail.com> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>> 
>>>>>>>>>>> I am following this example 
>>>>>>>>>>> 
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api
>>>>>>>>>>> 
>>>>>>>>>>> This is my dataStream which is built on a Kafka topic
>>>>>>>>>>> 
>>>>>>>>>>>    //
>>>>>>>>>>>     //Create a Kafka consumer
>>>>>>>>>>>     //
>>>>>>>>>>>     val dataStream =  streamExecEnv
>>>>>>>>>>>        .addSource(new FlinkKafkaConsumer011[String](topicsValue, 
>>>>>>>>>>> new SimpleStringSchema(), properties))
>>>>>>>>>>>  //
>>>>>>>>>>>  //
>>>>>>>>>>>   val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>>>>>>>>>>   val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 
>>>>>>>>>>> 'ticker, 'timeissued, 'price)
>>>>>>>>>>> 
>>>>>>>>>>> While compiling it throws this error
>>>>>>>>>>> 
>>>>>>>>>>> [error] 
>>>>>>>>>>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169:
>>>>>>>>>>>  overloaded method value fromDataStream with alternatives:
>>>>>>>>>>> [error]   [T](dataStream: 
>>>>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T], fields: 
>>>>>>>>>>> String)org.apache.flink.table.api.Table <and>
>>>>>>>>>>> [error]   [T](dataStream: 
>>>>>>>>>>> org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
>>>>>>>>>>> [error]  cannot be applied to 
>>>>>>>>>>> (org.apache.flink.streaming.api.datastream.DataStreamSource[String],
>>>>>>>>>>>  Symbol, Symbol, Symbol, Symbol)
>>>>>>>>>>> [error]   val table1: Table = tableEnv.fromDataStream(dataStream, 
>>>>>>>>>>> 'key, 'ticker, 'timeissued, 'price)
>>>>>>>>>>> [error]                                ^
>>>>>>>>>>> [error] one error found
>>>>>>>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>>>>>>> 
>>>>>>>>>>> The topic is very simple, it is comma                             
>>>>>>>>>>> separated prices. I tried mapFunction and flatMap but neither 
>>>>>>>>>>> worked! 
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>  
>>>>>>>>>>> LinkedIn  
>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>  
>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>> 
>>>>>>>>>>> Disclaimer: Use it at your own risk. Any and all responsibility for 
>>>>>>>>>>> any loss, damage or destruction of data or any other property which 
>>>>>>>>>>> may arise from relying on this email's technical content is 
>>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any 
>>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>>  
>>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>> 

Reply via email to