Re: Getting compilation error in Array[TypeInformation]

2018-08-09 Thread Mich Talebzadeh
Thanks, those suggestions helped.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 9 Aug 2018 at 16:41, Timo Walther wrote:

> Hi Mich,
>
> I strongly recommend reading a good Scala programming tutorial before
> writing to a mailing list.
>
> As the error indicates, you are missing generic parameters. If you don't
> know the parameter, use `Array[TypeInformation[_]]` or `TableSink[_]`. To
> use the Types class, you need to import
> "org.apache.flink.table.api.Types".
>
> Regards,
> Timo
>
>
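
To illustrate the point about generic parameters, here is a minimal,
self-contained sketch in plain Scala. TypeInfo is a hypothetical stand-in for
a generic class such as TypeInformation; it is not a Flink type. The same
wildcard form should carry over to the original code as
Array[TypeInformation[_]] and TableSink[_]. Note also that the TableSchema
snippet elsewhere in this archive spells the constant Types.FLOAT, so
Types.Float probably needs the same spelling.

```scala
// Hypothetical stand-in for a generic class such as TypeInformation[T].
final case class TypeInfo[T](name: String)

object WildcardDemo {
  val STRING: TypeInfo[String] = TypeInfo[String]("String")
  val FLOAT: TypeInfo[Float] = TypeInfo[Float]("Float")

  // Does not compile ("class TypeInfo takes type parameters"):
  // val broken: Array[TypeInfo] = Array(STRING, FLOAT)

  // Compiles: the wildcard means "some element type, not known here".
  val fieldTypes: Array[TypeInfo[_]] = Array(STRING, STRING, STRING, FLOAT)

  // Works on any mix of element types because only `name` is used.
  def names(types: Array[TypeInfo[_]]): List[String] = types.map(_.name).toList

  def main(args: Array[String]): Unit =
    println(names(fieldTypes).mkString(", "))
}
```

The wildcard is only needed because the array mixes elements with different
type arguments; a homogeneous array could use the concrete parameter instead.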


Getting compilation error in Array[TypeInformation]

2018-08-09 Thread Mich Talebzadeh
This is the code in Scala

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
val result = tableEnv.scan("priceTable")
  .filter('ticker === "VOD" && 'price > 99.0)
  .select('key, 'ticker, 'timeissued, 'price)

val fieldNames: Array[String] = Array("key", "ticker", "timeissued", "price")
val fieldTypes: Array[TypeInformation] = Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)
val sink: TableSink = new CsvTableSink(writeDirectory+fileName, fieldDelim = ",")
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
result.insertInto("CsvSinkTable")

When compiling I get the following error

[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: class TypeInformation takes type parameters
[error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)
[error]                       ^
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: not found: value Types
[error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)
[error]                                                ^
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: not found: value Types
[error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)
[error]                                                              ^
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: not found: value Types
[error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)
[error]                                                                            ^
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: not found: value Types
[error] val fieldTypes: Array[TypeInformation] = Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)
[error]                                                                                          ^
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:172: trait TableSink takes type parameters
[error] val sink: TableSink = new CsvTableSink(writeDirectory+fileName, fieldDelim = ",")
[error]           ^
[error] 6 errors found

Maybe I am not importing the correct dependencies.

Thanks

Dr Mich Talebzadeh





Re: Working out through individual messages in Flink

2018-08-07 Thread Mich Talebzadeh
Hi Jorn,

Thanks, I uploaded the Scala code (md_streaming.scala) to my GitHub:

https://github.com/michTalebzadeh/Flink

Regards,

Dr Mich Talebzadeh







On Tue, 7 Aug 2018 at 23:29, Jörn Franke wrote:

> Hi Mich,
>
> Would it be possible to share the full source code?
> I don't see a call to streamExecEnvironment.execute.
>
> Best regards
>

Re: Working out through individual messages in Flink

2018-08-07 Thread Mich Talebzadeh
Hi Fabian,

Reading your notes above I have converted the table back to DataStream.

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)

val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]

My intention is to create an HBase sink as follows:

// Save prices to HBase table
var p = new Put(new String(key).getBytes())
p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
HbaseTable.put(p)
HbaseTable.flushCommits()

However, I don't seem to be able to get the correct values for the columns!
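
A note on why the values do not come through: key, ticker, timeissued and
price above are DataStream[Row] handles, not per-record values, so they cannot
be passed to new String(...). The per-record logic would have to run inside a
function that Flink calls once per record. The following is a minimal sketch
of that per-record step in plain Scala. PriceRow and Cell are hypothetical
stand-ins (not Flink or HBase types); the column families and qualifiers are
copied from the code above.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for one record of priceTable.
final case class PriceRow(key: String, ticker: String, timeissued: String, price: Float)

// Hypothetical stand-in for one HBase column: family, qualifier, value.
final case class Cell(family: String, qualifier: String, value: String) {
  def valueBytes: Array[Byte] = value.getBytes(UTF_8)
}

object PriceCells {
  // Builds the cells for a single record. This is the part that has to
  // run once per record rather than on the stream as a whole.
  def toCells(row: PriceRow, currency: String, opTimeMillis: Long): Seq[Cell] = Seq(
    Cell("PRICE_INFO", "TICKER", row.ticker),
    Cell("PRICE_INFO", "ISSUED", row.timeissued),
    Cell("PRICE_INFO", "PRICE", row.price.toString),
    Cell("PRICE_INFO", "CURRENCY", currency),
    Cell("OPERATION", "OP_TYPE", "1"),
    Cell("OPERATION", "OP_TIME", opTimeMillis.toString)
  )

  def main(args: Array[String]): Unit = {
    val row = PriceRow("k1", "VOD", "2018-08-07T17:07:00", 99.5f)
    toCells(row, "GBP", System.currentTimeMillis).foreach(println)
  }
}
```

In a real job, a function like toCells would be called from a map or sink
function for each record, with each Cell becoming one column of the Put for
that record's key.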

Dr Mich Talebzadeh







On Mon, 30 Jul 2018 at 09:58, Fabian Hueske wrote:

> A *Table*Source [1] is a special input connector for Flink's relational
> APIs (Table API and SQL) [2].
> You can transform and filter with these APIs as well (it's probably even
> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>
> However, there is no *Table*Sink for HBase and you would need to convert
> the Table back to a DataStream [3].
> That's not very difficult since the APIs are integrated with each other.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>
> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh:
>
>> Thanks Fabian. That was very useful.
>>
>> How about an operation like below?
>>
>>  // create builder
>>  val KafkaTableSource = Kafka011JsonTableSource.builder()
>>    // set Kafka topic
>>    .forTopic(topicsValue)
>>    // set Kafka consumer properties
>>    .withKafkaProperties(properties)
>>    // set Table schema
>>    .withSchema(TableSchema.builder()
>>      .field("key", Types.STRING)
>>      .field("ticker", Types.STRING)
>>      .field("timeissued", Types.STRING)
>>      .field("price", Types.FLOAT)
>>      .build())
>>
>> Will that be OK?
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>>
>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske wrote:
>>
>>> Hi,
>>>
>>> Flink processes streams record by record, instead of micro-batchin

Re: Passing the individual table column values to the local variables

2018-08-07 Thread Mich Talebzadeh
I need this operation to store filtered rows in an HBase table.

I can access an existing HBase table through the Flink API.

My challenge is to put rows into the HBase table. Something like below, but I
don't seem to be able to extract individual column values from priceTable:




val key = tableEnv.scan("priceTable").select('key).toDataStream[Row].print()
val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row].print()
val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row].print()
val price = tableEnv.scan("priceTable").select('price).toDataStream[Row].print()
val CURRENCY = "GBP"
val op_type = "1"
val op_time = System.currentTimeMillis.toString
/*
if (price > 99.0)
{
  // Save prices to HBase table
  var p = new Put(new String(key).getBytes())
  p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
  p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
  p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
  p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
  p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
  p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
  HbaseTable.put(p)
  HbaseTable.flushCommits()
  if (tableEnv.scan("priceTable").filter('ticker === "VOD" && 'price > 99.0))
  {
    sqltext = Calendar.getInstance.getTime.toString + ", Price on " + ticker + " hit " + price.toString
    //java.awt.Toolkit.getDefaultToolkit().beep()
    println(sqltext)
  }
}
*/

Dr Mich Talebzadeh









Passing the individual table column values to the local variables

2018-08-07 Thread Mich Talebzadeh
Hi,

The following works fine

tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
val result = tableEnv.scan("priceTable")
  .filter('ticker === "VOD" && 'price > 99.0)
  .select('key, 'ticker, 'timeissued, 'price)
val r = result.toDataStream[Row]
r.print()

Now I would like to get the individual column values from priceTable into
local variables

This does not seem to work

val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]

What alternatives are there?

Thanks

Dr Mich Talebzadeh





Re: Increase parallelism according to the number of tasks

2018-08-07 Thread Mich Talebzadeh
This worked

streamExecEnv.setParallelism(2)

Dr Mich Talebzadeh









Increase parallelism according to the number of tasks

2018-08-07 Thread Mich Talebzadeh
Hi,

In my test environment I have two task managers.

I would like to increase the parallelism from the default of 1 to 2. Can it
be done through properties? I tried

   properties.setProperty("parallelism", "2")

but that does not change anything.

Thanks

Dr Mich Talebzadeh





Re: Converting a DataStream into a Table throws error

2018-08-07 Thread Mich Talebzadeh
OK gents, thanks for the clarification.

Regards,

Dr Mich Talebzadeh







On Tue, 7 Aug 2018 at 02:21, Hequn Cheng wrote:

> Hi Mich,
>
> I think this is the behavior of the compiler. When you run your job
> locally, you have to remove the "provided" scope or add the jar to the lib
> path. But if you run on a cluster, you have to keep "provided" so that the
> Flink classes are left out of your jar, since these classes already exist
> in your installation.
>
> Best, Hequn
>
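
The scope discussion above can be sketched as a build.sbt fragment. This is a
sketch under assumptions: the artifact name and version are taken from this
thread, and bundling the dependency into the job jar would need a packaging
plugin such as sbt-assembly, which the thread does not mention.

```scala
// build.sbt (fragment)

// Variant 1: no "provided" qualifier. The dependency is on the runtime
// classpath for local runs and can be bundled into the job jar.
libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0"

// Variant 2: "provided". The build compiles against the library but leaves
// it out of the jar, so the cluster must supply it, for example by placing
// flink-table_2.11-1.5.0.jar in $FLINK_HOME/lib.
// libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
```

Only one of the two variants should be active at a time; which one fits
depends on whether the jar is already present where the job runs.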

Running SQL to print to Std Out

2018-08-06 Thread Mich Talebzadeh
Hi,

This is the streaming program I have for trade prices, following the doc on
table result sets:

https://flink.apache.org/news/2017/03/29/table-sql-api-update.html

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val ds = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
val splitStream = ds.map(new MapFunction[String, Tuple4[String, String, String, Float]] {
  override def map(value: String): Tuple4[String, String, String, Float] = {
    val cols = value.split(',')
    (cols(0), cols(1), cols(2), cols(3).toFloat)
  }
})
val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)

val result = tableEnv.scan("priceTable")
  .filter('ticker.isNotNull)
  .select('key, 'ticker, 'timeissued, 'price)
val r = result.toDataStream[Row]
r.print()

This compiles and runs, but I do not see any output on screen.

This is the output from Flink GUI

[image: image.png]

I can verify that data is being streamed in, so there is no issue there.
However, I don't see any output and the Flink GUI does not look healthy
(circles).
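
Whether or not it explains the missing output, the map step above throws on
any line that has fewer than four fields or a non-numeric price, and an
exception there would fail the job. Below is a minimal sketch of a more
defensive parse in plain Scala. PriceParser is a hypothetical name, and the
record layout (key, ticker, timeissued, price) is assumed from the code above.

```scala
import scala.util.Try

object PriceParser {
  // Parses "key,ticker,timeissued,price". Returns None when the line does
  // not have exactly four fields or the price is not a valid float.
  def parse(line: String): Option[(String, String, String, Float)] =
    line.split(',') match {
      case Array(key, ticker, timeissued, price) =>
        Try(price.trim.toFloat).toOption.map(p => (key, ticker, timeissued, p))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    println(parse("k1,VOD,2018-08-06T10:00:00,99.5"))
    println(parse("not,enough"))
  }
}
```

One option in a job would be to call such a function from a flatMap, so that
malformed lines are dropped instead of stopping the stream.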

Appreciate any input.

Thanks,

Dr Mich Talebzadeh





Re: Converting a DataStream into a Table throws error

2018-08-06 Thread Mich Talebzadeh
Thanks Fabian,

I looked at Maven and this is what it says: *provided*


[image: image.png]
However, this jar file is not shipped with Flink? Is this deliberate?

Thanks

Dr Mich Talebzadeh







On Mon, 6 Aug 2018 at 18:14, Fabian Hueske wrote:

> The problem is that you declared it as provided.
> This means the build tool assumes it will be there and therefore does not
> include it in the Jar file.
> By adding it to the lib folder you are providing the dependency.
>
> Best, Fabian
>
>> On Thu, 2 Aug 2018 at 08:26, Mich Talebzadeh 
>> wrote:
>>
>>> Thanks everyone for the advice
>>>
>>> This worked and passed the compilation error
>>>
>>> import org.apache.flink.table.api.TableEnvironment
>>> import org.apache.flink.table.api.scala._
>>> import org.apache.flink.api.scala._
>>>
>>> ….
>>>
>>> val dataStream = streamExecEnv
>>>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>>> tableEnv.registerDataStream("priceTable", dataStream, "key, ticker, timeissued, price")
>>> sqltext = "SELECT key from priceTable"
>>> val result = tableEnv.sql(sqltext)
>>> Now I get this runtime error
>>>
>>> ….
>>> [success] Total time: 16 s, completed Aug 2, 2018 8:23:12 AM
>>> Completed compiling
>>> Thu Aug 2 08:23:12 BST 2018 , Running in **Standalone mode**
>>> Starting execution of program
>>> java.lang.NoClassDefFoundError:
>>> org/apache/flink/table/api/TableEnvironment$
>>> at md_streaming$.main(md_streaming.scala:140)
>>> at md_streaming.main(md_streaming.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>>> at
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
>>>

Re: Converting a DataStream into a Table throws error

2018-08-06 Thread Mich Talebzadeh
Hi,

I resolved this issue of

java.lang.NoClassDefFoundError: org/apache/flink/table/api/TableEnvironment

by adding the jar file

flink-table_2.11-1.5.0.jar

to $FLINK_HOME/lib

It compiles and runs OK now.

Rather strange, as I had this dependency in my SBT build:

libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"


HTH
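
A note on why the jar had to go into $FLINK_HOME/lib: in sbt, a dependency marked "provided" is available at compile time but is normally left out of the packaged job jar, on the assumption that the runtime supplies it. A minimal sketch of the alternative, assuming the job is packaged as a fat jar with a plugin such as sbt-assembly (the thread does not say how the jar is built):

```scala
// build.sbt fragment (sketch, not the author's actual build file).
// Dropping "provided" puts flink-table in the default (compile) scope,
// so a fat-jar build would bundle it into the job jar instead of
// expecting the cluster to supply it from $FLINK_HOME/lib.
libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0"
```

Either approach should work: keep "provided" and place the jar in $FLINK_HOME/lib, or drop "provided" and ship the classes inside the job jar. Doing both is best avoided, since the same classes would then be present twice.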

Dr Mich Talebzadeh







On Thu, 2 Aug 2018 at 08:26, Mich Talebzadeh 
wrote:

> Thanks everyone for the advice
>
> This worked and passed the compilation error
>
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.api.scala._
>
> ….
>
> val dataStream =  streamExecEnv
>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
> SimpleStringSchema(), properties))
>val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>   tableEnv.registerDataStream("priceTable", dataStream, "key, ticker,
> timeissued, price")
>   sqltext  = "SELECT key from priceTable";
>   val result = tableEnv.sql(sqltext);
>
> Now I get this runtime error
>
> ….
> [success] Total time: 16 s, completed Aug 2, 2018 8:23:12 AM
> Completed compiling
> Thu Aug 2 08:23:12 BST 2018 , Running in **Standalone mode**
> Starting execution of program
> java.lang.NoClassDefFoundError:
> org/apache/flink/table/api/TableEnvironment$
> at md_streaming$.main(md_streaming.scala:140)
> at md_streaming.main(md_streaming.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.table.api.TableEnvironment$
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On Thu, 2 Aug 2018 at 03:07, vino yang  wrote:
>
>> Hi Mich,
>>
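>> It seems that the type of your DataStream stream is always wrong.
>> If you want to specify four fields, usually the DataStream type should be
>> similar: DataStream[(Type1, Type2, Type3, Type4)], not DataStream[String],
>> you can try it.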

Re: Converting a DataStream into a Table throws error

2018-08-03 Thread Mich Talebzadeh
Apologies, that should read Vino and Timo.

Dr Mich Talebzadeh







On Fri, 3 Aug 2018 at 09:14, Mich Talebzadeh 
wrote:

> Thanks a lot Timo.
>
> I will try the changes suggested.
>
> Appreciated
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On Fri, 3 Aug 2018 at 03:56, vino yang  wrote:
>
>> Hi Mich,
>>
>> I have reviewed your code in the github you provided.
>>
>> I copied your code to org.apache.flink.table.examples.scala under
>> flink-examples-table. It passed the compilation and didn't report the
>> exception you provided, although there are other exceptions (it's about
>> hdfs, this is because of my environment).
>> First of all, the necessary dependencies are just a few:
>>
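>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-table_${scala.binary.version}</artifactId>
>>   <version>${project.version}</version>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>   <version>${project.version}</version>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
>>   <version>${project.version}</version>
>> </dependency>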
>>
>> Please remove irrelevant dependencies.
>>
>> In addition, your StreamExecutionEnvironment object has a problem with
>> its reference. The correct import should be:
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>
>> In addition, you specify four fields, but your stream object is
>> DataStream[String]. This is also a mistake. You are advised to parse it
>> first with a MapFunction. The example is as follows:
>>
>> val dataStream = streamExecEnv
>>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>
>> val newDateStream = dataStream.map(new MapFunction[String, Tuple4[String, String, String, String]] {
>>   override def map(value: String): Tuple4[String, String, String, String] = {
>>     null // placeholder: parse value into the four fields here
>>   }
>> })
>>
>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>> //tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
>> tableEnv.registerDataStream("priceTable", newDateStream, 'key, 'ticker, 'timeissued, 'price)
>>
>> Thanks, vino.
>>
>> 2018-08-02 20:36 GMT+08:00 Mich Talebzadeh :
>>
>>> Appreciate if anyone had a chance to look at the Scala code in GitHub
>>> and advise
>>>
>>> https://github.com/michTalebzadeh/Flink
>>>
>>> Regards,
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, 2 Aug 2018 at 09:06, Mich Talebzadeh 
>>> wrote:
>>>

Re: Converting a DataStream into a Table throws error

2018-08-03 Thread Mich Talebzadeh
Thanks a lot Timo.

I will try the changes suggested.

Appreciated

Dr Mich Talebzadeh







On Fri, 3 Aug 2018 at 03:56, vino yang  wrote:

> Hi Mich,
>
> I have reviewed your code in the github you provided.
>
> I copied your code to org.apache.flink.table.examples.scala under
> flink-examples-table. It passed the compilation and didn't report the
> exception you provided, although there are other exceptions (it's about
> hdfs, this is because of my environment).
> First of all, the necessary dependencies are just a few:
>
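> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-table_${scala.binary.version}</artifactId>
>   <version>${project.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>   <version>${project.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
>   <version>${project.version}</version>
> </dependency>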
>
> Please remove irrelevant dependencies.
>
> In addition, your StreamExecutionEnvironment object has a problem with its
> reference. The correct import should be:
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>
> In addition, you specify four fields, but your stream object is
> DataStream[String]. This is also a mistake. You are advised to parse it
> first with a MapFunction. The example is as follows:
>
> val dataStream = streamExecEnv
>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>
> val newDateStream = dataStream.map(new MapFunction[String, Tuple4[String, String, String, String]] {
>   override def map(value: String): Tuple4[String, String, String, String] = {
>     null // placeholder: parse value into the four fields here
>   }
> })
>
> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
> //tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
> tableEnv.registerDataStream("priceTable", newDateStream, 'key, 'ticker, 'timeissued, 'price)
>
> Thanks, vino.
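
The parsing step that vino leaves as a placeholder could look roughly like the sketch below. It is plain Scala with no Flink dependency, and it assumes each Kafka message is a single comma-separated line of the form key,ticker,timeissued,price; the thread does not state the message format, so the delimiter, the object name PriceLineParser and the type alias PriceRow are all hypothetical.

```scala
object PriceLineParser {
  // Hypothetical alias for the four string fields named in the thread.
  type PriceRow = (String, String, String, String)

  /** Splits a line into exactly four trimmed fields.
    * Returns None when the line does not have four fields, so that
    * malformed input can be dropped instead of producing a null row.
    */
  def parse(line: String): Option[PriceRow] =
    line.split(",", -1).map(_.trim) match {
      case Array(key, ticker, timeIssued, price) =>
        Some((key, ticker, timeIssued, price))
      case _ =>
        None
    }
}
```

In a Flink job a function like this could be called from the map or flatMap step described above, so that the resulting stream carries four fields rather than one String.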
>
> 2018-08-02 20:36 GMT+08:00 Mich Talebzadeh :
>
>> Appreciate if anyone had a chance to look at the Scala code in GitHub and
>> advise
>>
>> https://github.com/michTalebzadeh/Flink
>>
>> Regards,
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>>
>> On Thu, 2 Aug 2018 at 09:06, Mich Talebzadeh 
>> wrote:
>>
>>> Thanks Timo,
>>>
>>> Did as suggested getting this compilation error
>>>
>>> [info] Compiling 1 Scala source to
>>> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>> [error]
>>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:136:
>>> could not find implicit value for evidence parameter of type
>>> org.apache.flink.api.common.typeinfo.TypeInformation[String]
>>> [error].addSource(new FlinkKafkaConsumer011[String](topicsValue,
>>> new SimpleStringSchema(), properties))
>>> [error]  ^
>>> [error] one error found
>>> [error] (compile:compileIncremental) Compilation failed
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>

Re: Converting a DataStream into a Table throws error

2018-08-02 Thread Mich Talebzadeh
I would appreciate it if anyone has a chance to look at the Scala code on
GitHub and advise

https://github.com/michTalebzadeh/Flink

Regards,

Dr Mich Talebzadeh







On Thu, 2 Aug 2018 at 09:06, Mich Talebzadeh 
wrote:

> Thanks Timo,
>
> Did as suggested getting this compilation error
>
> [info] Compiling 1 Scala source to
> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
> [error]
> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:136:
> could not find implicit value for evidence parameter of type
> org.apache.flink.api.common.typeinfo.TypeInformation[String]
> [error].addSource(new FlinkKafkaConsumer011[String](topicsValue,
> new SimpleStringSchema(), properties))
> [error]  ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On Thu, 2 Aug 2018 at 09:01, Timo Walther  wrote:
>
>> Whenever you use Scala and there is a Scala-specific class, use it.
>>
>> remove: import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> add: import org.apache.flink.streaming.api.scala._
>>
>> This will use
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.
>>
>> Timo
>>
>> Am 02.08.18 um 09:47 schrieb Mich Talebzadeh:
>>
>> Tremendous. Many thanks.
>>
>> Put the sbt build file and the Scala code here
>>
>> https://github.com/michTalebzadeh/Flink
>>
>> Regards,
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>>
>> On Thu, 2 Aug 2018 at 08:27, Timo Walther  wrote:
>>
>>> Hi Mich,
>>>
>>> could you share your project with us (maybe on github)? Then we can
>>> import it and debug what the problem is.
>>>
>>> Regards,
>>> Timo
>>>
>>> Am 02.08.18 um 07:37 schrieb Mich Talebzadeh:
>>>
>>> Hi Jorn,
>>>
>>> Here you go the dependencies
>>>
>>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
>>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
>>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
>>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
>>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
>>> libraryDependencies += "org.apache.flink" %%
>>> "flink-connector-kafka-0.11" % "1.5.0"
>>> libraryDependencies += "org.apache.flink" %%
>>> "flink-connector-kafka-base" % "1.5.0"
>>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"

Re: Converting a DataStream into a Table throws error

2018-08-02 Thread Mich Talebzadeh
Thanks Timo,

I did as suggested and am now getting this compilation error:

[info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:136: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error]    .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
[error]  ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
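
For readers hitting the same message: an "evidence parameter" is the hidden implicit argument that the Scala compiler adds for a context bound such as [T: TypeInformation]. The error means no implicit value of that type was in scope at the call site. The sketch below reproduces the mechanism in plain Scala. TypeInfo, Implicits and addSource here are hypothetical stand-ins, not Flink's classes; in Flink the implicit is expected to come from a wildcard import of the Scala API package, such as the org.apache.flink.api.scala._ import reported elsewhere in this thread as getting past the compilation error.

```scala
object EvidenceSketch {
  // Hypothetical stand-in for a type-information class; not Flink's API.
  trait TypeInfo[T] { def name: String }

  // Stand-in for the implicits that a wildcard import brings into scope.
  object Implicits {
    implicit val stringInfo: TypeInfo[String] =
      new TypeInfo[String] { val name = "String" }
  }

  // The context bound [T: TypeInfo] is shorthand for an extra implicit
  // parameter list: (implicit evidence: TypeInfo[T]).
  def addSource[T: TypeInfo](elements: Seq[T]): String =
    s"source of ${implicitly[TypeInfo[T]].name} with ${elements.size} element(s)"

  def demo: String = {
    // Without this import the call below fails to compile with
    // "could not find implicit value for evidence parameter of type
    // EvidenceSketch.TypeInfo[String]".
    import Implicits._
    addSource(Seq("a", "b"))
  }

  def main(args: Array[String]): Unit = println(demo)
}
```

The point of the sketch is only that the fix is an import that puts the implicit in scope, not a change to the addSource call itself.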

Dr Mich Talebzadeh







On Thu, 2 Aug 2018 at 09:01, Timo Walther  wrote:

> Whenever you use Scala and there is a Scala-specific class, use it.
>
> remove: import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> add: import org.apache.flink.streaming.api.scala._
>
> This will use
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.
>
> Timo
>
> Am 02.08.18 um 09:47 schrieb Mich Talebzadeh:
>
> Tremendous. Many thanks.
>
> Put the sbt build file and the Scala code here
>
> https://github.com/michTalebzadeh/Flink
>
> Regards,
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On Thu, 2 Aug 2018 at 08:27, Timo Walther  wrote:
>
>> Hi Mich,
>>
>> could you share your project with us (maybe on github)? Then we can
>> import it and debug what the problem is.
>>
>> Regards,
>> Timo
>>
>> Am 02.08.18 um 07:37 schrieb Mich Talebzadeh:
>>
>> Hi Jorn,
>>
>> Here you go the dependencies
>>
>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11"
>> % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base"
>> % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
>> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
>> "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" %
>> "provided"
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>

Re: Converting a DataStream into a Table throws error

2018-08-02 Thread Mich Talebzadeh
Tremendous. Many thanks.

I have put the sbt build file and the Scala code here:

https://github.com/michTalebzadeh/Flink

Regards,

Dr Mich Talebzadeh







On Thu, 2 Aug 2018 at 08:27, Timo Walther  wrote:

> Hi Mich,
>
> could you share your project with us (maybe on github)? Then we can import
> it and debug what the problem is.
>
> Regards,
> Timo
>
> Am 02.08.18 um 07:37 schrieb Mich Talebzadeh:
>
> Hi Jorn,
>
> Here you go the dependencies
>
> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11"
> % "1.5.0"
> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base"
> % "1.5.0"
> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
> "1.5.0"
> libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" %
> "provided"
> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On Thu, 2 Aug 2018 at 06:19, Jörn Franke  wrote:
>
>>
>> What does your build.sbt look like, especially the dependencies?
>> On 2. Aug 2018, at 00:44, Mich Talebzadeh 
>> wrote:
>>
>> Changed as suggested
>>
>> val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val dataStream = streamExecEnv
>>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>> tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
>>
>> Still the same error
>>
>> [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>> [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139: overloaded method value registerDataStream with alternatives:
>> [error]   [T](name: String, dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)Unit <and>
>> [error]   [T](name: String, dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])Unit
>> [error]  cannot be applied to (String, org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, Symbol, Symbol, Symbol, Symbol)
>> [error]   tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
>> [error]^
>> [error] one error found
>> [error] (compile:compileIncremental) Compilation failed
>> [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM
>>
>> Thanks anyway.
>>
>> Dr Mich Talebzadeh

Re: Converting a DataStream into a Table throws error

2018-08-02 Thread Mich Talebzadeh
Thanks, everyone, for the advice.

This worked and got past the compilation error:

import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._

….

val dataStream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("priceTable", dataStream, "key, ticker, timeissued, price")
sqltext = "SELECT key from priceTable";
val result = tableEnv.sql(sqltext);

Now I get this runtime error

….
[success] Total time: 16 s, completed Aug 2, 2018 8:23:12 AM
Completed compiling
Thu Aug 2 08:23:12 BST 2018 , Running in **Standalone mode**
Starting execution of program
java.lang.NoClassDefFoundError: org/apache/flink/table/api/TableEnvironment$
    at md_streaming$.main(md_streaming.scala:140)
    at md_streaming.main(md_streaming.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.TableEnvironment$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
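
A note on reading this trace: the trailing $ in TableEnvironment$ is the name the Scala compiler gives to the class behind a companion object, so the failure is in loading the object on which getTableEnvironment is called. The code compiled because the class was on the compile classpath, but it was not found on the runtime classpath. One way to confirm that kind of mismatch is a small check like the sketch below, run with the same classpath as the job. ClasspathCheck and isOnClasspath are hypothetical helper names, not part of Flink.

```scala
object ClasspathCheck {
  /** Returns true if the named class can be loaded by this object's
    * class loader. The class is located but not initialized.
    */
  def isOnClasspath(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException | _: NoClassDefFoundError => false
    }

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "org.apache.flink.table.api.TableEnvironment$", // class from the trace above
      "java.lang.String"                              // always present, as a sanity check
    )
    names.foreach(n => println(s"$n -> ${isOnClasspath(n)}"))
  }
}
```

If the first name reports false while the second reports true, the jar containing the table API is missing at runtime, which matches the "provided" dependency in the build file.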


Dr Mich Talebzadeh







On Thu, 2 Aug 2018 at 03:07, vino yang  wrote:

> Hi Mich,
>
> It seems that the type of your DataStream is still wrong.
> If you want to specify four fields, the DataStream type should usually look
> like DataStream[(Type1, Type2, Type3, Type4)], not DataStream[String].
> Please try that.
>
> Thanks, vino
>
> 2018-08-02 6:44 GMT+08:00 Mich Talebzadeh :
>
>> Changed as suggested
>>
>> val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> val dataStream = streamExecEnv
>>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>> tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
>>
>> Still the same error
>>
>> [info] Compiling 1 Scala source to
>> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>> [error]
>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139:
>> overloaded method value registerDataStream with alternatives:
>> [error]   [T](name: String, dataStream:
>> org.apache.flink.streaming.api.datastream.DataStream[T], fields:
>> String)Unit 
>> [error]   [T](name: String, dataStream:
>> org.apache.flink.streaming.api.datastream.DataStream[T])Unit
>> [error]  cannot be applied to (String,
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Hi Jorn,

Here are the dependencies:

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 2 Aug 2018 at 06:19, Jörn Franke  wrote:

>
> What does your build.sbt look like, especially the dependencies?

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Changed as suggested

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue,
    new SimpleStringSchema(), properties))
val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker,
  'timeissued, 'price)

Still the same error

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139:
overloaded method value registerDataStream with alternatives:
[error]   [T](name: String, dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)Unit 
[error]   [T](name: String, dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])Unit
[error]  cannot be applied to (String,
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
Symbol, Symbol, Symbol, Symbol)
[error]   tableEnv.registerDataStream("table1", streamExecEnv, 'key,
'ticker, 'timeissued, 'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM

Thanks anyway.

On Wed, 1 Aug 2018 at 23:34, Fabian Hueske  wrote:

> Hi,
>
> You have to pass the StreamExecutionEnvironment to the
> getTableEnvironment() method, not the DataStream (or DataStreamSource).
> Change
>
> val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
>
> to
>
> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>
> Best,
> Fabian

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Hi,

FYI, these are my imports

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.slf4j.LoggerFactory
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011,
FlinkKafkaProducer011}
import java.util.Calendar
import java.util.Date
import java.text.DateFormat
import java.text.SimpleDateFormat
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import java.io.File

And this is the simple code

val properties = new Properties()
properties.setProperty("bootstrap.servers", bootstrapServers)
properties.setProperty("zookeeper.connect", zookeeperConnect)
properties.setProperty("group.id", flinkAppName)
properties.setProperty("auto.offset.reset", "latest")
val  streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
  val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
  tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker,
'timeissued, 'price)

And this is the compilation error

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138:
overloaded method value getTableEnvironment with alternatives:
[error]   (executionEnvironment:
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment

[error]   (executionEnvironment:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment

[error]   (executionEnvironment:
org.apache.flink.api.scala.ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment

[error]   (executionEnvironment:
org.apache.flink.api.java.ExecutionEnvironment)org.apache.flink.table.api.java.BatchTableEnvironment
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String])
[error]   val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
[error]   ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM
Completed compiling

This is really strange.

On Wed, 1 Aug 2018 at 13:42, Fabian Hueske  wrote:

> Hi, I think you are mixing Java and Scala dependencies.
>
> org.apache.flink.streaming.api.datastream.DataStream is the DataStream of
> the Java DataStream API.
> You should use the DataStream of the Scala DataStream API.
>
> Best, Fabian
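
A sketch of what using the Scala API throughout might look like, assuming Flink 1.5.0 with flink-streaming-scala and flink-table on the classpath. Here env.fromElements stands in for the Kafka source so that no broker is needed; the object name ScalaApiSketch and the table name priceTable are made up for illustration.

```scala
// Scala flavour of the streaming API: brings in the Scala
// StreamExecutionEnvironment, the Scala DataStream and the
// implicit TypeInformation support.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
// Lets Scala Symbols such as 'key be used as table expressions.
import org.apache.flink.table.api.scala._

object ScalaApiSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the Kafka consumer used in the thread (assumption).
    val lines: DataStream[String] = env.fromElements(
      "05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48")

    // Turn each line into a four-field tuple so that four column names fit.
    val rows: DataStream[(String, String, String, Float)] = lines.map { line =>
      val f = line.split(",").map(_.trim)
      (f(0), f(1), f(2), f(3).toFloat)
    }

    // The environment goes to getTableEnvironment, the stream to registerDataStream.
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataStream("priceTable", rows, 'key, 'ticker, 'timeissued, 'price)

    val table: Table = tableEnv.scan("priceTable")
    table.printSchema()
  }
}
```

The main difference from the imports posted in this thread is that StreamExecutionEnvironment comes from org.apache.flink.streaming.api.scala rather than org.apache.flink.streaming.api.environment, so the resulting stream is a Scala DataStream.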

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Hi,

I believe I applied Hequn's suggestion, and tried again with these imports:

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

Unfortunately I am still getting the same error!

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM
Completed compiling


On Wed, 1 Aug 2018 at 10:03, Timo Walther  wrote:

> If these two imports are the only imports that you added, then you did not
> follow Hequn's advice or the link that I sent you.
>
> You need to add the underscore imports to let Scala do its magic.
>
> Timo

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Hi Timo,

These are my two flink table related imports

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment

And these are my dependencies building with SBT

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % "1.5.0" % "provided"
libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

There appears to be a conflict somewhere that causes this error

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

Thanks


On Wed, 1 Aug 2018 at 09:17, Timo Walther  wrote:

> Hi Mich,
>
> I would check your imports again [1]. This is a pure compiler issue that is
> unrelated to your actual data stream. Also check your project dependencies.
>
> Regards,
> Timo
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala
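
On the dependency side, a build.sbt fragment along the following lines may be what is needed. It is only a sketch: it pulls in flink-streaming-scala, which a later message in this thread lists in place of flink-streaming-java, and it leaves out the HBase and Kafka client entries for brevity. The exact scalaVersion patch level is an assumption; the compile log only shows scala-2.11.

```scala
// build.sbt fragment (sketch, not a complete build definition)
scalaVersion := "2.11.12" // assumption: the log only shows target/scala-2.11

val flinkVersion = "1.5.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % flinkVersion,
  // Scala DataStream API, instead of flink-streaming-java
  "org.apache.flink" %% "flink-streaming-scala"      % flinkVersion,
  "org.apache.flink" %% "flink-table"                % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion
)
```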

Re: Converting a DataStream into a Table throws error

2018-08-01 Thread Mich Talebzadeh
Hi both,

I added the import as Hequn suggested.

My stream is very simple and consists of 4 values separated by "," as below

05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48

So this is what I have been trying to do

Code

val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
 //
 //
  val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
  val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker,
'timeissued, 'price)

Note the four columns in the table1 definition.

And this is the error being thrown

[info] Compiling 1 Scala source to
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

I suspect dataStream may not be compatible with this operation?

Regards,

On Wed, 1 Aug 2018 at 04:51, Hequn Cheng  wrote:

> Hi, Mich
>
> You can try adding "import org.apache.flink.table.api.scala._", so that
> the Symbol can be recognized as an Expression.
>
> Best, Hequn
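
The mechanism behind this advice can be modelled in plain Scala. In the sketch below, Expr, conversions and fieldNames are hypothetical stand-ins (they are not Flink classes) for an expression type, the implicits that a wildcard import brings into scope, and a method that expects expressions. Symbol("key") is used instead of the 'key literal only so that the example also compiles on newer Scala versions.

```scala
import scala.language.implicitConversions

object ImplicitSketch {
  // Hypothetical stand-in for a table-API expression type.
  final case class Expr(name: String)

  // Hypothetical stand-in for what a wildcard import such as
  // org.apache.flink.table.api.scala._ makes available.
  object conversions {
    implicit def symbolToExpr(s: Symbol): Expr = Expr(s.name)
  }

  // A method that only accepts expressions, never raw Symbols.
  def fieldNames(fields: Expr*): Seq[String] = fields.map(_.name)

  // With the import in scope, each Symbol is converted at the call site.
  // Without it, this call does not compile: Symbol is not an Expr.
  def withImport: Seq[String] = {
    import conversions._
    fieldNames(Symbol("key"), Symbol("ticker"), Symbol("timeissued"), Symbol("price"))
  }
}
```

Removing the import inside withImport gives a type mismatch on the Symbol arguments, which is the same kind of failure as the "cannot be applied to (..., Symbol, Symbol, Symbol, Symbol)" message reported in this thread.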


Converting a DataStream into a Table throws error

2018-07-31 Thread Mich Talebzadeh
Hi,

I am following this example

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api

This is my dataStream which is built on a Kafka topic

   //
//Create a Kafka consumer
//
val dataStream =  streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))
 //
 //
  val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
  val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker,
'timeissued, 'price)

While compiling it throws this error

[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169:
overloaded method value fromDataStream with alternatives:
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T], fields:
String)org.apache.flink.table.api.Table 
[error]   [T](dataStream:
org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error]  cannot be applied to
(org.apache.flink.streaming.api.datastream.DataStreamSource[String],
Symbol, Symbol, Symbol, Symbol)
[error]   val table1: Table = tableEnv.fromDataStream(dataStream, 'key,
'ticker, 'timeissued, 'price)
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

The topic is very simple: it carries comma-separated prices. I tried a
MapFunction and flatMap, but neither worked!

Thanks,



Re: splitting DataStream throws error

2018-07-30 Thread Mich Talebzadeh
Thanks

So the assumption is that one cannot perform split on DataStream[String]
directly?

On Mon, 30 Jul 2018 at 21:54, Chesnay Schepler  wrote:

> You define a flatMap function that takes a string, calls String#split on
> it and collects the array.
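
The compiler error in the original question shows that DataStream.split expects an OutputSelector, so it is a stream-routing operator and not String#split. The per-record split therefore has to happen inside a map or flatMap function. The sketch below uses plain Scala collections as a stand-in for a DataStream (an assumption, so that it runs without Flink) to show how the two differ; the object and method names are illustrative.

```scala
object SplitSketch {
  // map: one sequence of fields per input line, so row boundaries are kept.
  def perLine(lines: Seq[String]): Seq[Seq[String]] =
    lines.map(line => line.split(",").map(_.trim).toSeq)

  // flatMap: every field of every line ends up in one flat sequence,
  // so row boundaries are lost.
  def flattened(lines: Seq[String]): Seq[String] =
    lines.flatMap(line => line.split(",").map(_.trim).toSeq)
}
```

If the goal is one record with four columns per message, the map form (or a flatMap that emits one tuple per line) is probably the better fit; a flatMap that collects the raw array emits each field as a separate element.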


splitting DataStream throws error

2018-07-30 Thread Mich Talebzadeh
Hi,

I have Kafka streaming feeds in which each row looks like the example below,
with fields separated by ",".
I can split a row easily with the split function

scala> val oneline =
"05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48"
oneline: String =
05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
scala> oneline.split(",")
res26: Array[String] = Array(05521df6-4ccf-4b2f-b874-eb27d461b305, IBM,
2018-07-30T19:51:50, 190.48)

I can get the individual columns as below

scala>val key = oneline.split(",").map(_.trim).view(0).toString
key: String = 05521df6-4ccf-4b2f-b874-eb27d461b305
scala>val key = oneline.split(",").map(_.trim).view(1).toString
key: String = IBM
scala>val key = oneline.split(",").map(_.trim).view(2).toString
key: String = 2018-07-30T19:51:50
scala>val key = oneline.split(",").map(_.trim).view(3).toFloat
key: Float = 190.48

Now when I apply the same to dataStream in flink it fails

val dataStream = streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue,
      new SimpleStringSchema(), properties))


*dataStream.split(",")*
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:154:
type mismatch;
[error]  found   : String(",")
[error]  required:
org.apache.flink.streaming.api.collector.selector.OutputSelector[String]
[error] dataStream.split(",")
[error]  ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

What operation do I need to do on dataStream to make this split work?
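
The compiler message indicates that split on a Flink DataStream is not
String.split: it expects an OutputSelector that routes records to named
outputs. To break each message into columns, the string split has to run
inside a per-record transformation such as map. Below is a minimal sketch in
plain Scala so that it runs without Flink on the classpath. The Price case
class and the parsePrice helper are hypothetical names, not from the thread.
With Flink's Scala API the same function could presumably be passed to
dataStream.map, which would also need the implicits from
org.apache.flink.streaming.api.scala._ in scope.

```scala
// Hypothetical record type for one price message; field names follow the thread.
final case class Price(key: String, ticker: String, timeissued: String, price: Float)

object PriceParser {
  // Split one comma-separated line once and convert the fields.
  // Returns None for malformed lines instead of throwing.
  def parsePrice(line: String): Option[Price] =
    line.split(",").map(_.trim) match {
      case Array(key, ticker, timeissued, price) =>
        scala.util.Try(price.toFloat).toOption.map(p => Price(key, ticker, timeissued, p))
      case _ => None
    }
}
```

Returning an Option keeps a single bad message from failing the whole job;
whether that is the right policy depends on the feed.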

Thanks

Dr Mich Talebzadeh





Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Thanks again.

The Hbase connector works fine in Flink

// Start Hbase table stuff
val tableName = "MARKETDATAHBASESPEEDFLINK"
val hbaseConf = HBaseConfiguration.create()
//  Connecting to remote Hbase
hbaseConf.set("hbase.master", hbaseHost)
hbaseConf.set("hbase.zookeeper.quorum",zookeeperHost)
hbaseConf.set("hbase.zookeeper.property.clientPort",zooKeeperClientPort)
hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
// create this table with column family
val admin = new HBaseAdmin(hbaseConf)
if(!admin.isTableAvailable(tableName))
{
  println("Creating table " + tableName)
  val tableDesc = new HTableDescriptor(tableName)
  tableDesc.addFamily(new HColumnDescriptor("PRICE_INFO".getBytes()))
  tableDesc.addFamily(new HColumnDescriptor("OPERATION".getBytes()))
  admin.createTable(tableDesc)
} else {
  println("Table " + tableName + " already exists!!")
}
val HbaseTable = new HTable(hbaseConf, tableName)
// End Hbase table stuff

So I just need to split every row into columns and put it into Hbase as
follows:

// Save prices to Hbase table
var p = new Put(new String(key).getBytes())
//p.add("PRICE_INFO".getBytes(), "key".getBytes(), new String(ticker).getBytes())
p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
p.add("PRICE_INFO".getBytes(), "SSUED".getBytes(), new String(timeissued).getBytes())
p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
HbaseTable.put(p)
HbaseTable.flushCommits()


Dr Mich Talebzadeh







On Mon, 30 Jul 2018 at 09:58, Fabian Hueske  wrote:

> A *Table*Source [1], is a special input connector for Flink's relational
> APIs (Table API and SQL) [2].
> You can transform and filter with these APIs as well (it's probably even
> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>
> However, there is no *Table*Sink for HBase and you would need to convert
> the Table back to a DataStream [3].
> That's not very difficult since the APIs are integrated with each other.
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>
> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh :
>
>> Thanks Fabian. That was very useful.
>>
>> How about an operation like below?
>>
>>  // create builder
>>  val KafkaTableSource = Kafka011JsonTableSource.builder()
>>  // set Kafka topic
>> .forTopic(topicsValue)
>>  // set Kafka consumer properties
>> .withKafkaProperties(properties)
>>  // set Table schema
>> .withSchema(TableSchema.builder()
>> .field("key", Types.STRING)
>> .field("ticker", Types.STRING)
>> .field("timeissued", Types.STRING)
>> .field("price", Types.FLOAT)
>> .build())
>>
>> Will that be OK?
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>

Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Thanks Fabian. That was very useful.

How about an operation like below?

 // create builder
 val KafkaTableSource = Kafka011JsonTableSource.builder()
   // set Kafka topic
   .forTopic(topicsValue)
   // set Kafka consumer properties
   .withKafkaProperties(properties)
   // set Table schema
   .withSchema(TableSchema.builder()
     .field("key", Types.STRING)
     .field("ticker", Types.STRING)
     .field("timeissued", Types.STRING)
     .field("price", Types.FLOAT)
     .build())

Will that be OK?


Dr Mich Talebzadeh







On Mon, 30 Jul 2018 at 09:19, Fabian Hueske  wrote:

> Hi,
>
> Flink processes streams record by record, instead of micro-batching
> records together. Since every record comes by itself, there is no for-each.
> Simple record-by-record transformations can be done with a MapFunction,
> filtering out records with a FilterFunction. You can also implement a
> FlatMapFunction to do both in one step.
>
> Once the stream is transformed and filtered, you can write it to HBase
> with a sink function.
>
>
> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh :
>
>> Just to clarify these are the individual prices separated by ','. The
>> below shows three price lines in the topic
>>
>> UUID,Security, Time,Price
>> 1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>>
>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>>
>>> In Spark streaming I go through the RDD and walk through rows one by one
>>> and check prices
>>> If prices are valuable I store them into an Hbase table
>>>
>>> val dstream = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>> dstream.cache()
>>> dstream.foreachRDD
>>> { pricesRDD =>
>>>   // Work on individual messages
>>>   *   for(line <- pricesRDD.collect.toArray)*
>>>  {
>>>var key = line._2.split(',').view(0).toString
>>>var ticker =  line._2.split(',').view(1).toString
>>>var timeissued = line._2.split(',').view(2).toString
>>>var price = line._2.split(',').view(3).toFloat
>>>val priceToString = line._2.split(',').view(3)
>>> if (price > 90.0)
>>>{
>>>//save to Hbase table
>>>}
>>>   }
>>>  }
>>>
>>> That works fine.
>>>
>>> In Flink I define my source as below
>>>
>>> val streamExecEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> val stream = streamExecEnv
>>>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new
>>> SimpleStringSchema(), properties))
>>>
>>> Is there anyway I can perform similar operation in Flink? I need to

Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Hi,

I have a Kafka topic that transmits 100 security prices every 2 seconds.

In Spark streaming I go through the RDD, walk through the rows one by one
and check the prices.
If the prices are valuable I store them in an Hbase table.

val dstream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
dstream.cache()
dstream.foreachRDD { pricesRDD =>
  // Work on individual messages
  for (line <- pricesRDD.collect.toArray) {
    // Split each message once and pick out the columns
    val cols = line._2.split(',')
    val key = cols(0)
    val ticker = cols(1)
    val timeissued = cols(2)
    val priceToString = cols(3)
    val price = priceToString.toFloat
    if (price > 90.0) {
      // save to Hbase table
    }
  }
}

That works fine.

In Flink I define my source as below

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = streamExecEnv
   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
SimpleStringSchema(), properties))

Is there any way I can perform a similar operation in Flink? I need to go
through every topic load sent and look at the individual rows. For example,
what is the equivalent of

*for(line <- pricesRDD.collect.toArray)*

in Flink?
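
Since Flink hands records to its operators one at a time, the body of such a
loop typically becomes the function passed to a transformation rather than a
loop over a collected batch. The sketch below shows the parse-and-filter step
as a single function that returns zero or one result per line, which is the
shape a flatMap expects. It runs on a plain Scala collection as a stand-in
for the stream; keepIfValuable is a hypothetical name, and the 90.0 threshold
is the one from the Spark code above.

```scala
object PriceFilter {
  // Returns (key, ticker, timeissued, price) when the price exceeds the
  // threshold; an empty result means the record is dropped.
  def keepIfValuable(line: String, threshold: Float = 90.0f): Seq[(String, String, String, Float)] = {
    val cols = line.split(',').map(_.trim)
    if (cols.length != 4) Seq.empty
    else scala.util.Try(cols(3).toFloat).toOption match {
      case Some(p) if p > threshold => Seq((cols(0), cols(1), cols(2), p))
      case _                        => Seq.empty
    }
  }
}

object PriceFilterDemo extends App {
  val lines = Seq(
    "1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88",
    "8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94"
  )
  // Stand-in for a flatMap on the stream: each line is handled on its own.
  lines.flatMap(PriceFilter.keepIfValuable(_)).foreach(println)
}
```

In a Flink job the println would be replaced by a sink, for example one that
writes to Hbase.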

Thanks

Dr Mich Talebzadeh





not found: type CustomWatermarkEmitter

2018-07-29 Thread Mich Talebzadeh
Emitter())
[error]  ^
[error]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:99:
type mismatch;
[error]  found   :
org.apache.flink.streaming.api.datastream.DataStreamSource[String]
[error]  required:
org.apache.flink.streaming.api.functions.source.SourceFunction[?]
[error] env.addSource(myConsumer).print()
[error]   ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Jul 29, 2018 9:31:05 AM
Completed compiling
Sun Jul 29 09:31:05 BST 2018 , Running in **Standalone mode**
Could not build the program from JAR file.

I don't see why it is failing. Appreciate any suggestions.

Regards,

Dr Mich Talebzadeh





Re: Does Flink release Hadoop(R) 2.8 work with Hadoop 3?

2018-07-28 Thread Mich Talebzadeh
Hi Vino,

Many thanks.

I can confirm that Flink version flink-1.5.0-bin-hadoop28-scala_2.11 works
fine with Hadoop 3.1.0. I did not need to build it from source.

Regards,

Dr Mich Talebzadeh







On Sat, 28 Jul 2018 at 07:48, vino yang  wrote:

> Hi Mich,
>
> I think this depends on the backward compatibility of the Hadoop client
> API. In theory, there is no problem.
> Hadoop 2.8 to Hadoop 3.0 is a very large upgrade, and personally recommend
> using a client version that is consistent with the Hadoop cluster.
> By compiling and packaging from source, you can let Flink bundle specific
> Hadoop versions. Flink has shaded hadoop see here[1]. You can try to change
> the maven version and repackage the project.
>
> [1]: https://github.com/apache/flink/tree/master/flink-shaded-hadoop
>
> Thanks, vino.
>
> 2018-07-27 23:31 GMT+08:00 Mich Talebzadeh :
>
>> Hi,
>>
>> I can run Flink without bundled Hadoop fine. I was wondering if Flink
>> with Hadoop 2.8 works with Hadoop 3 as well?
>>
>> Thanks,
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>
>


Does Flink release Hadoop(R) 2.8 work with Hadoop 3?

2018-07-27 Thread Mich Talebzadeh
Hi,

I can run Flink without bundled Hadoop fine. I was wondering if Flink with
Hadoop 2.8 works with Hadoop 3 as well?

Thanks,

Dr Mich Talebzadeh





Re: Real time streaming as a microservice

2018-07-15 Thread Mich Talebzadeh
Hi Deepak,

I will put it there once all the bits and pieces come together. At the
moment I am drawing the diagrams. I will let you know.

Definitely everyone's contribution is welcome.

Regards,

Dr Mich Talebzadeh







On Sun, 15 Jul 2018 at 09:16, Deepak Sharma  wrote:

> Is it on github Mich ?
> I would love to use the flink and spark edition and add some use cases
> from my side.
>
> Thanks
> Deepak
>
> On Sun, Jul 15, 2018, 13:38 Mich Talebzadeh 
> wrote:
>
>> Hi all,
>>
>> I have now managed to deploy both ZooKeeper and Kafka as microservices
>> using docker images.
>>
>> The idea came to me as I wanted to create lightweight processes for both
>> ZooKeeper and Kafka to be used as services for Flink and Spark
>> simultaneously.
>>
>> In this design both Flink and Spark rely on streaming market data
>> messages published through Kafka. My current design is simple one docker
>> for Zookeeper and another for Kafka
>>
>> [root@rhes75 ~]# docker ps -a
>> CONTAINER ID  IMAGE             COMMAND                 CREATED       STATUS                 PORTS                                           NAMES
>> 05cf097ac139  ches/kafka        "/start.sh"             9 hours ago   Up 9 hours             0.0.0.0:7203->7203/tcp, 0.0.0.0:9092->9092/tcp  kafka
>> b173e455cc80  jplock/zookeeper  "/opt/zookeeper/bin/…"  10 hours ago  Up 10 hours (healthy)  2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp      zookeeper
>>
>> Note that the docker ports are exposed to the physical host that running
>> the containers.
>>
>> A test message is simply created as follows:
>> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181
>> --replication-factor 1 --partitions 1 --topic test
>>
>> Note that rhes75 is the host that houses the dockers and port 2181 is the
>> zookeeper port used by the zookeeper docker and mapped
>>
>> The spark streaming uses speed layer in Lambda architecture to write to
>> an Hbase table for selected market data (Hbase requires connectivity to a
>> Zookeeper). For Hbase I specified a zookeeper instance running on another
>> host and Hbase works fine.
>>
>> Anyway I will provide further info and diagrams.
>>
>> Cheers,
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>>
>> On Sun, 15 Jul 2018 at 08:40, Mich Talebzadeh 
>> wrote:
>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>>
>>>
>>> Thanks got it sorted.
>>>
>>> Regards,
>>>
>>>
>>> On Tue, 10 Jul 2018 at 09:24, Mich Talebzadeh 
>>> wrote:
>>>
>>>> Thanks R

Re: Real time streaming as a microservice

2018-07-15 Thread Mich Talebzadeh
Hi all,

I have now managed to deploy both ZooKeeper and Kafka as microservices
using docker images.

The idea came to me as I wanted to create lightweight processes for both
ZooKeeper and Kafka to be used as services for Flink and Spark
simultaneously.

In this design both Flink and Spark rely on streaming market data messages
published through Kafka. My current design is simple: one docker container
for ZooKeeper and another for Kafka.

[root@rhes75 ~]# docker ps -a
CONTAINER ID  IMAGE             COMMAND                 CREATED       STATUS                 PORTS                                           NAMES
05cf097ac139  ches/kafka        "/start.sh"             9 hours ago   Up 9 hours             0.0.0.0:7203->7203/tcp, 0.0.0.0:9092->9092/tcp  kafka
b173e455cc80  jplock/zookeeper  "/opt/zookeeper/bin/…"  10 hours ago  Up 10 hours (healthy)  2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp      zookeeper

Note that the docker ports are exposed to the physical host that is running
the containers.

A test topic is simply created as follows:
${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181
--replication-factor 1 --partitions 1 --topic test

Note that rhes75 is the host that houses the docker containers, and port 2181
is the ZooKeeper port used by the zookeeper container and mapped to the host.
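
For a client running outside the containers, the published ports are what
matter. Here is a minimal sketch of consumer properties, assuming the broker
is reachable as rhes75:9092 (the port mapping shown in the docker listing)
and using a caller-supplied group id. The keys bootstrap.servers and group.id
are standard Kafka consumer settings; zookeeper.connect is, as far as I know,
only read by the older 0.8 consumer. Whether the broker advertises an address
that external clients can resolve depends on the container's listener
configuration, which is not shown here.

```scala
import java.util.Properties

object KafkaClientConfig {
  // Assumed endpoints: rhes75 publishes 9092 (Kafka) and 2181 (ZooKeeper).
  def consumerProperties(groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "rhes75:9092")
    props.setProperty("zookeeper.connect", "rhes75:2181")
    props.setProperty("group.id", groupId)
    props
  }
}
```

The resulting object is the kind of properties argument that the Flink and
Spark Kafka consumers in these threads take.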

Spark streaming uses the speed layer of the Lambda architecture to write
selected market data to an Hbase table (Hbase requires connectivity to
ZooKeeper). For Hbase I specified a ZooKeeper instance running on another
host, and Hbase works fine.

Anyway I will provide further info and diagrams.

Cheers,


Dr Mich Talebzadeh







On Sun, 15 Jul 2018 at 08:40, Mich Talebzadeh 
wrote:

>
> Dr Mich Talebzadeh
>
>
>
>
>
> Thanks got it sorted.
>
> Regards,
>
>
> On Tue, 10 Jul 2018 at 09:24, Mich Talebzadeh 
> wrote:
>
>> Thanks Rahul.
>>
>> This is the outcome of
>>
>> [root@rhes75 ~]# iptables -t nat -L -n
>> Chain PREROUTING (policy ACCEPT)
>> target      prot  opt  source             destination
>> DOCKER      all   --   0.0.0.0/0          0.0.0.0/0          ADDRTYPE match dst-type LOCAL
>> Chain INPUT (policy ACCEPT)
>> target      prot  opt  source             destination
>> Chain OUTPUT (policy ACCEPT)
>> target      prot  opt  source             destination
>> DOCKER      all   --   0.0.0.0/0          !127.0.0.0/8       ADDRTYPE match dst-type LOCAL
>> Chain POSTROUTING (policy ACCEPT)
>> target      prot  opt  source             destination
>> MASQUERADE  all   --   172.17.0.0/16      0.0.0.0/0
>> MASQUERADE  all   --   172.18.0.0/16      0.0.0.0/0
>> RETURN      all   --   192.168.122.0/24   224.0.0.0/24
>> RETURN      all   --   192.168.122.0/24   255.255.255.255
>> MASQUERADE  tcp   --   192.168.122.0/24   !192.168.122.0/24  masq ports: 1024-65535
>> MASQUERADE  udp   --   192.168.122.0/24   !192.168.122.0/24  masq ports: 1024-65535
>> MASQUERADE  all   --   192.168.122.0/24   !192.168.122.0/24
>> Chain DOCKER (2 references)
>> target      prot  opt  source             destination
>> RETURN      all   --   0.0.0.0/0          0.0.0.0/0
>> RETURN      all   --   0.0.0.0/0          0.0.0.0/0
>>
>> So basically I need to connect to container from another host as the link
>> points it out.
>>
>> My docker is already running.
>>
>> [root@rhes75 ~]# docker ps -a
>> CONTAINER ID    IMAGE   COMMAND
>> CREATED STATUS  PORTS   NAMES
>> 8dd84a174834ubuntu  "

Re: Real time streaming as a microservice

2018-07-08 Thread Mich Talebzadeh
Thanks Jorn.

So I gather, as you correctly suggested, that microservices do provide value
in terms of modularisation. However, there will inevitably be scenarios where
the receiving artefact, say Flink, needs changes to its communication
protocol?

thanks

Dr Mich Talebzadeh







On Sun, 8 Jul 2018 at 10:25, Jörn Franke  wrote:

> That they are loosely coupled does not mean they are independent. For
> instance, you would not be able to replace Kafka with zeromq in your
> scenario. Unfortunately also Kafka sometimes needs to introduce breaking
> changes and the dependent application needs to upgrade.
> You will not be able to avoid these scenarios in the future (this is only
> possible if micro services don’t communicate with each other or if they
> would never need to change their communication protocol - pretty impossible
> ). However there are ways of course to reduce it, eg kafka could reduce the
> number of breaking changes or you can develop a very lightweight
> microservice that is very easy to change and that only deals with the
> broker integration and your application etc.
>
> > On 8. Jul 2018, at 10:59, Mich Talebzadeh 
> wrote:
> >
> > Hi,
> >
> > I have created the Kafka messaging architecture as a microservice that
> > feeds both Spark streaming and Flink. Spark streaming uses micro-batches
> > meaning "collect and process data" and flink as an event driven
> > architecture (a stateful application that reacts to incoming events by
> > triggering computations etc.
> >
> > According to Wikipedia, A Microservice is a  technique that structures an
> > application as a collection of loosely coupled services. In a
> microservices
> > architecture, services are fine-grained and the protocols are
> lightweight.
> >
> > Ok for streaming data among other things I have to create and configure
> > topic (or topics), design a robust zookeeper ensemble and create Kafka
> > brokers with scalability and resiliency. Then I can offer the streaming
> as
> > a microservice to subscribers among them Spark and Flink. I can upgrade
> > this microservice component in isolation without impacting either Spark
> or
> > Flink.
> >
> > The problem I face here is the dependency on Flink etc on the jar files
> > specific for the version of Kafka deployed. For example kafka_2.12-1.1.0
> is
> > built on Scala 2.12 and Kafka version 1.1.0. To make this work in Flink
> 1.5
> > application, I need  to use the correct dependency in sbt build. For
> > example:
> > libraryDependencies += "org.apache.flink" %%
> "flink-connector-kafka-0.11" %
> > "1.5.0"
> > libraryDependencies += "org.apache.flink" %%
> "flink-connector-kafka-base" %
> > "1.5.0"
> > libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
> > libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
> > libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" %
> > "1.5.0"
> > libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
> >
> > and the Scala code needs to change:
> >
> > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
> > …
> >val stream = env
> > .addSource(new FlinkKafkaConsumer011[String]("md", new
> > SimpleStringSchema(), properties))
> >
> > So in summary some changes need to be made to Flink to be able to
> interact
> > with the new version of Kafka. And more importantly if one can use an
> > abstract notion of microservice here?
> >
> > Dr Mich Talebzadeh
> >
> >
> >
>


Real time streaming as a microservice

2018-07-08 Thread Mich Talebzadeh
Hi,

I have created the Kafka messaging architecture as a microservice that
feeds both Spark streaming and Flink. Spark streaming uses micro-batches,
meaning "collect and process data", while Flink is an event-driven
architecture (a stateful application that reacts to incoming events by
triggering computations etc.).

According to Wikipedia, a microservice architecture structures an
application as a collection of loosely coupled services. In a microservices
architecture, services are fine-grained and the protocols are lightweight.

OK, for streaming data, among other things, I have to create and configure a
topic (or topics), design a robust ZooKeeper ensemble and create Kafka
brokers with scalability and resiliency. Then I can offer the streaming as
a microservice to subscribers, among them Spark and Flink. I can upgrade
this microservice component in isolation without impacting either Spark or
Flink.

The problem I face here is the dependency of Flink etc. on the jar files
specific to the version of Kafka deployed. For example, kafka_2.12-1.1.0 is
built on Scala 2.12 and Kafka version 1.1.0. To make this work in a Flink 1.5
application, I need to use the correct dependencies in the sbt build. For
example:
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
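
One way to make such an upgrade a two-line change is to factor the versions
out of the dependency list. The build.sbt fragment below is only a sketch.
The flinkVersion and kafkaVersion names are illustrative, and the 2.11
scalaVersion is an assumption based on the scala_2.11 Flink binary mentioned
elsewhere in this archive. Since %% appends the Scala binary version to the
artifact name, scalaVersion has to match a Scala version for which these
artifacts are published.

```scala
// build.sbt (sketch). Versions are the ones quoted in the thread.
scalaVersion := "2.11.12" // assumption: any 2.11.x matching the Flink artifacts

val flinkVersion = "1.5.0"
val kafkaVersion = "0.11.0.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala"      % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka-base" % flinkVersion,
  "org.apache.kafka" %  "kafka-clients"              % kafkaVersion,
  "org.apache.kafka" %% "kafka"                      % kafkaVersion
)
```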

and the Scala code needs to change:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
…
val stream = env
  .addSource(new FlinkKafkaConsumer011[String]("md",
    new SimpleStringSchema(), properties))

So in summary, some changes need to be made to the Flink application for it
to interact with the new version of Kafka. More importantly, can one still
use an abstract notion of a microservice here?
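
One possible way to keep the notion abstract on the application side is to
confine everything broker-specific behind a small interface, so that a
connector upgrade touches a single class. This is a sketch only: PriceFeed
and InMemoryPriceFeed are hypothetical names, and a Kafka-backed
implementation (not shown) would be the only place that imports connector or
client classes.

```scala
// Hypothetical boundary between the application and the broker-specific code.
trait PriceFeed {
  // Delivers raw comma-separated price lines to the supplied handler.
  def subscribe(handler: String => Unit): Unit
}

// In-memory stand-in, useful for tests; it replays a fixed list of lines.
final class InMemoryPriceFeed(lines: Seq[String]) extends PriceFeed {
  def subscribe(handler: String => Unit): Unit = lines.foreach(handler)
}

object PriceFeedDemo extends App {
  val feed: PriceFeed = new InMemoryPriceFeed(Seq("k1,VOD,2018-07-28T20:38:44,219.33"))
  // The application code depends only on PriceFeed, not on Kafka classes.
  feed.subscribe(line => println(line.split(",")(1)))
}
```

This does not remove the dependency on a particular Kafka client version; it
only limits where that dependency is visible.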

Dr Mich Talebzadeh





Re: A use-case for Flink and reactive systems

2018-07-05 Thread Mich Talebzadeh
Hi,

What you are saying is I can either use Flink and forget database layer, or
make a java microservice with a database. Mixing Flink with a Database
doesn't make any sense.

I would have thought that, moving to the microservices concept, Flink handling
streaming data from the upstream microservice is an independent entity and a
microservice in its own right, assuming loose coupling here.

Database tier is another microservice that interacts with your Flink and
can serve other consumers. Indeed in classic CEP like Aleri or StreamBase
you may persist your data to an external database for analytics.

HTH

Dr Mich Talebzadeh







On Thu, 5 Jul 2018 at 12:26, Yersinia Ruckeri 
wrote:

> It helps, at least it's fairly clear now.
> I am not against storing the state into Flink, but as per your first
> point, I need to get it persisted, asynchronously, in an external database
> too to let other possible application/services to retrieve the state.
> What you are saying is I can either use Flink and forget database layer,
> or make a java microservice with a database. Mixing Flink with a Database
> doesn't make any sense.
> My concerns with the database is how do you work out the previous state to
> calculate the new one? Is it good and fast? (moving money from account A to
> B isn't a problem cause you have two separate events).
>
> Moreover, a second PoC I was considering is related to Flink CEP. Let's
> say I am elaborating sensor data, I want to have a rule which is working on
> the following principle:
> - If the temperature is more than 40
> - If the temperature yesterday at noon was more than 40
> - If no one used vents yesterday and two days ago
> then do something/emit some event.
>
> This simple CEP example requires you to mine previous data/states from a
> DB, right? Can Flink be considered for it without an external DB but only
> relying on its internal RocksDB?
>
> Hope I am not generating more confusion here.
>
> Y
>
> On Thu, Jul 5, 2018 at 11:21 AM, Fabian Hueske  wrote:
>
>> Hi Yersinia,
>>
>> The main idea of an event-driven application is to hold the state (i.e.,
>> the account data) in the streaming application and not in an external
>> database like Couchbase.
>> This design is very scalable (state is partitioned) and avoids look-ups
>> from the external database because all state interactions are local.
>> Basically, you are moving the database into the streaming application.
>>
>> There are a few things to consider if you maintain the state in the
>> application:
>> - You might need to mirror the state in an external database to make it
>> queryable and available while the streaming application is down.
>> - You need to have a good design to ensure that your consistency
>> requirements are met in case of a failure (upsert writes can temporarily
>> reset the external state).
>> - The design becomes much more challenging if you need to access the
>> state of two accounts to perform a transaction (subtract from the first and
>> add to the second account) because Flink state is distributed per key and
>> does not support remote lookups.
>>
>> If you do not want to store the state in the Flink application, I agree
>> with Jörn that there's no need for Flink.
>>
>> Hope this helps,
>> Fabian
>>
>> 2018-07-05 10:45 GMT+02:00 Yersinia Ruckeri :
>>
>>> My idea started from here: https://flink.apache.org/usecases.html
>>> First use case describes what I am trying to realise (
>>> https://flink.apache.org/img/usecases-eventdrivenapps.png)
>>> My application is Flink, listening to incoming events, changing the
>>> state of an object (really an aggregate here) and pushing out another event.
>>> States can be persisted asynchronously; this is ok.
>>>
>>> My point on top of this picture is that the "state" is not just about
>>> persisting something, but about retrieving the current state, updating it
>>> with new information and persisting the updated state.
>>>
>>>
>>> Y
>>>
>>> On Wed, Jul 4, 2018 at 10:16 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.co
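
The sensor rule quoted above can be written down as a plain predicate, which
makes it clearer what history it needs. This is a minimal sketch, assuming
readings and vent usage are available in memory. Reading, VentRule and
ventUseDays are made-up names, and nothing here uses Flink CEP.

```scala
import java.time.{LocalDate, LocalDateTime, LocalTime}

// Hypothetical event type: one temperature reading at a point in time.
final case class Reading(at: LocalDateTime, temperature: Double)

object VentRule {
  /** True when all three conditions of the quoted rule hold.
    * history:     earlier readings kept as state
    * ventUseDays: days on which somebody used the vents
    */
  def shouldFire(now: Reading,
                 history: Seq[Reading],
                 ventUseDays: Set[LocalDate]): Boolean = {
    val today         = now.at.toLocalDate
    val yesterday     = today.minusDays(1)
    val yesterdayNoon = LocalDateTime.of(yesterday, LocalTime.NOON)

    val hotNow           = now.temperature > 40
    val hotYesterdayNoon = history.exists(r => r.at == yesterdayNoon && r.temperature > 40)
    val ventsUnused      = !ventUseDays.contains(yesterday) &&
                           !ventUseDays.contains(today.minusDays(2))

    hotNow && hotYesterdayNoon && ventsUnused
  }
}
```

The rule needs about two days of history per sensor, so the open question in
the thread is where that history is kept, not whether the rule can be
expressed.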

Re: A use-case for Flink and reactive systems

2018-07-05 Thread Mich Talebzadeh
Hi Fabian,

On your point below

… Basically, you are moving the database into the streaming application.

This assumes a finite size for the data that the streaming application has to
persist. In terms of capacity planning, how does this work?

Some applications, such as fraud detection, try to address this by deploying
databases like Aerospike <https://www.aerospike.com/>, where the keys are kept
in memory and indexed and the values are stored on SSD devices. I was
wondering how Flink in general can address this?


Regards,


Dr Mich Talebzadeh







On Thu, 5 Jul 2018 at 11:22, Fabian Hueske  wrote:

> Hi Yersinia,
>
> The main idea of an event-driven application is to hold the state (i.e.,
> the account data) in the streaming application and not in an external
> database like Couchbase.
> This design is very scalable (state is partitioned) and avoids look-ups
> from the external database because all state interactions are local.
> Basically, you are moving the database into the streaming application.
>
> There are a few things to consider if you maintain the state in the
> application:
> - You might need to mirror the state in an external database to make it
> queryable and available while the streaming application is down.
> - You need to have a good design to ensure that your consistency
> requirements are met in case of a failure (upsert writes can temporarily
> reset the external state).
> - The design becomes much more challenging if you need to access the state
> of two accounts to perform a transaction (subtract from the first and add
> to the second account) because Flink state is distributed per key and does
> not support remote lookups.
>
> If you do not want to store the state in the Flink application, I agree
> with Jörn that there's no need for Flink.
>
> Hope this helps,
> Fabian
>
> 2018-07-05 10:45 GMT+02:00 Yersinia Ruckeri :
>
>> My idea started from here: https://flink.apache.org/usecases.html
>> First use case describes what I am trying to realise (
>> https://flink.apache.org/img/usecases-eventdrivenapps.png)
>> My application is Flink, listening to incoming events, changing the state
>> of an object (really an aggregate here) and pushing out another event.
>> States can be persisted asynchronously; this is ok.
>>
>> My point on top of this picture is that the "state" is not just about
>> persisting something, but about retrieving the current state, updating it
>> with new information and persisting the updated state.
>>
>>
>> Y
>>
>> On Wed, Jul 4, 2018 at 10:16 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Looks interesting.
>>>
>>> As I understand you have a microservice based on ingestion where a topic
>>> is defined for streaming messages that include transactional data. These
>>> transactions should already exist in your DB. For now we look at DB as part
>>> of your microservices and we take a logical view of it.
>>>
>>> So
>>>
>>>
>>>1. First microservice M1 provides ingestion of the Kafka topic
>>>2. Second microservice M2 deploys Flink or Spark Streaming to
>>>manipulate the incoming messages. We can look at this later.
>>>3. Third microservice M3 consists of the database that provides
>>>current records for accounts identified by the account number in your
>>>message queue
>>>4. M3 will have to validate the incoming account number, update the
>>>transaction and provide a completion handshake. Effectively you are
>>>providing DPaaS
>>>
>>>
>>> So far we have avoided interfaces among these services. But I gather M1
>>> and M2 are well established. Assuming that Couchbase is your choice of DB I
>>> believe it provides JDBC drivers of some sort. It does not have to be
>>> Couchbase. You can achieve the same with Hbase as well or MongoDB. Anyhow
>>> the only challenge I see here is the interface between your Flink
>>> application in M2 and M3
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>
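
Fabian's point quoted above, that state is partitioned by key and every update
is local to the partition that owns the key, can be illustrated with a small
plain-Scala sketch. This is not Flink's API: TxEvent, BalancePartition and
KeyedBalances are hypothetical names, and the hash-based routing only stands
in for whatever partitioning the framework applies.

```scala
import scala.collection.mutable

// Hypothetical transaction event: amount is positive for credit, negative for debit.
final case class TxEvent(accountId: String, amount: BigDecimal)

/** One parallel instance; it owns the balances of the keys routed to it. */
final class BalancePartition {
  private val balances = mutable.Map.empty[String, BigDecimal]

  def update(e: TxEvent): BigDecimal = {
    val next = balances.getOrElse(e.accountId, BigDecimal(0)) + e.amount
    balances(e.accountId) = next
    next
  }

  def get(accountId: String): Option[BigDecimal] = balances.get(accountId)
}

/** Routes every event to the single partition responsible for its key. */
final class KeyedBalances(parallelism: Int) {
  private val partitions = Vector.fill(parallelism)(new BalancePartition)

  private def partitionFor(key: String): BalancePartition =
    partitions(Math.floorMod(key.hashCode, parallelism))

  // Read, modify and write all happen inside one partition: no remote lookup.
  def process(e: TxEvent): BigDecimal = partitionFor(e.accountId).update(e)

  def balance(accountId: String): Option[BigDecimal] =
    partitionFor(accountId).get(accountId)
}
```

A transfer between two accounts may touch two different partitions, which is
why that case is described above as much more challenging.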

Re: A use-case for Flink and reactive systems

2018-07-04 Thread Mich Talebzadeh
Looks interesting.

As I understand you have a microservice based on ingestion where a topic is
defined for streaming messages that include transactional data. These
transactions should already exist in your DB. For now we look at DB as part
of your microservices and we take a logical view of it.

So


   1. First microservice M1 provides ingestion of the Kafka topic
   2. Second microservice M2 deploys Flink or Spark Streaming to manipulate
   the incoming messages. We can look at this later.
   3. Third microservice M3 consists of the database that provides current
   records for accounts identified by the account number in your message queue
   4. M3 will have to validate the incoming account number, update the
   transaction and provide a completion handshake. Effectively you are providing
   DPaaS


So far we have avoided interfaces among these services. But I gather M1 and
M2 are well established. Assuming that Couchbase is your choice of DB I
believe it provides JDBC drivers of some sort. It does not have to be
Couchbase. You can achieve the same with Hbase as well or MongoDB. Anyhow
the only challenge I see here is the interface between your Flink
application in M2 and M3

HTH

Dr Mich Talebzadeh







On Wed, 4 Jul 2018 at 17:56, Yersinia Ruckeri 
wrote:

> Hi all,
>
> I am working on a prototype which should include Flink in a reactive
> systems software. The easiest use-case with a traditional bank system where
> I have one microservice for transactions and another one for
> account/balances.
> Both are connected with Kafka.
>
> Transactions record a transaction and then send, via Kafka, a message
> which includes the account identifier and the amount.
> On the other microservice I want to have Flink consuming this topic and
> updating the balance of my account based on the incoming message. It needs
> to pull my data from the DB and make the update.
> The DB is Couchbase.
>
> I spent a few hours online today, but so far I have only found that there is
> no sink for Couchbase; I need to build one, which shouldn't be a big deal. I haven't
> found information on how I can make Flink able to interact with a DB to
> retrieve information and store information.
>
> I guess the case is a good case, as updating state in an event sourcing
> based application is part of the first page of the manual. I am not looking
> to just dump a state into a DB, but to interact with the DB: retrieving
> data, elaborating them with the input coming from my queue, and persisting
> them (especially if I want to make a second prototype using Flink CEP).
>
> I have probably even read how to do it, but I didn't recognize it.
>
> Can anybody help me to figure out this part better?
>
> Thanks,
> Y.
>


Handling back pressure in Flink.

2018-07-04 Thread Mich Talebzadeh
Hi,

In Spark one can handle back pressure by setting the Spark conf parameter:

sparkConf.set("spark.streaming.backpressure.enabled","true")

With backpressure you make a Spark Streaming application stable, i.e. it
receives data only as fast as it can process it. In general one needs to
ensure that the micro-batch processing time is less than the batch
interval, i.e. the rate at which your producer sends data into Kafka. For
example, this is shown in the Spark GUI below for batch interval = 2 seconds

[image: image.png]

Is there such a procedure in Flink, please?
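
The effect described above, a receiver taking data only as fast as it can
process it, can be illustrated in plain Scala with a bounded queue. This sketch
says nothing about how Spark or Flink implement it internally.
BackpressureSketch, pump and their parameters are hypothetical names.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BackpressureSketch {
  /** Pushes n items through a bounded queue to a slow consumer.
    * Returns (items consumed, largest queue depth observed by the consumer).
    */
  def pump(n: Int, capacity: Int): (Int, Int) = {
    val queue    = new ArrayBlockingQueue[Int](capacity)
    var consumed = 0
    var maxDepth = 0

    val consumer = new Thread(new Runnable {
      override def run(): Unit = {
        var i = 0
        while (i < n) {
          maxDepth = math.max(maxDepth, queue.size())
          queue.take()     // blocks while the queue is empty
          Thread.sleep(1)  // simulate slow processing
          consumed += 1
          i += 1
        }
      }
    })
    consumer.start()

    // put() blocks while the queue is full, so the producer is slowed
    // down to the pace of the consumer instead of piling up data.
    (1 to n).foreach(i => queue.put(i))

    consumer.join()        // join makes the consumer's writes visible here
    (consumed, maxDepth)
  }
}
```

The queue depth never exceeds the chosen capacity, however fast the producer
tries to send.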

Thanks


Dr Mich Talebzadeh





Re: run time error java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09

2018-07-04 Thread Mich Talebzadeh
Yes indeed, thanks. It is all working fine.

But it is only writing to a text file. I want to do with Flink what I do
with Spark Streaming: writing high value events to HBase on HDFS.


Dr Mich Talebzadeh







On Wed, 4 Jul 2018 at 09:18, Fabian Hueske  wrote:

> Looking at the other threads, I assume you solved this issue.
>
> The problem should have been that FlinkKafkaConsumer09 is not included in
> the flink-connector-kafka-0.11 module, because it is the connector for
> Kafka 0.9 and not Kafka 0.11.
>
> Best, Fabian
>
> 2018-07-02 11:20 GMT+02:00 Mich Talebzadeh :
>
>> Hi,
>>
>> I created a jar file with sbt with this sbt file
>>
>>  cat md_streaming.sbt
>> name := "md_streaming"
>> version := "1.0"
>> scalaVersion := "2.11.8"
>>
>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.3"
>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
>>
>> and the Scala code is very basic
>>
>> import java.util.Properties
>> import java.util.Arrays
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.streaming.api.datastream.DataStream
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>> import org.apache.flink.streaming.util.serialization.DeserializationSchema
>> object md_streaming
>> {
>>   def main(args: Array[String])
>>   {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     val properties = new Properties()
>>     properties.setProperty("bootstrap.servers", "rhes75:9092")
>>     properties.setProperty("zookeeper.connect", "rhes75:2181")
>>     properties.setProperty("group.id", "md_streaming")
>>     val stream = env
>>       .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
>>       .writeAsText("/tmp/md_streaming.txt")
>>     env.execute("Flink Kafka Example")
>>   }
>> }
>>
>> It compiles OK as follows
>>
>> Compiling md_streaming
>> [info] Set current project to md_streaming (in build
>> file:/home/hduser/dba/bin/flink/md_streaming/)
>> [success] Total time: 0 s, completed Jul 2, 2018 10:16:05 AM
>> [info] Set current project to md_streaming (in build
>> file:/home/hduser/dba/bin/flink/md_streaming/)
>> [info] Updating
>> {file:/home/hduser/dba/bin/flink/md_streaming/}md_streaming...
>> [info] Resolving jline#jline;2.12.1 ...
>> [info] Done updating.
>> [warn] Scala version was updated by one of library dependencies:
>> [warn]  * org.scala-lang:scala-library:(2.11.8, 2.11.11, 2.11.6, 2.11.7)
>> -> 2.11.12
>> [warn] To force scalaVersion, add the following:
>> [warn]  ivyScala := ivyScala.value map { _.copy(overrideScalaVers

Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign

2018-07-03 Thread Mich Talebzadeh
Hi Fabian.

Thanks. Great contribution! It is working

[info] SHA-1: 98d78b909631e5d30664df6a7a4a3f421d4fd33b
[info] Packaging
/home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/md_streaming-assembly-1.0.jar
...
[info] Done packaging.
[success] Total time: 14 s, completed Jul 3, 2018 9:32:25 AM
Completed compiling
Tue Jul 3 09:32:25 BST 2018 , Running in **Standalone mode**

*Starting execution of program*
And the test prices generated in the topic are added to the file for each
security

ad534c19-fb77-4966-86a8-dc411d7a0607,MKS,2018-07-03T09:50:03,319.2
e96e5d96-03fc-4603-899f-5ea5151fc280,IBM,2018-07-03T09:50:07,124.67
a64a51a4-f27c-439b-b9cc-28dc593acab3,MRW,2018-07-03T09:50:07,286.27
2d15089d-3635-4a7e-b2d5-20b92dd67186,MSFT,2018-07-03T09:50:07,22.8
415d1215-18e6-46ca-a771-98c614e8c3fb,ORCL,2018-07-03T09:50:07,32.57
e0dd5832-20bd-4951-a4dd-3a3a10c99a01,SAP,2018-07-03T09:50:07,69.32
4222eea9-d9a7-46e1-8b1e-c2b634170fad,SBRY,2018-07-03T09:50:07,235.22
4f2e0927-29ff-44ff-aa2e-9b16fdcd0024,TSCO,2018-07-03T09:50:07,403.64
49437f8b-5e2b-42e9-b3d7-f95eee9c5432,VOD,2018-07-03T09:50:07,239.08
0f96463e-40a5-47c5-b2c6-7191992ab0b1,BP,2018-07-03T09:50:07,587.75
d0041bf1-a313-4623-a7cc-2ce8204590bb,MKS,2018-07-03T09:50:07,406.02
c443ac53-f762-4fad-b11c-0fd4e98812fb,IBM,2018-07-03T09:50:10,168.52
67f2d372-f918-445e-8dac-7556a2dfd0aa,MRW,2018-07-03T09:50:10,293.2
57372392-53fd-48eb-aa94-c317c75d6545,MSFT,2018-07-03T09:50:10,46.53
c3839c12-be63-416c-a404-8d0333071559,ORCL,2018-07-03T09:50:10,31.57
29eca46c-bd4c-475e-a9c9-7bf105fcc935,SAP,2018-07-03T09:50:10,77.81
89f98ad0-dc34-476f-baa5-fc2fa92aa2d5,SBRY,2018-07-03T09:50:10,239.95
431494f3-1215-48ae-a534-5bf3fbe20f2f,TSCO,2018-07-03T09:50:10,331.12
2203095f-8826-424d-a1e3-fa212194ac35,VOD,2018-07-03T09:50:10,232.05
816ddc9b-f403-4ea9-8d55-c3afd0eae110,BP,2018-07-03T09:50:10,506.4
23c07878-d64d-4d1e-84a4-c14c23357467,MKS,2018-07-03T09:50:10,473.06

kind regards,

Dr Mich Talebzadeh







On Tue, 3 Jul 2018 at 09:11, Fabian Hueske  wrote:

> Hi Mich,
>
> FlinkKafkaConsumer09 is the connector for Kafka 0.9.x.
> Have you tried to use FlinkKafkaConsumer011 instead of
> FlinkKafkaConsumer09?
>
> Best, Fabian
>
>
>
> 2018-07-02 22:57 GMT+02:00 Mich Talebzadeh :
>
>> This is becoming very tedious.
>>
>> As suggested I changed the kafka dependency from
>>
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
>> to
>>
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>>
>> and compiled and ran the job again, and it failed. This is the log file
>>
>> 2018-07-02 21:38:38,656 INFO
>> org.apache.kafka.common.utils.AppInfoParser   - Kafka
>> version : 1.1.0
>> 2018-07-02 21:38:38,656 INFO
>> org.apache.kafka.common.utils.AppInfoParser   - Kafka
>> commitId : fdcf75ea326b8e07
>> 2018-07-02 21:38:38,696 INFO
>> org.apache.kafka.clients.Metadata - Cluster ID:
>> 3SqEt4DcTruOr_SlQ6fqTQ
>> 2018-07-02 21:38:38,698 INFO
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
>> Consumer subtask 0 will start reading the following 1 partitions from the
>> committed group offsets in Kafka: [KafkaTopicPartition{topic='md',
>> partition=0}]
>> 2018-07-02 21:38:38,702 INFO
>> org.apache.kafka.clients.consumer.ConsumerConfig  -
>> ConsumerConfig values:
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [rhes75:9092]
>> check.crcs = true
>> client.id =
>> connections.max.idle.ms = 54
>> enable.auto.commit = true
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = md_streaming
>> heartbeat.interval.ms = 3000
>> interceptor.classes = []
>> internal.leave.group.on.close = true
>> isolation.level = read_uncommitted
>> key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArray

Re: Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.

2018-07-03 Thread Mich Talebzadeh
Thanks Hequn and Jörn, that helped.

But I am still getting this error for a simple streaming program at
execution!

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.flink.core.fs.FileSystem

object md_streaming
{
  def main(args: Array[String])
  {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "rhes75:9092")
    //properties.setProperty("zookeeper.connect", "rhes75:2181")
    properties.setProperty("group.id", "md_streaming")
    properties.setProperty("auto.offset.reset", "latest")
    val stream = env
      .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
      .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE)
    env.execute("Flink Kafka Example")
  }
}


Completed compiling
Starting execution of program

 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at md_streaming$.main(md_streaming.scala:32)
    at md_streaming.main(md_streaming.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)



Dr Mich Talebzadeh




Re: Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.

2018-07-03 Thread Mich Talebzadeh
thanks Hequn.

When I use it as suggested, I get this error:

[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:30: not found: value FileSystem
[error]  .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE)
[error]                                        ^
[error] one error found
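
"not found: value FileSystem" is what the Scala compiler reports when the name
is not in scope, so the likely fix is to import the class. A minimal sketch,
assuming the Flink 1.5 Scala streaming API is on the classpath. The object
name, the sample elements and the output path are made up for illustration.

```scala
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.scala._

object WriteModeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // With the import in place, FileSystem.WriteMode.OVERWRITE resolves
    // and an existing output file is replaced instead of causing a failure.
    env.fromElements("a", "b", "c")
      .writeAsText("/tmp/write_mode_sketch.txt", FileSystem.WriteMode.OVERWRITE)

    env.execute("WriteMode sketch")
  }
}
```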

Dr Mich Talebzadeh







On Tue, 3 Jul 2018 at 03:16, Hequn Cheng  wrote:

> Hi Mich,
>
> It seems the writeMode has not been set correctly. Have you ever tried
>
>> .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE);
>
>
> On Mon, Jul 2, 2018 at 10:44 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Flink 1.5
>>
>> This streaming data is written to a file
>>
>> val stream = env
>>  .addSource(new FlinkKafkaConsumer09[String]("md", new
>> SimpleStringSchema(), properties))
>>  .writeAsText("/tmp/md_streaming.txt")
>>  env.execute("Flink Kafka Example")
>>
>> The error states
>>
>> Caused by: java.io.IOException: File or directory /tmp/md_streaming.txt
>> already exists. Existing files and directories are not overwritten in
>> NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and
>> directories.
>>
>> Is there any append mode in writeAsText? I tried OVERWRITE but it did not work.
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>
>


Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign

2018-07-02 Thread Mich Talebzadeh
KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:171)
2018-07-02 21:38:38,709 WARN
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher  -
Error while closing Kafka consumer
java.lang.NullPointerException
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:286)
2018-07-02 21:38:38,710 INFO
org.apache.flink.runtime.taskmanager.Task - Source:
Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b)
switched from RUNNING to FAILED.
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
2018-07-02 21:38:38,713 INFO
org.apache.flink.runtime.taskmanager.Task - Freeing
task resources for Source: Custom Source -> Sink: Unnamed (1/1)
(bcb46879e709768c9160dd11e09ba05b).
2018-07-02 21:38:38,713 INFO
org.apache.flink.runtime.taskmanager.Task - Ensuring
all FileSystem streams are closed for task Source: Custom Source -> Sink:
Unnamed (1/1) (bcb46879e709768c9160d


Dr Mich Talebzadeh







On Mon, 2 Jul 2018 at 20:59, Ted Yu  wrote:

> From flink-connector-kafka-0.11 dependency, we know the version of Kafka
> used is (flink-connectors/flink-connector-kafka-0.11/pom.xml):
>
> 0.11.0.2
> From Kafka side, you specified 1.1.0
>
> I think these versions produce what you experienced.
> If you use Kafka 0.11, this problem should go away.
>
> On Mon, Jul 2, 2018 at 11:24 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> This is the code
>>
>> import java.util.Properties
>> import java.util.Arrays
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.streaming.api.datastream.DataStream
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>> import org.apache.flink.streaming.util.serialization.DeserializationSchema
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> import org.apache.kafka.clients.consumer.ConsumerRecords;
>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>
>> object md_streaming
>> {
>>   def main(args: Array[String])
>>   {
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val properties = new Properties()
>> properties.setProperty("bootstrap.servers", "rhes75:9092")
>> properties.setProperty("zookeeper.connect", "rhes75:2181")
>> properties.setProperty("group.id", "md_streaming")
>> val stream = env
>>  .addSource(new FlinkKafkaConsumer09[String]("md", new
>> SimpleStringSchema(), properties))
>>  .writeAsText("/tmp/md_streaming.txt")
>> env.execute("Flink Kafka Example")
>>   }
>>
>> and this is the sbt dependencies
>>
>> libraryDependencies += "org.apache.flink" %%
>> "flink-connector-kafka-0.11" % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base"
>> % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"
>> libraryDependencies += "org.apache.flink&q

Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign

2018-07-02 Thread Mich Talebzadeh
Hi,

This is the code

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer

object md_streaming
{
  def main(args: Array[String]): Unit =
  {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "rhes75:9092")
    properties.setProperty("zookeeper.connect", "rhes75:2181")
    properties.setProperty("group.id", "md_streaming")
    // Read strings from the Kafka topic "md" and write them out as text
    val stream = env
      .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
      .writeAsText("/tmp/md_streaming.txt")
    env.execute("Flink Kafka Example")
  }
}

and these are the sbt dependencies

libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
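
The descriptor in the trace, assign(Ljava/util/List;)V, is the Kafka 0.9 form of KafkaConsumer.assign. Later Kafka clients declare assign with a Collection parameter, so code compiled against the 0.9 client (which is what FlinkKafkaConsumer09 goes through) cannot find the method once kafka-clients 1.1.0 is on the classpath. The following build.sbt fragment is a minimal sketch of one way to line the versions up. It assumes, without confirmation from this thread, that the job switches to FlinkKafkaConsumer011 and lets the 0.11 connector pull in its own Kafka client instead of pinning kafka-clients and kafka 1.1.0 explicitly.

```scala
// build.sbt (sketch; the exact dependency set is an assumption, not taken from the thread)
val flinkVersion = "1.5.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala"      % flinkVersion,
  // Brings in a matching Kafka 0.11 client transitively, so no explicit
  // "kafka-clients" or "kafka" 1.1.0 entries are declared here.
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion
)

// In the job itself, the consumer class would then be
//   org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
// rather than FlinkKafkaConsumer09.
```

Kafka brokers generally accept older clients, so a 0.11 client talking to a 1.1.0 broker should be workable, but that is worth verifying on the actual cluster.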


Thanks


Dr Mich Talebzadeh







On Mon, 2 Jul 2018 at 17:45, Ted Yu  wrote:

> Here is the signature of assign :
>
> public void assign(Collection<TopicPartition> partitions) {
>
> Looks like RestClusterClient was built against one version of Kafka but
> runs against a different version.
>
> Please check the sbt dependency and the version of Kafka jar on the
> classpath.
>
> Thanks
>
> On Mon, Jul 2, 2018 at 9:35 AM, Mich Talebzadeh  > wrote:
>
>> Have you seen this error by any chance in flink streaming with Kafka
>> please?
>>
>> org.apache.flink.client.program.ProgramInvocationException:
>> java.lang.NoSuchMethodError:
>> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>> at md_streaming$.main(md_streaming.scala:30)
>> at md_streaming.main(md_streaming.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
>>  

java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign

2018-07-02 Thread Mich Talebzadeh
Have you seen this error by any chance in flink streaming with Kafka
please?

org.apache.flink.client.program.ProgramInvocationException:
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at md_streaming$.main(md_streaming.scala:30)
at md_streaming.main(md_streaming.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
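
A NoSuchMethodError like the one above typically means the calling code was compiled against one version of a class and is running against another. The sketch below is one way to see which KafkaConsumer the JVM actually loads and which assign overloads it offers. It uses only JDK reflection; the object name CheckKafkaAssign is hypothetical, and it has to be run with the same classpath as the job for the answer to be meaningful.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical diagnostic helper, not part of Flink or Kafka.
object CheckKafkaAssign {
  def main(args: Array[String]): Unit = {
    val className = "org.apache.kafka.clients.consumer.KafkaConsumer"
    Try(Class.forName(className)) match {
      case Success(cls) =>
        // The code source is the jar (or directory) the class was loaded from, if known.
        val source = Option(cls.getProtectionDomain.getCodeSource)
          .map(_.getLocation.toString)
          .getOrElse("unknown")
        println(s"$className loaded from: $source")
        // Print every public method named "assign" with its parameter types.
        cls.getMethods
          .filter(_.getName == "assign")
          .foreach(m => println(m.toGenericString))
      case Failure(e) =>
        // Reached when the class is missing (ClassNotFoundException).
        println(s"$className is not on the classpath: $e")
    }
  }
}
```

If the printed signature takes a java.util.Collection while the trace asks for java.util.List, the connector and the Kafka client jar are from different Kafka generations.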


thanks


Dr Mich Talebzadeh





Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.

2018-07-02 Thread Mich Talebzadeh
Flink 1.5

This streaming data is written to a file

val stream = env
 .addSource(new FlinkKafkaConsumer09[String]("md", new
SimpleStringSchema(), properties))
 .writeAsText("/tmp/md_streaming.txt")
 env.execute("Flink Kafka Example")

The error states

Caused by: java.io.IOException: File or directory /tmp/md_streaming.txt
already exists. Existing files and directories are not overwritten in
NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and
directories.

Is there an append mode in writeAsText? I tried OVERWRITE but it did not work.
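
For reference, a minimal sketch of how the overwrite mode is passed, assuming the Java DataStream API used in the snippet above. WriteMode is the enum nested in org.apache.flink.core.fs.FileSystem, and as far as I know it only offers NO_OVERWRITE and OVERWRITE, so writeAsText has no append mode. The object name md_streaming_overwrite is hypothetical, and the Kafka settings are the ones from this thread.

```scala
import java.util.Properties

import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Hypothetical object name; the job body mirrors the snippet in this message.
object md_streaming_overwrite {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "rhes75:9092")
    properties.setProperty("group.id", "md_streaming")

    env
      .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
      // The second argument replaces an existing /tmp/md_streaming.txt
      // instead of failing with the NO_OVERWRITE IOException.
      .writeAsText("/tmp/md_streaming.txt", WriteMode.OVERWRITE)

    env.execute("Flink Kafka Example")
  }
}
```

Each run then starts the file afresh. For output that should accumulate across runs, a rolling file sink such as BucketingSink (from flink-connector-filesystem) may be a better fit than writeAsText, though that is a suggestion rather than something tested here.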

Thanks

Dr Mich Talebzadeh





run time error java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09

2018-07-02 Thread Mich Talebzadeh
nk.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

Do I need to pass additional classes? I suspect the jar file is not
complete!
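
A NoClassDefFoundError for FlinkKafkaConsumer09 at submission time usually does point at an incomplete jar: the Kafka connectors are not part of the Flink distribution's lib directory, so they have to be bundled with the job. The fragment below is a sketch of one way to do that, assuming the project is built with sbt (this message does not say) and using the sbt-assembly plugin. The plugin version shown is an assumption; pick one that matches your sbt release.

```scala
// project/plugins.sbt  (assumed plugin version)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

// build.sbt
// Flink's core libraries are already on the cluster classpath,
// so they are marked "provided" and kept out of the fat jar.
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0" % "provided"
// The Kafka connector is not shipped with Flink, so it stays in the
// default (compile) scope and ends up inside the assembled jar.
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.5.0"
```

Running sbt assembly should then produce a single jar under target/ that contains both md_streaming and the connector classes; listing it with jar tvf is a quick way to confirm before submitting.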

Thanks

Dr Mich Talebzadeh





Re: compiling flink job with maven is failing with error: object flink is not a member of package org.apache

2018-07-01 Thread Mich Talebzadeh
Thanks

Just to do a test I have under root directory md_streaming the following

hduser@rhes75: /home/hduser/dba/bin/flink/md_streaming> ltr
total 24
drwxr-xr-x 3 hduser hadoop 4096 Jun 30 16:10 src
drwxr-xr-x 4 hduser hadoop 4096 Jun 30 16:13 ..
-rw-r--r-- 1 hduser hadoop 7732 Jul  1 22:40 pom.xml
drwxr-xr-x 4 hduser hadoop 4096 Jul  1 22:43 .
drwxr-xr-x 4 hduser hadoop 4096 Jul  2 03:35 target

My source scala file is here

ltr src/main/scala/md_streaming.scala
-rw-r--r-- 1 hduser hadoop 1355 Jul  1 21:30
src/main/scala/md_streaming.scala

and I have attached my pom.xml

Now when I run mvn package I get no source

hduser@rhes75: /home/hduser/dba/bin/flink/md_streaming> mvn package
[INFO] Scanning for projects...
[INFO]
[INFO]

[INFO] Building md_streaming 1.5.0
[INFO]

[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @
md_streaming ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory
/home/hduser/dba/bin/flink/md_streaming/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @
md_streaming ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources)
@ md_streaming ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory
/home/hduser/dba/bin/flink/md_streaming/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @
md_streaming ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ md_streaming
---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ md_streaming ---
[WARNING] JAR will be empty - no content was marked for inclusion!
[INFO]
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ md_streaming ---
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.7 from the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing
/home/hduser/dba/bin/flink/md_streaming/target/md_streaming-1.5.0.jar with
/home/hduser/dba/bin/flink/md_streaming/target/md_streaming-1.5.0-shaded.jar
[INFO]

[INFO] BUILD SUCCESS
[INFO]

[INFO] Total time: 0.586 s
[INFO] Finished at: 2018-07-02T03:44:32+01:00
[INFO] Final Memory: 26M/962M
[INFO]


So I guess my pom.xml is incorrect!


Dr Mich Talebzadeh







On Mon, 2 Jul 2018 at 03:12, zhangminglei <18717838...@163.com> wrote:

> Hi, Mich
>
> Judging from its contents, your jar is not correct: there is no
> md_streaming class in it. You need to include that class in the jar,
> together with the Kafka libraries. By the way, maven-shade-plugin or
> maven-assembly-plugin can help you with that.
>
> Cheers
> Minglei
>
> On 2 Jul 2018, at 2:18 AM, Mich Talebzadeh wrote:
>
> Hi,
>
> I am still not there.
>
> This is the simple Scala program that I created a jar file for using mvn
>
> import java.util.Properties
> import java.util.Arrays
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.datastream.DataStream
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.apache.flink.streaming.util.serialization.DeserializationSchema
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> object md_streaming
> {
>   def main(args: Array[String]) {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val properties = new Properties()
>//val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "rhes75:9092", "schema.registry.url&quo

Re: compiling flink job with maven is failing with error: object flink is not a member of package org.apache

2018-07-01 Thread Mich Talebzadeh
Hi,

I am still not there.

This is the simple Scala program that I created a jar file for using mvn

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema

object md_streaming
{
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    //val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes75:9092", "schema.registry.url" -> "http://rhes75:8081", "zookeeper.connect" -> "rhes75:2181", "group.id" -> flinkAppName )
    properties.setProperty("bootstrap.servers", "rhes75:9092")
    properties.setProperty("zookeeper.connect", "rhes75:2181")
    properties.setProperty("group.id", "md_streaming")
    // Read strings from the Kafka topic "md" and write them out as text
    val stream = env
      .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
      .writeAsText("/tmp/md_streaming.txt")
    env.execute("Flink Kafka Example")
  }
}

Compiles OK and creates this jar file

jar tvf
/home/hduser/dba/bin/flink/md_streaming/target/md_streaming-1.5.0.jar
   157 Sun Jul 01 18:57:46 BST 2018 META-INF/MANIFEST.MF
 0 Sun Jul 01 18:57:46 BST 2018 META-INF/
 0 Sun Jul 01 18:57:46 BST 2018 META-INF/maven/
 0 Sun Jul 01 18:57:46 BST 2018 META-INF/maven/flink/
 0 Sun Jul 01 18:57:46 BST 2018 META-INF/maven/flink/md_streaming/
  7641 Sun Jul 01 18:57:34 BST 2018
META-INF/maven/flink/md_streaming/pom.xml
   102 Sun Jul 01 18:08:54 BST 2018
META-INF/maven/flink/md_streaming/pom.properties

And I try to run it as below

bin/flink run
/home/hduser/dba/bin/flink/md_streaming/target/md_streaming-1.5.0.jar

 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: *The program's
entry point class 'md_streaming' was not found in the jar file.*
at
org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:616)
at
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:199)
at
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:128)
at
org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:833)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:201)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: java.lang.ClassNotFoundException: md_streaming
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
at
org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:613)

Any ideas?

Thanks

Dr Mich Talebzadeh







On Sun, 1 Jul 2018 at 18:35, Mich Talebzadeh 
wrote:

> apologies Jorn
>
> Dr Mich Talebzadeh
>
>
>

Re: compiling flink job with maven is failing with error: object flink is not a member of package org.apache

2018-07-01 Thread Mich Talebzadeh
Thanks Franke. That did the trick!

[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.7 from the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[DEBUG] Processing JAR
/home/hduser/dba/bin/flink/md_streaming/target/maven-compiler-plugin-1.5.0.jar
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing
/home/hduser/dba/bin/flink/md_streaming/target/maven-compiler-plugin-1.5.0.jar
with
/home/hduser/dba/bin/flink/md_streaming/target/maven-compiler-plugin-1.5.0-shaded.jar
[INFO]

[INFO] BUILD SUCCESS
[INFO]

[INFO] Total time: 12.868 s
[INFO] Finished at: 2018-07-01T17:35:33+01:00
[INFO] Final Memory: 19M/736M
[INFO]

Completed compiling

Regards,

Dr Mich Talebzadeh







On Sun, 1 Jul 2018 at 17:26, Jörn Franke  wrote:

> Shouldn’t it be 1.5.0 instead of 1.5?
>
> On 1. Jul 2018, at 18:10, Mich Talebzadeh 
> wrote:
>
> Ok some clumsy work by me not creating the pom.xml file in flink
> sub-directory (it was putting it in spark)!
>
> Anyhow this is the current issue I am facing
>
> [INFO]
> 
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 0.857 s
> [INFO] Finished at: 2018-07-01T17:07:10+01:00
> [INFO] Final Memory: 22M/962M
> [INFO]
> 
> [ERROR] Failed to execute goal on project maven-compiler-plugin: Could not
> resolve dependencies for project flink:maven-compiler-plugin:jar:1.5: The
> following artifacts could not be resolved:
> org.apache.flink:flink-java:jar:1.5,
> org.apache.flink:flink-streaming-java_2.11:jar:1.5: Could not find artifact
> org.apache.flink:flink-java:jar:1.5 in central (
> https://repo.maven.apache.org/maven2) -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
> goal on project maven-compiler-plugin: Could not resolve dependencies for
> project flink:maven-compiler-plugin:jar:1.5: The following artifacts could
> not be resolved: org.apache.flink:flink-java:jar:1.5,
> org.apache.flink:flink-streaming-java_2.11:jar:1.5: Could not find artifact
> org.apache.flink:flink-java:jar:1.5 in central (
> https://repo.maven.apache.org/maven2)
>
> FYI I remove ~/.m2 directory to get rid of anything cached!
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On Sun, 1 Jul 2018 at 15:07, zhangminglei <18717838...@163.com> wrote:
>
>> Hi, Mich
>>
>> [WARNING]  Expected all dependencies to require Scala version: 2.10.4
>> [WARNING]  spark:scala:1.0 requires scala version: 2.11.7
>> [WARNING] Multiple versions of scala libraries detected!
>>
>>
>> I think you should make your scala version to 2.11 first. And try again.
>>
>> Cheers
>> Minglei
>>
>> On 1 Jul 2018, at 9:24 PM, Mich Talebzadeh wrote:
>>
>> Hi Minglei,
>>
>> Many thanks
>>
>> My flink version is 1.5
>>
>> This is the pom.xml from GitHub as suggested
>>
>> 
>> http://maven.apache.org/POM/4.0.0; xmlns:xsi="
>> http://www.w3.org/2001/XMLSchema-instance;
>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>> 4.0.0

Re: compiling flink job with maven is failing with error: object flink is not a member of package org.apache

2018-07-01 Thread Mich Talebzadeh
Ok some clumsy work by me not creating the pom.xml file in flink
sub-directory (it was putting it in spark)!

Anyhow this is the current issue I am facing

[INFO]

[INFO] BUILD FAILURE
[INFO]

[INFO] Total time: 0.857 s
[INFO] Finished at: 2018-07-01T17:07:10+01:00
[INFO] Final Memory: 22M/962M
[INFO]

[ERROR] Failed to execute goal on project maven-compiler-plugin: Could not
resolve dependencies for project flink:maven-compiler-plugin:jar:1.5: The
following artifacts could not be resolved:
org.apache.flink:flink-java:jar:1.5,
org.apache.flink:flink-streaming-java_2.11:jar:1.5: Could not find artifact
org.apache.flink:flink-java:jar:1.5 in central (
https://repo.maven.apache.org/maven2) -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
goal on project maven-compiler-plugin: Could not resolve dependencies for
project flink:maven-compiler-plugin:jar:1.5: The following artifacts could
not be resolved: org.apache.flink:flink-java:jar:1.5,
org.apache.flink:flink-streaming-java_2.11:jar:1.5: Could not find artifact
org.apache.flink:flink-java:jar:1.5 in central (
https://repo.maven.apache.org/maven2)

FYI I removed the ~/.m2 directory to get rid of anything cached!

Thanks

Dr Mich Talebzadeh







On Sun, 1 Jul 2018 at 15:07, zhangminglei <18717838...@163.com> wrote:

> Hi, Mich
>
> [WARNING]  Expected all dependencies to require Scala version: 2.10.4
> [WARNING]  spark:scala:1.0 requires scala version: 2.11.7
> [WARNING] Multiple versions of scala libraries detected!
>
>
> I think you should make your scala version to 2.11 first. And try again.
>
> Cheers
> Minglei
>
> On 1 Jul 2018, at 9:24 PM, Mich Talebzadeh wrote:
>
> Hi Minglei,
>
> Many thanks
>
> My flink version is 1.5
>
> This is the pom.xml from GitHub as suggested
>
> 
> http://maven.apache.org/POM/4.0.0; xmlns:xsi="
> http://www.w3.org/2001/XMLSchema-instance;
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
> 4.0.0
> ${groupId}
> ${artifactId}
> 1.5
> jar
> Flink Quickstart Job
> http://www.myorganization.org
> 
>
> UTF-8
> @project.version@
> 1.8
> 2.11
> 2.11
> 2.11
> 
> 
> 
> apache.snapshots
> Apache Development Snapshot Repository
> 
> https://repository.apache.org/content/repositories/snapshots/
> 
> false
> 
> 
> true
> 
> 
> 
> 
> 
> 
> 
> org.apache.flink
> flink-java
> 1.5
> provided
> 
> 
> org.apache.flink
> flink-streaming-java_2.11
> 1.5
> provided
> 
> 
> 
> 
> 
> 
> org.slf4j
> slf4j-log4j12
> 1.7.7
> runtime
> 
> 
> log4j
> log4j
> 1.2.17
> runtime
> 
> 
> 
> 
> 
> 
> org.apache.maven.plugins
>
> maven-compiler-plugin
> 3.1
> 
>  

Re: compiling flink job with maven is failing with error: object flink is not a member of package org.apache

2018-07-01 Thread Mich Talebzadeh
.apache.flink
flink-java
1.5
compile


org.apache.flink

flink-streaming-java_2.11
1.5
compile







But I am still getting the same errors for input

[INFO] Scanning for projects...
[INFO]
[INFO]

[INFO] Building scala 1.0
[INFO]

[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ scala
---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory
/home/hduser/dba/bin/flink/md_streaming/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ scala ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-scala-plugin:2.15.2:compile (default) @ scala ---
[INFO] Checking for multiple versions of scala
[WARNING]  Expected all dependencies to require Scala version: 2.10.4
[WARNING]  spark:scala:1.0 requires scala version: 2.11.7
[WARNING] Multiple versions of scala libraries detected!
[INFO] includes = [**/*.java,**/*.scala,]
[INFO] excludes = []
[INFO] /home/hduser/dba/bin/flink/md_streaming/src/main/scala:-1: info:
compiling
[INFO] Compiling 1 source files to
/home/hduser/dba/bin/flink/md_streaming/target/classes at 1530451461171
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:3:
error: object flink is not a member of package org.apache
[INFO] import org.apache.flink.api.common.functions.MapFunction
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:4:
error: object flink is not a member of package org.apache
[INFO] import org.apache.flink.api.java.utils.ParameterTool
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:5:
error: object flink is not a member of package org.apache
[INFO] import org.apache.flink.streaming.api.datastream.DataStream
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:6:
error: object flink is not a member of package org.apache
[INFO] import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:7:
error: object flink is not a member of package org.apache
[INFO] import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:8:
error: object flink is not a member of package org.apache
[INFO] import
org.apache.flink.streaming.util.serialization.SimpleStringSchema
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:9:
error: object flink is not a member of package org.apache
[INFO] import
org.apache.flink.streaming.util.serialization.DeserializationSchema
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:10:
error: object flink is not a member of package org.apache
[INFO] import
org.apache.flink.streaming.util.serialization.SimpleStringSchema
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:18:
error: not found: value StreamExecutionEnvironment
[INFO] val env = StreamExecutionEnvironment.getExecutionEnvironment
[INFO]   ^
[ERROR] 9 errors found
[INFO]

[INFO] BUILD FAILURE


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 1 Jul 2018 at 14:07, zhangminglei <18717838...@163.com> wrote:

> Hi, Mich.
>
> Is there a basic MVN pom file for flink? The default one from GitHub does
> not seem to be working!
>
>
> Please take a look at
> https://github.com/apache/flink/blob/master/flink-quickstart/flink-quickstart-java/src/main/resources

compiling flink job with maven is failing with error: object flink is not a member of package org.apache

2018-07-01 Thread Mich Talebzadeh
I have done this many times with sbt or Maven for Spark Streaming.

I am trying to compile a simple program that compiles OK in the Flink Scala
shell (start-scala-shell.sh).

The imports are as follows
import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

With the following jars added to the classpath, it compiles:

flink-connector-kafka-0.9_2.11-1.5.0.jar
flink-connector-kafka-base_2.11-1.5.0.jar

I guess my pom.xml is incorrect.

I have added these two dependencies to the pom.xml file


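<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.4.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.5.0</version>
</dependency>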


However, I am getting these basic errors!

[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:4:
error: object flink is not a member of package org.apache
[INFO] import org.apache.flink.api.java.utils.ParameterTool
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:5:
error: object flink is not a member of package org.apache
[INFO] import org.apache.flink.streaming.api.datastream.DataStream
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:6:
error: object flink is not a member of package org.apache
[INFO] import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:7:
error: object flink is not a member of package org.apache
[INFO] import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:8:
error: object flink is not a member of package org.apache
[INFO] import
org.apache.flink.streaming.util.serialization.SimpleStringSchema
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:9:
error: object flink is not a member of package org.apache
[INFO] import
org.apache.flink.streaming.util.serialization.DeserializationSchema
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:10:
error: object flink is not a member of package org.apache
[INFO] import
org.apache.flink.streaming.util.serialization.SimpleStringSchema
[INFO]   ^
[ERROR]
/home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:18:
error: not found: value StreamExecutionEnvironment
[INFO] val env = StreamExecutionEnvironment.getExecutionEnvironment

Is there a basic MVN pom file for flink? The default one from GitHub does
not seem to be working!
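
A possible starting point, offered as a sketch rather than a verified fix: the two dependencies listed earlier in this message mix Flink 1.4.2 and 1.5.0 and do not include the Kafka connector that the program imports. The fragment below keeps every Flink artifact on one version and one Scala suffix. The property names `flink.version` and `scala.binary.version` are illustrative choices, and the versions are taken from the jar names mentioned earlier in this message.

```xml
<!-- Sketch only: one Flink version and one Scala suffix for all Flink artifacts. -->
<properties>
  <flink.version>1.5.0</flink.version>
  <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
  <!-- Core Scala API; "provided" assumes the cluster supplies it at run time. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <!-- Streaming API; pulls in flink-streaming-java transitively. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <!-- Kafka 0.9 connector; not part of the Flink distribution, so not "provided". -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```

Since every org.apache.flink import fails in the errors above, it may also be that the dependencies are not reaching the compile classpath at all (for example if they sit outside the main `<dependencies>` section). Running `mvn dependency:tree` should show whether the Flink artifacts are actually resolved.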

Thanks

Dr Mich Talebzadeh





Re: error: object connectors is not a member of package org.apache.flink.streaming

2018-06-30 Thread Mich Talebzadeh
Thanks Rong

This worked.

$FLINK_HOME/bin/start-scala-shell.sh local --addclasspath
/home/hduser/jars/flink-connector-kafka-0.9_2.11-1.5.0.jar:/home/hduser/jars/flink-connector-kafka-base_2.11-1.5.0.jar

Regards,


Dr Mich Talebzadeh







On Sat, 30 Jun 2018 at 17:00, Rong Rong  wrote:

> Hi Mich,
>
> Ted is correct, Flink release binary does not include any connectors and
> you will have to include the appropriate connector version. This is to
> avoid dependency conflicts between different Kafka releases.
>
> You probably need the specific Kafka connector version jar file as well,
> so in your case since you are using the scala shell. The following command
> should work:
> ./bin/start-scala-shell.sh --addclasspath
> ": base_2.11>"
>
> --
> Rong
>
> On Sat, Jun 30, 2018 at 1:11 AM Ted Yu  wrote:
>
>> Please add flink-connector-kafka-base_2.11 jar to the classpath.
>>
>> On Sat, Jun 30, 2018 at 1:06 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>>
>>> Great Ted added that jar file to the classpath
>>>
>>> Running this code
>>>
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import java.util.Properties
>>> object Main {
>>>   def main(args: Array[String]) {
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> val properties = new Properties()
>>> properties.setProperty("bootstrap.servers", "localhost:9092")
>>> properties.setProperty("zookeeper.connect", "localhost:2181")
>>> properties.setProperty("group.id", "test")
>>> val stream = env
>>>   .addSource(new FlinkKafkaConsumer082[String]("md", new
>>> SimpleStringSchema(), properties))
>>>   .print
>>> env.execute("Flink Kafka Example")
>>>   }
>>> }
>>>
>>> I am getting this error now
>>>
>>> :77: error: Class
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase not
>>> found - continuing with a stub.
>>>  .addSource(new FlinkKafkaConsumer082[String]("md", new
>>> SimpleStringSchema(), properties))
>>>   ^
>>> :77: error: overloaded method value addSource with alternatives:
>>>   [T](function:
>>> org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T]
>>> => Unit)(implicit evidence$10:
>>> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
>>> 
>>>   [T](function:
>>> org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit
>>> evidence$9:
>>> org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
>>>  cannot be applied to
>>> (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082[String])
>>>  .addSource(new FlinkKafkaConsumer082[String]("md", new
>>> SimpleStringSchema(), properties))
>>>
>>> any ideas please?
>>>
>>> Regards,
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>

Displaying topic data with Flink streaming

2018-06-30 Thread Mich Talebzadeh
Hi,

I have a streaming topic called "md" that carries test market data.

I have written a simple program to stream data via Kafka into Flink.

Flink version 1.5
Kafka version 2.12

This is the sample program in scala that compiles ok in start-scala-shell.sh

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
//import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "sampleScala")
    val stream = env
      .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
      .print()
    env.execute("Flink Kafka Example")
  }
}

warning: there was one deprecation warning; re-run with -deprecation for
details
defined object Main

But I do not see any streaming output.

A naïve question. How do I execute the above compiled object in this shell?
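
On the last question: as far as I know, pasting the object into the shell only defines it (hence "defined object Main"), and nothing runs until its main method is called. A minimal sketch with a stand-in object, where the `runs` counter is purely illustrative and not part of the original program:

```scala
// Stand-in for the Main object above; the real body would build and execute the Flink job.
object Main {
  var runs = 0 // illustrative counter, only here to make the effect visible

  def main(args: Array[String]): Unit = {
    runs += 1
    println(s"main invoked, runs = $runs")
  }
}

// Defining the object does not execute anything yet.
println(Main.runs)

// Calling main explicitly is what starts the program.
Main.main(Array.empty[String])
println(Main.runs)
```

In the Flink shell the same call, Main.main(Array.empty[String]), should start the job defined above.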

Thanks

Dr Mich Talebzadeh





error: object connectors is not a member of package org.apache.flink.streaming

2018-06-29 Thread Mich Talebzadeh
I am following this Flink Kafka example

https://stackoverflow.com/questions/31446374/can-anyone-share-a-flink-kafka-example-in-scala

This is my edited program. I am using Flink 1.5 in flink-scala shell

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import java.util.Properties

But I am getting this error

scala> import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala._

scala> import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
:76: error: object connectors is not a member of package
org.apache.flink.streaming
   import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082

Is there any reason I am getting this error? Are the jar files missing? Can
one add jar files as parameters to start-scala-shell.sh local?

Thanks

Dr Mich Talebzadeh





Re: First try running I am getting org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.

2018-06-29 Thread Mich Talebzadeh
As it turned out from the application log, it could not find the YARN
configuration class. It had nothing to do with the port.

Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.conf.YarnConfiguration.

I had installed Flink without bundled Hadoop, and my version of Hadoop
is 3.1.

I went back and downloaded Flink built with Hadoop 2.8, and that worked.
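
A quick way to confirm this kind of problem, sketched here under the assumption that the snippet runs in a JVM started with the same classpath as the Flink client (for example the Scala shell), is to try loading the class named in the exception. The helper name `onClasspath` is made up for this illustration:

```scala
// Returns true when the named class can be loaded from the current classpath.
def onClasspath(className: String): Boolean =
  try {
    Class.forName(className)
    true
  } catch {
    case _: ClassNotFoundException => false
  }

// A JDK class, as a sanity check that the helper itself works.
println(onClasspath("java.lang.String"))

// The class from the stack trace; false here would match the error in the log.
println(onClasspath("org.apache.hadoop.yarn.conf.YarnConfiguration"))
```

The result only describes the JVM the snippet runs in, so it is a hint about the client classpath rather than proof of what the cluster sees.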

Thanks

Dr Mich Talebzadeh







On Fri, 29 Jun 2018 at 14:57, Hequn Cheng  wrote:

> The port should be consistent.
>
> 1> nc -l 2219
> 2>./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2219
>
> On Fri, Jun 29, 2018 at 9:21 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Thanks Hequn.
>>
>> This is the port I started with
>>
>> hduser@rhes75: /data6/hduser/flink-1.5.0> nc -l 2219
>> hello
>>
>> and as I expected I should collect from port 2219? However, I did what
>> you suggested
>>
>> ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2199
>> Starting execution of program
>> 
>>  The program finished with the following exception:
>> org.apache.flink.client.program.ProgramInvocationException: Could not
>> retrieve the execution result.
>>
>> Unfortunately I am getting the same error.
>>
>> Cheers
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>>
>> On Fri, 29 Jun 2018 at 14:11, Hequn Cheng  wrote:
>>
>>> Hi Mich,
>>>
>>> Your port does not match. You started netcat with "nc -l 2219 &" but ran
>>> the Flink job with "--port 2199".
>>>
>>>
>>>
>>>
>>> On Fri, Jun 29, 2018 at 8:40 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have installed flink 1.5 in standalone mode and trying a basic run as
>>>> per this example
>>>>
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html
>>>>
>>>> started netcat on port 2199
>>>>
>>>>  nc -l 2219 &
>>>>
>>>> Run the example
>>>>
>>>> ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2199
>>>> Starting execution of program
>>>> 
>>>>  The program finished with the following exception:
>>>> org.apache.flink.client.program.ProgramInvocationException: Could not
>>>> retrieve the execution result.
>>>> at
>>>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
>>>> at
>>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>> at
>>>> org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> 

Re: First try running I am getting org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.

2018-06-29 Thread Mich Talebzadeh
Thanks Hequn.

This is the port I started with

hduser@rhes75: /data6/hduser/flink-1.5.0> nc -l 2219
hello

and as I expected I should collect from port 2219? However, I did what you
suggested

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2199
Starting execution of program

 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not
retrieve the execution result.

Unfortunately I am getting the same error.

Cheers



Dr Mich Talebzadeh







On Fri, 29 Jun 2018 at 14:11, Hequn Cheng  wrote:

> Hi Mich,
>
> Your port does not match. You started netcat with "nc -l 2219 &" but ran the
> Flink job with "--port 2199".
>
>
>
>
> On Fri, Jun 29, 2018 at 8:40 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> I have installed flink 1.5 in standalone mode and trying a basic run as
>> per this example
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html
>>
>> started netcat on port 2199
>>
>>  nc -l 2219 &
>>
>> Run the example
>>
>> ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2199
>> Starting execution of program
>> 
>>  The program finished with the following exception:
>> org.apache.flink.client.program.ProgramInvocationException: Could not
>> retrieve the execution result.
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>> at
>> org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>> Caused by: org.apache.flink.runtime.client.JobSubmissionException:
>> Failed to submit JobGraph.
>>
>> Appreciate any suggestions.
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>
>


First try running I am getting org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.

2018-06-29 Thread Mich Talebzadeh
Hi,

I have installed Flink 1.5 in standalone mode and am trying a basic run as per
this example

https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html

started netcat on port 2199

 nc -l 2219 &

Run the example

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2199
Starting execution of program

 The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not
retrieve the execution result.
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit JobGraph.

Appreciate any suggestions.

Thanks

Dr Mich Talebzadeh





Re: Testing Kafka interface using Flink interactive shell

2016-04-18 Thread Mich Talebzadeh
Thanks Chiwan. It worked.

Now I have this simple streaming program in Spark Scala that gets streaming
data via Kafka. It is pretty simple. Please see attached.

I am trying to make it work with Flink + Kafka

Any hints will be appreciated.

Thanks



Dr Mich Talebzadeh






On 18 April 2016 at 02:43, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Mich,
>
> You can add external dependencies to the Scala shell using the
> `--addclasspath` option. There is a more detailed description in the
> documentation [1].
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/scala_shell.html#adding-external-dependencies
>
> Regards,
> Chiwan Park
>
> > On Apr 17, 2016, at 6:04 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > In the Spark shell I can load the Kafka jar file through the spark-shell
> > option --jars
> >
> > spark-shell --master spark://50.140.197.217:7077 --jars
> ,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar
> >
> > This works fine.
> >
> > In Flink I have added the jar file
> /home/hduser/jars/flink-connector-kafka-0.10.1.jar to the CLASSPATH.
> >
> > However I don't get any support for it within flink shell
> >
> > Scala-Flink> import org.apache.flink.streaming.connectors.kafka
> > :54: error: object connectors is not a member of package
> org.apache.flink.streaming
> >     import org.apache.flink.streaming.connectors.kafka
> >   ^
> >
> > Any ideas will be appreciated
> >
> > Dr Mich Talebzadeh
> >
> >
>
>
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
object TestStream_assembly {
  def main(args: Array[String]) {
    val conf = new SparkConf().
      setAppName("TestStream_assembly").
      setMaster("local[2]").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val ssc = new StreamingContext(conf, Seconds(55))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "StreamTest")
    val topic = Set("newtopic")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    messages.cache()
    //
    // Get the lines
    //
    val lines = messages.map(_._2)
    // Check for message
    val showResults = lines.filter(_.contains("Sending messages")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}


Testing Kafka interface using Flink interactive shell

2016-04-17 Thread Mich Talebzadeh
Hi,

In the Spark shell I can load the Kafka jar file through the spark-shell
option --jars

spark-shell --master spark://50.140.197.217:7077 --jars
,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar

This works fine.

In Flink I have added the jar file
/home/hduser/jars/flink-connector-kafka-0.10.1.jar to the CLASSPATH.

However I don't get any support for it within flink shell

Scala-Flink> import org.apache.flink.streaming.connectors.kafka
:54: error: object connectors is not a member of package
org.apache.flink.streaming
import org.apache.flink.streaming.connectors.kafka
  ^

Any ideas will be appreciated

Dr Mich Talebzadeh





Re: Flink support for Scala

2016-04-16 Thread Mich Talebzadeh
Hi,

I just found out it does

Scala-Flink>

thanks

Dr Mich Talebzadeh






On 16 April 2016 at 17:22, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> Hi,
>
> Just downloaded Flink and coming from Spark and Hive background.
>
> My interest for Flink is its support for Complex Event Processing (CEP)
> libraries.
>
> Couple of questions if I may
>
> 1) is there a quick start guide for Splink (I am not referring to a simple
> example that comes with quick start guide
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/setup_quickstart.html
> 2) Does Flink support Scala language or what is its default programming
> language if any.
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>


Flink support for Scala

2016-04-16 Thread Mich Talebzadeh
Hi,

I have just downloaded Flink and come from a Spark and Hive background.

My interest in Flink is its support for Complex Event Processing (CEP)
libraries.

A couple of questions, if I may:

1) Is there a quick start guide for Flink? I am not referring to the simple
example that comes with the quick start guide:

https://ci.apache.org/projects/flink/flink-docs-release-0.8/setup_quickstart.html

2) Does Flink support the Scala language, or what is its default programming
language, if any?

Thanks



Dr Mich Talebzadeh


