Re: Getting compilation error in Array[TypeInformation]
Thanks, those suggestions helped.

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Thu, 9 Aug 2018 at 16:41, Timo Walther wrote:

> Hi Mich,
>
> I strongly recommend reading a good Scala programming tutorial before
> writing to a mailing list.
>
> As the error indicates, you are missing generic parameters. If you don't
> know the parameter, use `Array[TypeInformation[_]]` or `TableSink[_]`. For
> the Types class you need to import "org.apache.flink.table.api.Types".
>
> Regards,
> Timo
>
> Am 09.08.18 um 17:18 schrieb Mich Talebzadeh:
>
> This is the code in Scala:
>
>     val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>     tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
>     val result = tableEnv.scan("priceTable").filter('ticker === "VOD" && 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
>
>     val fieldNames: Array[String] = Array("key", "ticker", "timeissued", "price")
>     val fieldTypes: Array[TypeInformation] = Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)
>     val sink: TableSink = new CsvTableSink(writeDirectory+fileName, fieldDelim = ",")
>     tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
>     result.insertInto("CsvSinkTable")
>
> When compiling I get the following errors:
>
>     [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: class TypeInformation takes type parameters
>     [error]   val fieldTypes: Array[TypeInformation] = Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)
>     [error] md_streaming.scala:171: not found: value Types
>             (the same "not found: value Types" error is reported four times, once per Types reference)
>     [error] md_streaming.scala:172: trait TableSink takes type parameters
>     [error]   val sink: TableSink = new CsvTableSink(writeDirectory+fileName, fieldDelim = ",")
>     [error] 6 errors found
>
> Maybe I am not importing the correct dependencies.
>
> Thanks
Getting compilation error in Array[TypeInformation]
This is the code in Scala:

    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
    val result = tableEnv.scan("priceTable").filter('ticker === "VOD" && 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)

    val fieldNames: Array[String] = Array("key", "ticker", "timeissued", "price")
    val fieldTypes: Array[TypeInformation] = Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)
    val sink: TableSink = new CsvTableSink(writeDirectory+fileName, fieldDelim = ",")
    tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
    result.insertInto("CsvSinkTable")

When compiling I get the following errors:

    [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:171: class TypeInformation takes type parameters
    [error]   val fieldTypes: Array[TypeInformation] = Array(Types.STRING, Types.STRING, Types.STRING, Types.Float)
    [error] md_streaming.scala:171: not found: value Types
            (the same "not found: value Types" error is reported four times, once per Types reference)
    [error] md_streaming.scala:172: trait TableSink takes type parameters
    [error]   val sink: TableSink = new CsvTableSink(writeDirectory+fileName, fieldDelim = ",")
    [error] 6 errors found

Maybe I am not importing the correct dependencies.

Thanks

Dr Mich Talebzadeh
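Applied to the code above, Timo's fix amounts to writing `Array[TypeInformation[_]]` and `TableSink[_]`, and importing `org.apache.flink.table.api.Types` (judging by the schema-builder example elsewhere in this archive, the constant would also be `Types.FLOAT` rather than `Types.Float`). The generic-parameter rule itself can be sketched in plain Scala without Flink, using a hypothetical `TypeInfo[T]` as a stand-in for Flink's `TypeInformation[T]`:

```scala
// Hypothetical stand-in for Flink's TypeInformation[T]; not the real class.
trait TypeInfo[T] { def name: String }
object StringInfo extends TypeInfo[String] { def name = "String" }
object FloatInfo  extends TypeInfo[Float]  { def name = "Float"  }

// val bad: Array[TypeInfo] = ...   // rejected: "trait TypeInfo takes type parameters"
// The wildcard supplies the missing type parameter without fixing it to one type:
val fieldTypes: Array[TypeInfo[_]] =
  Array(StringInfo, StringInfo, StringInfo, FloatInfo)

println(fieldTypes.map(_.name).mkString(", "))
```

The same wildcard works for `val sink: TableSink[_] = new CsvTableSink(...)`, since `TableSink` is likewise parameterized.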
Re: Working out through individual messages in Flink
Hi Jörn,

Thanks. I uploaded the Scala code (md_streaming.scala) to my GitHub:

https://github.com/michTalebzadeh/Flink

Regards,

Dr Mich Talebzadeh

On Tue, 7 Aug 2018 at 23:29, Jörn Franke wrote:

> Hi Mich,
>
> Would it be possible to share the full source code?
> I am missing a call to streamExecEnvironment.execute
>
> Best regards
>
> On 8. Aug 2018, at 00:02, Mich Talebzadeh wrote:
>
> Hi Fabian,
>
> Reading your notes above I have converted the table back to a DataStream:
>
>     val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>     tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
>
>     val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>     val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>     val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>     val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>
> My intention is to create an HBase sink as follows:
>
>     // Save prices to HBase table
>     var p = new Put(new String(key).getBytes())
>     p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
>     p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
>     p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
>     p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
>     p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
>     p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
>     HbaseTable.put(p)
>     HbaseTable.flushCommits()
>
> However, I don't seem to be able to get the correct values for the columns!
>
> On Mon, 30 Jul 2018 at 09:58, Fabian Hueske wrote:
>
>> A *Table*Source [1] is a special input connector for Flink's relational
>> APIs (Table API and SQL) [2].
>> You can transform and filter with these APIs as well (it's probably even
>> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>>
>> However, there is no *Table*Sink for HBase, so you would need to convert
>> the Table back to a DataStream [3].
>> That's not very difficult since the APIs are integrated with each other.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
Re: Working out through individual messages in Flink
Hi Fabian,

Reading your notes above I have converted the table back to a DataStream:

    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)

    val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
    val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
    val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
    val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]

My intention is to create an HBase sink as follows:

    // Save prices to HBase table
    var p = new Put(new String(key).getBytes())
    p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
    p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
    p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
    p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
    p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
    p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
    HbaseTable.put(p)
    HbaseTable.flushCommits()

However, I don't seem to be able to get the correct values for the columns!

Dr Mich Talebzadeh

On Mon, 30 Jul 2018 at 09:58, Fabian Hueske wrote:

> A *Table*Source [1] is a special input connector for Flink's relational
> APIs (Table API and SQL) [2].
> You can transform and filter with these APIs as well (it's probably even
> easier). In SQL this would be the SELECT and WHERE clauses of a query.
>
> However, there is no *Table*Sink for HBase, so you would need to convert
> the Table back to a DataStream [3].
> That's not very difficult since the APIs are integrated with each other.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>
> 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh:
>
>> Thanks Fabian. That was very useful.
>>
>> How about an operation like below?
>>
>>     // create builder
>>     val KafkaTableSource = Kafka011JsonTableSource.builder()
>>       // set Kafka topic
>>       .forTopic(topicsValue)
>>       // set Kafka consumer properties
>>       .withKafkaProperties(properties)
>>       // set Table schema
>>       .withSchema(TableSchema.builder()
>>         .field("key", Types.STRING)
>>         .field("ticker", Types.STRING)
>>         .field("timeissued", Types.STRING)
>>         .field("price", Types.FLOAT)
>>         .build())
>>
>> Will that be OK?
>>
>> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske wrote:
>>
>>> Hi,
>>>
>>> Flink processes streams record by record, instead of micro-batchin
Re: Passing the individual table column values to the local variables
I need this operation to store filtered rows in an HBase table. I can access an existing HBase table through the Flink API. My challenge is to put rows into the HBase table, something like below, and I don't seem to be able to extract individual column values from priceTable:

    val key = tableEnv.scan("priceTable").select('key).toDataStream[Row].print()
    val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row].print()
    val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row].print()
    val price = tableEnv.scan("priceTable").select('price).toDataStream[Row].print()

    val CURRENCY = "GBP"
    val op_type = "1"
    val op_time = System.currentTimeMillis.toString

    /*
    if (price > 99.0) {
      // Save prices to HBase table
      var p = new Put(new String(key).getBytes())
      p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
      p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
      p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
      p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
      p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
      p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
      HbaseTable.put(p)
      HbaseTable.flushCommits()

      if(tableEnv.scan("priceTable").filter('ticker == "VOD" && 'price > 99.0)) {
        sqltext = Calendar.getInstance.getTime.toString + ", Price on "+ticker+" hit " +price.toString
        //java.awt.Toolkit.getDefaultToolkit().beep()
        println(sqltext)
      }
    }

Dr Mich Talebzadeh

On Tue, 7 Aug 2018 at 17:07, Mich Talebzadeh wrote:

> Hi,
>
> The following works fine:
>
>     tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
>     val result = tableEnv.scan("priceTable").filter('ticker === "VOD" && 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
>     val r = result.toDataStream[Row]
>     r.print()
>
> Now I would like to get the individual column values from priceTable into
> local variables.
>
> This does not seem to work:
>
>     val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
>     val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
>     val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
>     val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]
>
> What alternatives are there?
>
> Thanks
Passing the individual table column values to the local variables
Hi,

The following works fine:

    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
    val result = tableEnv.scan("priceTable").filter('ticker === "VOD" && 'price > 99.0).select('key, 'ticker, 'timeissued, 'price)
    val r = result.toDataStream[Row]
    r.print()

Now I would like to get the individual column values from priceTable into local variables.

This does not seem to work:

    val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
    val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
    val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
    val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]

What alternatives are there?

Thanks

Dr Mich Talebzadeh
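The select calls above return DataStream[Row] handles, not values. As Fabian points out in the related thread, Flink processes streams record by record, so column values are read inside a per-record function rather than assigned to program-level variables. A minimal plain-Scala sketch of that pattern (an ordinary Seq and a hypothetical PriceRow case class stand in for a real DataStream[Row]; this is an illustration of the idea, not Flink API code):

```scala
// Hypothetical record type matching the priceTable schema.
case class PriceRow(key: String, ticker: String, timeissued: String, price: Float)

// A plain Seq standing in for the stream of parsed Kafka records.
val stream = Seq(
  PriceRow("uuid-1", "VOD", "2018-08-09T10:01", 99.50f),
  PriceRow("uuid-2", "IBM", "2018-08-09T10:02", 50.00f)
)

// Filtering and field access happen per record, inside the function,
// which is where an HBase Put would be built in a custom sink.
val alerts = stream
  .filter(r => r.ticker == "VOD" && r.price > 99.0f)
  .map(r => s"Price on ${r.ticker} hit ${r.price}")

alerts.foreach(println)
```

In Flink terms the same shape would be a map or a custom SinkFunction over the single `result.toDataStream[Row]` stream, reading each Row's fields per record.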
Re: Increase parallelism according to the number of tasks
This worked:

    streamExecEnv.setParallelism(2)

Dr Mich Talebzadeh

On Tue, 7 Aug 2018 at 08:48, Mich Talebzadeh wrote:

> Hi,
>
> In my test environment I have two task managers.
>
> I would like to increase the parallelism to 2 from the default of 1. Can it
> be done through properties?
>
>     properties.setProperty("parallelism", "2")
>
> That does not change anything, though.
>
> Thanks
Increase parallelism according to the number of tasks
Hi,

In my test environment I have two task managers.

I would like to increase the parallelism to 2 from the default of 1. Can it be done through properties?

    properties.setProperty("parallelism", "2")

That does not change anything, though.

Thanks

Dr Mich Talebzadeh
Re: Converting a DataStream into a Table throws error
OK gents, thanks for the clarification.

Regards,

Dr Mich Talebzadeh

On Tue, 7 Aug 2018 at 02:21, Hequn Cheng wrote:

> Hi Mich,
>
> I think this is the expected behavior. When you run your job locally, you
> have to remove the provided scope or add the jar to the lib path. But if
> you run on a cluster, you have to keep the provided scope so that Flink
> classes are excluded from your jar, since these classes already exist in
> your installed Flink version.
>
> Best, Hequn
>
> On Tue, Aug 7, 2018 at 1:34 AM, Mich Talebzadeh wrote:
>
>> Thanks Fabian,
>>
>> I looked at the Maven page and this is what it says: *provided*
>>
>> [image: screenshot of the Maven dependency page]
>>
>> However, this jar file is not shipped with Flink? Is this deliberate?
>>
>> Thanks
>>
>> On Mon, 6 Aug 2018 at 18:14, Fabian Hueske wrote:
>>
>>> The problem is that you declared it as provided.
>>> This means the build tool assumes it will be there and therefore does
>>> not include it in the jar file.
>>> By adding it to the lib folder you are providing the dependency.
>>>
>>> Best, Fabian
>>>
>>> 2018-08-06 18:58 GMT+02:00 Mich Talebzadeh:
>>>
>>>> Hi,
>>>>
>>>> I resolved this issue of
>>>>
>>>>     java.lang.NoClassDefFoundError: org/apache/flink/table/api/TableEnvironment
>>>>
>>>> by adding the jar file
>>>>
>>>>     flink-table_2.11-1.5.0.jar
>>>>
>>>> to $FLINK_HOME/lib.
>>>>
>>>> It compiles and runs OK now.
>>>>
>>>> Rather strange, as I had this dependency in my SBT:
>>>>
>>>>     libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"
>>>>
>>>> HTH
>>>>
>>>> On Thu, 2 Aug 2018 at 08:26, Mich Talebzadeh wrote:
>>>>
>>>>> Thanks everyone for the advice.
>>>>>
>>>>> This worked and passed the compilation error:
>>>>>
>>>>>     import org.apache.flink.table.api.TableEnvironment
>>>>>     import org.apache.flink.table.api.scala._
>>>>>     import org.apache.flink.api.scala._
>>>>>
>>>>>     ….
>>>>>
>>>>>     val dataStream = streamExecEnv
>>>>>       .addSource(new FlinkKafkaConsumer011[String](t
Running SQL to print to Std Out
Hi,

This is the streaming program I have for trade prices, following the doc on result sets for tables:

https://flink.apache.org/news/2017/03/29/table-sql-api-update.html

    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = streamExecEnv
      .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
    val splitStream = ds.map(new MapFunction[String, Tuple4[String, String, String, Float]] {
      override def map(value: String): Tuple4[String, String, String, Float] = {
        var cols = value.split(',')
        return (cols(0).toString, cols(1).toString, cols(2).toString, cols(3).toFloat)
      }
    })
    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
    val result = tableEnv.scan("priceTable").filter('ticker.isNotNull).select('key, 'ticker, 'timeissued, 'price)
    val r = result.toDataStream[Row]
    r.print()

This compiles and runs, but I do not see any output on the screen. This is the output from the Flink GUI:

[image: Flink web UI job screenshot]

I can verify that data is being streamed in, so there is no issue there. However, I don't see any output, and the Flink GUI does not look healthy (circles).

Appreciate any input.

Thanks,

Dr Mich Talebzadeh
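Jörn Franke's reply in the "Working out through individual messages" thread may well explain the missing output: there is no call to streamExecEnv.execute(...) in the program, and in Flink the code above only builds a job plan; nothing runs (and r.print() emits nothing) until execute is called. The deferred-execution idea can be sketched in plain Scala, with a hypothetical Pipeline class standing in for the Flink environment:

```scala
// Hypothetical deferred-execution pipeline: map() only records a
// transformation; the work happens exclusively inside execute() --
// the same reason a Flink job without streamExecEnv.execute(...)
// produces no output.
class Pipeline(records: Seq[String]) {
  private var stages: Seq[String] => Seq[String] = identity
  def map(f: String => String): Pipeline = {
    val prev = stages
    stages = s => prev(s).map(f)  // compose, but do not run
    this
  }
  def execute(): Seq[String] = stages(records)  // work happens only here
}

val p = new Pipeline(Seq("VOD,99.5")).map(r => s"row: $r")
// At this point nothing has been processed yet.
val out = p.execute()  // analogue of streamExecEnv.execute("job name")
println(out.mkString)
```

In the real program the fix would presumably be a trailing `streamExecEnv.execute(...)` after `r.print()`; the job name argument is the caller's choice.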
Re: Converting a DataStream into a Table throws error
Thanks Fabian, I looked at the maven and this is what it says *provided* [image: image.png] However, this jar file is not shipped with Flink? Is this deliberate? Thanks Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 6 Aug 2018 at 18:14, Fabian Hueske wrote: > The problem is that you declared it as provided. > This means the build tool assumes it will be there and therefore does not > include it in the Jar file. > By adding it to the lib folder you are providing the dependency. > > Best, Fabian > > 2018-08-06 18:58 GMT+02:00 Mich Talebzadeh : > >> Hi, >> >> I resolved this issue of >> >> java.lang.NoClassDefFoundError: org/apache/flink/table/api/ >> TableEnvironment >> >> By adding the jar file >> >> flink-table_2.11-1.5.0.jar >> >> To $FLINK_HOME/lib >> >> It compiles and run OK now. >> >> Rather strange as I had this dependency in my SBT >> >> libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % >> "provided" >> >> >> HTH >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. 
The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> >> On Thu, 2 Aug 2018 at 08:26, Mich Talebzadeh >> wrote: >> >>> Thanks everyone for the advice >>> >>> This worked and passed the compilation error >>> >>> import org.apache.flink.table.api.TableEnvironment >>> import org.apache.flink.table.api.scala._ >>> import org.apache.flink.api.scala._ >>> >>> …. >>> >>> val dataStream = streamExecEnv >>>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new >>> SimpleStringSchema(), properties)) >>>val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) >>> tableEnv.registerDataStream("priceTable", dataStream, "key, ticker, >>> timeissued, price") >>> sqltext = "SELECT key from priceTable"; >>> val result = tableEnv.sql(sqltext); >>> >>> Now I get this runtime error >>> >>> …. >>> [success] Total time: 16 s, completed Aug 2, 2018 8:23:12 AM >>> Completed compiling >>> Thu Aug 2 08:23:12 BST 2018 , Running in **Standalone mode** >>> Starting execution of program >>> java.lang.NoClassDefFoundError: >>> org/apache/flink/table/api/TableEnvironment$ >>> at md_streaming$.main(md_streaming.scala:140) >>> at md_streaming.main(md_streaming.scala) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:498) >>> at >>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) >>> at >>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) >>> at >>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) >>> at >>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) >>> at >>> 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) >>>
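Fabian's explanation of the "provided" scope above comes straight from the build definition. A minimal build.sbt sketch, using the versions mentioned in the thread (which variant is right depends on whether $FLINK_HOME/lib supplies the jar on the cluster):

```scala
// build.sbt fragment (sketch). With "provided", the dependency is on the
// compile classpath but build tools that bundle dependencies into a fat
// jar (e.g. sbt-assembly) skip it, so the cluster must supply flink-table
// from $FLINK_HOME/lib at runtime.
libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"

// Alternative: drop the scope so the jar is bundled with the application
// (bigger fat jar, and a risk of classpath duplication if the cluster
// also ships the same jar).
// libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0"
```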
Re: Converting a DataStream into a Table throws error
Hi,

I resolved this issue of

java.lang.NoClassDefFoundError: org/apache/flink/table/api/TableEnvironment

by adding the jar file flink-table_2.11-1.5.0.jar to $FLINK_HOME/lib.

It compiles and runs OK now. Rather strange, as I had this dependency in my SBT:

libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"

HTH

Dr Mich Talebzadeh

On Thu, 2 Aug 2018 at 08:26, Mich Talebzadeh wrote:
> Thanks everyone for the advice
>
> This worked and passed the compilation error
>
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.api.scala._
>
> ….
>
> val dataStream = streamExecEnv
>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
> tableEnv.registerDataStream("priceTable", dataStream, "key, ticker, timeissued, price")
> sqltext = "SELECT key from priceTable";
> val result = tableEnv.sql(sqltext);
>
> Now I get this runtime error
>
> ….
> [success] Total time: 16 s, completed Aug 2, 2018 8:23:12 AM > Completed compiling > Thu Aug 2 08:23:12 BST 2018 , Running in **Standalone mode** > Starting execution of program > java.lang.NoClassDefFoundError: > org/apache/flink/table/api/TableEnvironment$ > at md_streaming$.main(md_streaming.scala:140) > at md_streaming.main(md_streaming.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096) > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.table.api.TableEnvironment$ > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at 
java.lang.ClassLoader.loadClass(ClassLoader.java:357) > > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Thu, 2 Aug 2018 at 03:07, vino yang wrote: > >> Hi Mich, >> >> It seems that the type of your DataStream stream is always wrong. >> If you want to s
Re: Converting a DataStream into a Table throws error
Apologies, that should read Vino and Timo.

Dr Mich Talebzadeh

On Fri, 3 Aug 2018 at 09:14, Mich Talebzadeh wrote:
> Thanks a lot Timo.
>
> I will try the changes suggested.
>
> Appreciated
>
> Dr Mich Talebzadeh
>
> On Fri, 3 Aug 2018 at 03:56, vino yang wrote:
>> Hi Mich,
>>
>> I have reviewed your code in the github you provided.
>>
>> I copied your code into org.apache.flink.table.examples.scala under flink-examples-table. It passed compilation and didn't report the exception you provided, although there are other exceptions (they are about HDFS and due to my environment).
>> First of all, the necessary dependencies are just a few:
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-table_${scala.binary.version}</artifactId>
>>   <version>${project.version}</version>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>   <version>${project.version}</version>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
>>   <version>${project.version}</version>
>> </dependency>
>>
>> Please remove irrelevant dependencies.
>>
>> In addition, your StreamExecutionEnvironment object has a problem with its reference. The correct path should be:
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>
>> In addition, you specify four fields, but your stream object is a DataStream[String]. This is also a mistake. You are advised to parse it first with a MapFunction. The example is as follows:
>>
>> val dataStream = streamExecEnv
>>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>
>> val newDataStream = dataStream.map(new MapFunction[String, Tuple4[String, String, String, String]] {
>>   override def map(value: String): Tuple4[String, String, String, String] = {
>>     null // parse value into the four fields here
>>   }
>> })
>>
>> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
>> // tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
>> tableEnv.registerDataStream("priceTable", newDataStream, 'key, 'ticker, 'timeissued, 'price)
>>
>> Thanks, vino.
>> >> 2018-08-02 20:36 GMT+08:00 Mich Talebzadeh : >> >>> Appreciate if anyone had a chance to look at the Scala code in GitHub >>> and advise >>> >>> https://github.com/michTalebzadeh/Flink >>> >>> Regards, >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>> LinkedIn * >>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>> >>> >>> >>> http://talebzadehmich.wordpress.com >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >>> arise from relying on this email's technical content is explicitly >>> disclaimed. The author will in no case be liable for any monetary damages >>> arising from such loss, damage or destruction. >>> >>> >>> >>> >>> On Thu, 2 Aug 2018 at 09:06, Mich Talebzadeh >>> wrote: >>>
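vino's suggestion above, to parse the DataStream[String] before registering four fields, can be sketched without any Flink dependency: a plain function that splits one Kafka payload into the (key, ticker, timeissued, price) tuple. The comma-separated record format and the Float price are assumptions from the thread, and PriceParser is a hypothetical helper name:

```scala
// Hypothetical parsing helper (assumes "key,ticker,timeissued,price"
// records, which is not confirmed in the thread). Returning Option lets
// a downstream flatMap silently drop malformed records.
object PriceParser {
  def parse(value: String): Option[(String, String, String, Float)] =
    value.split(",", -1) match {
      case Array(key, ticker, timeissued, price) =>
        // Reject records whose price field is not a valid float
        scala.util.Try(price.toFloat).toOption.map(p => (key, ticker, timeissued, p))
      case _ => None // wrong number of fields
    }
}
```

Inside a Flink job this would be used as `dataStream.flatMap(PriceParser.parse(_))`, giving a DataStream of 4-tuples that matches the four field names passed to registerDataStream.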
Re: Converting a DataStream into a Table throws error
Thanks a lot Timo.

I will try the changes suggested.

Appreciated

Dr Mich Talebzadeh

On Fri, 3 Aug 2018 at 03:56, vino yang wrote:
> Hi Mich,
>
> I have reviewed your code in the github you provided.
>
> I copied your code into org.apache.flink.table.examples.scala under flink-examples-table. It passed compilation and didn't report the exception you provided, although there are other exceptions (they are about HDFS and due to my environment).
>
> First of all, the necessary dependencies are just a few:
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-table_${scala.binary.version}</artifactId>
>   <version>${project.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>   <version>${project.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
>   <version>${project.version}</version>
> </dependency>
>
> Please remove irrelevant dependencies.
>
> In addition, your StreamExecutionEnvironment object has a problem with its reference. The correct path should be:
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>
> In addition, you specify four fields, but your stream object is a DataStream[String]. This is also a mistake. You are advised to parse it first with a MapFunction. The example is as follows:
>
> val dataStream = streamExecEnv
>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>
> val newDataStream = dataStream.map(new MapFunction[String, Tuple4[String, String, String, String]] {
>   override def map(value: String): Tuple4[String, String, String, String] = {
>     null // parse value into the four fields here
>   }
> })
>
> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
> // tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
> tableEnv.registerDataStream("priceTable", newDataStream, 'key, 'ticker, 'timeissued, 'price)
>
> Thanks, vino.
>
> 2018-08-02 20:36 GMT+08:00 Mich Talebzadeh:
>> Appreciate if anyone had a chance to look at the Scala code in GitHub and advise
>>
>> https://github.com/michTalebzadeh/Flink
>>
>> Regards,
>>
>> Dr Mich Talebzadeh
>>
>> On Thu, 2 Aug 2018 at 09:06, Mich Talebzadeh wrote:
>>> Thanks Timo,
>>>
>>> Did as suggested getting this compilation error
>>>
>>> [info] Compiling 1 Scala source to
>>> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
>>> [error] >>> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:136: >>> could not find implicit value for evidence parameter of type >>> org.apache.flink.api.common.typeinfo.TypeInformation[String] >>> [error].addSource(new FlinkKafkaConsumer011[String](topicsValue, >>> new SimpleStringSchema(), properties)) >>> [error] ^ >>> [error] one error found >>> [error] (compile:compileIncremental) Compilation failed >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>> LinkedIn * >>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>> >>> >>> >>>
Re: Converting a DataStream into a Table throws error
Appreciate if anyone had a chance to look at the Scala code in GitHub and advise https://github.com/michTalebzadeh/Flink Regards, Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 2 Aug 2018 at 09:06, Mich Talebzadeh wrote: > Thanks Timo, > > Did as suggested getting this compilation error > > [info] Compiling 1 Scala source to > /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes... > [error] > /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:136: > could not find implicit value for evidence parameter of type > org.apache.flink.api.common.typeinfo.TypeInformation[String] > [error].addSource(new FlinkKafkaConsumer011[String](topicsValue, > new SimpleStringSchema(), properties)) > [error] ^ > [error] one error found > [error] (compile:compileIncremental) Compilation failed > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. 
> > > > > On Thu, 2 Aug 2018 at 09:01, Timo Walther wrote: > >> Whenever you use Scala and there is a Scala specific class use it. >> >> remove: import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >> add: import org.apache.flink.streaming.api.scala._ >> >> This will use >> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment. >> >> Timo >> >> Am 02.08.18 um 09:47 schrieb Mich Talebzadeh: >> >> Tremendous. Many thanks. >> >> Put the sbt build file and the Scala code here >> >> https://github.com/michTalebzadeh/Flink >> >> Regards, >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> >> >> On Thu, 2 Aug 2018 at 08:27, Timo Walther wrote: >> >>> Hi Mich, >>> >>> could you share your project with us (maybe on github)? Then we can >>> import it and debug what the problem is. 
>>> >>> Regards, >>> Timo >>> >>> Am 02.08.18 um 07:37 schrieb Mich Talebzadeh: >>> >>> Hi Jorn, >>> >>> Here you go the dependencies >>> >>> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1" >>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6" >>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6" >>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6" >>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6" >>> libraryDependencies += "org.apache.flink" %% >>> "flink-connector-kafka-0.11" % "1.5.0" >>> libraryDependencies += "org.apache.flink" %% >>> "flink-connector-kafka-base" % "1.5.0" >>> libraryDependencies += "org.apache.flink" %% "flink-scala" % &
Re: Converting a DataStream into a Table throws error
Thanks Timo, Did as suggested getting this compilation error [info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes... [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:136: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String] [error].addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties)) [error] ^ [error] one error found [error] (compile:compileIncremental) Compilation failed Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 2 Aug 2018 at 09:01, Timo Walther wrote: > Whenever you use Scala and there is a Scala specific class use it. > > remove: import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment > add: import org.apache.flink.streaming.api.scala._ > > This will use > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment. > > Timo > > Am 02.08.18 um 09:47 schrieb Mich Talebzadeh: > > Tremendous. Many thanks. > > Put the sbt build file and the Scala code here > > https://github.com/michTalebzadeh/Flink > > Regards, > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. 
Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Thu, 2 Aug 2018 at 08:27, Timo Walther wrote: > >> Hi Mich, >> >> could you share your project with us (maybe on github)? Then we can >> import it and debug what the problem is. >> >> Regards, >> Timo >> >> Am 02.08.18 um 07:37 schrieb Mich Talebzadeh: >> >> Hi Jorn, >> >> Here you go the dependencies >> >> libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1" >> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6" >> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6" >> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6" >> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6" >> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" >> % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" >> % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" >> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0" >> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % >> "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % >> "provided" >> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0" >> >> Thanks >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. 
Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from
Re: Converting a DataStream into a Table throws error
Tremendous. Many thanks. Put the sbt build file and the Scala code here https://github.com/michTalebzadeh/Flink Regards, Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 2 Aug 2018 at 08:27, Timo Walther wrote: > Hi Mich, > > could you share your project with us (maybe on github)? Then we can import > it and debug what the problem is. > > Regards, > Timo > > Am 02.08.18 um 07:37 schrieb Mich Talebzadeh: > > Hi Jorn, > > Here you go the dependencies > > libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1" > libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6" > libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6" > libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6" > libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6" > libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" > % "1.5.0" > libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" > % "1.5.0" > libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" > libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0" > libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % > "1.5.0" > libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % > "provided" > libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0" > > Thanks > > Dr Mich Talebzadeh > > > > LinkedIn * > 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Thu, 2 Aug 2018 at 06:19, Jörn Franke wrote: > >> >> How does your build.sbt looks especially dependencies? >> On 2. Aug 2018, at 00:44, Mich Talebzadeh >> wrote: >> >> Changed as suggested >> >>val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment >> val dataStream = streamExecEnv >>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new >> SimpleStringSchema(), properties)) >> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) >> tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, >> 'timeissued, 'price) >> >> Still the same error >> >> [info] Compiling 1 Scala source to >> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes... 
>> [error] >> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139: >> overloaded method value registerDataStream with alternatives: >> [error] [T](name: String, dataStream: >> org.apache.flink.streaming.api.datastream.DataStream[T], fields: >> String)Unit >> [error] [T](name: String, dataStream: >> org.apache.flink.streaming.api.datastream.DataStream[T])Unit >> [error] cannot be applied to (String, >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, >> Symbol, Symbol, Symbol, Symbol) >> [error] tableEnv.registerDataStream("table1", streamExecEnv, 'key, >> 'ticker, 'timeissued, 'price) >> [error]^ >> [error] one error found >> [error] (compile:compileIncremental) Compilation failed >> [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM >> >> Thanks anyway. >> >> Dr Mich Talebzadeh >> >> >> &
Re: Converting a DataStream into a Table throws error
Thanks everyone for the advice This worked and passed the compilation error import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.api.scala._ …. val dataStream = streamExecEnv .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties)) val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) tableEnv.registerDataStream("priceTable", dataStream, "key, ticker, timeissued, price") sqltext = "SELECT key from priceTable"; val result = tableEnv.sql(sqltext); Now I get this runtime error …. [success] Total time: 16 s, completed Aug 2, 2018 8:23:12 AM Completed compiling Thu Aug 2 08:23:12 BST 2018 , Running in **Standalone mode** Starting execution of program java.lang.NoClassDefFoundError: org/apache/flink/table/api/TableEnvironment$ at md_streaming$.main(md_streaming.scala:140) at md_streaming.main(md_streaming.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020) at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096) at java.security.AccessController.doPrivileged(Native Method) at 
javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096) Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.api.TableEnvironment$ at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 2 Aug 2018 at 03:07, vino yang wrote: > Hi Mich, > > It seems that the type of your DataStream stream is always wrong. > If you want to specify four fields, usually the DataStream type should be > similar: DataStream[(Type1, Type2, Type3, Type4)], not DataStream[String], > you can try it. 
> > Thanks, vino > > 2018-08-02 6:44 GMT+08:00 Mich Talebzadeh : > >> Changed as suggested >> >>val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment >> val dataStream = streamExecEnv >>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new >> SimpleStringSchema(), properties)) >> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) >> tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, >> 'timeissued, 'price) >> >> Still the same error >> >> [info] Compiling 1 Scala source to >> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes... >> [error] >> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139: >> overloaded method value registerDataStream with alternatives: >> [error] [T](name: String, dataStream: >> org.apache.flink.streaming.api.datastream.DataStream[T], fields: >> String)Unit >> [error] [T](name: String, dataStream: >> org.apache.flink.streaming.api.datastream.DataStream[T])Unit >> [error] cannot be applied to (String, >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, >&g
Re: Converting a DataStream into a Table throws error
Hi Jorn, Here you go the dependencies libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1" libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6" libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6" libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6" libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6" libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0" libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0" libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0" libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0" libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided" libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0" Thanks Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Thu, 2 Aug 2018 at 06:19, Jörn Franke wrote: > > How does your build.sbt looks especially dependencies? > On 2. 
Aug 2018, at 00:44, Mich Talebzadeh > wrote: > > Changed as suggested > >val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment > val dataStream = streamExecEnv >.addSource(new FlinkKafkaConsumer011[String](topicsValue, new > SimpleStringSchema(), properties)) > val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) > tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, > 'timeissued, 'price) > > Still the same error > > [info] Compiling 1 Scala source to > /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes... > [error] > /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139: > overloaded method value registerDataStream with alternatives: > [error] [T](name: String, dataStream: > org.apache.flink.streaming.api.datastream.DataStream[T], fields: > String)Unit > [error] [T](name: String, dataStream: > org.apache.flink.streaming.api.datastream.DataStream[T])Unit > [error] cannot be applied to (String, > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, > Symbol, Symbol, Symbol, Symbol) > [error] tableEnv.registerDataStream("table1", streamExecEnv, 'key, > 'ticker, 'timeissued, 'price) > [error]^ > [error] one error found > [error] (compile:compileIncremental) Compilation failed > [error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM > > Thanks anyway. > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. 
> > > > > On Wed, 1 Aug 2018 at 23:34, Fabian Hueske wrote: > >> Hi, >> >> You have to pass the StreamExecutionEnvironment to the >> getTableEnvironment() method, not the DataStream (or DataStreamSource). >> Change >> >> val tableEnv = TableEnvironment.getTableEnvironment(dataStream) >> >> to >> >> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) >> >> Best, >> Fabian >> >> 2018-08-02 0:10 GMT+02:00 Mich Talebzadeh : >> >>> Hi, >>> >>> FYI, these are my imports >>> >>> import java.util.Properties >>> import java.util.Arrays >>> import org.apache.flink.api.com
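[Editor's note] One source of the compile errors discussed in this thread is mixing Flink's Java and Scala artifacts, as Fabian points out further down. A consolidated build.sbt sketch that pins every Flink module to one version and sticks to the Scala (%%) variants; this is an illustrative assumption about a working layout, not a verified build:

```scala
// Hedged sketch of a build.sbt, not a verified build: one Flink version
// everywhere, Scala (%%) artifacts only, mirroring the modules already
// listed in this thread.
val flinkVersion = "1.5.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala"      % flinkVersion,
  "org.apache.flink" %% "flink-table"                % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion,
  "org.apache.kafka"  % "kafka-clients"              % "0.11.0.0"
)
```

The HBase and Hadoop dependencies from the original list would be appended unchanged.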
Re: Converting a DataStream into a Table throws error
Changed as suggested

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)

Still the same error

[info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:139: overloaded method value registerDataStream with alternatives:
[error] [T](name: String, dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)Unit
[error] [T](name: String, dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])Unit
[error] cannot be applied to (String, org.apache.flink.streaming.api.environment.StreamExecutionEnvironment, Symbol, Symbol, Symbol, Symbol)
[error] tableEnv.registerDataStream("table1", streamExecEnv, 'key, 'ticker, 'timeissued, 'price)
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 11:40:47 PM

Thanks anyway.

Dr Mich Talebzadeh

LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*

http://talebzadehmich.wordpress.com

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
On Wed, 1 Aug 2018 at 23:34, Fabian Hueske wrote: > Hi, > > You have to pass the StreamExecutionEnvironment to the > getTableEnvironment() method, not the DataStream (or DataStreamSource). > Change > > val tableEnv = TableEnvironment.getTableEnvironment(dataStream) > > to > > val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) > > Best, > Fabian > > 2018-08-02 0:10 GMT+02:00 Mich Talebzadeh : > >> Hi, >> >> FYI, these are my imports >> >> import java.util.Properties >> import java.util.Arrays >> import org.apache.flink.api.common.functions.MapFunction >> import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >> import org.apache.flink.streaming.api.scala >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >> import org.apache.flink.table.api.TableEnvironment >> import org.apache.flink.table.api.scala._ >> import org.apache.flink.api.scala._ >> import org.apache.kafka.clients.consumer.ConsumerConfig >> import org.apache.kafka.clients.consumer.ConsumerRecord >> import org.apache.kafka.clients.consumer.KafkaConsumer >> import org.apache.flink.core.fs.FileSystem >> import org.apache.flink.streaming.api.TimeCharacteristic >> import org.slf4j.LoggerFactory >> import >> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, >> FlinkKafkaProducer011} >> import java.util.Calendar >> import java.util.Date >> import java.text.DateFormat >> import java.text.SimpleDateFormat >> import org.apache.log4j.Logger >> import org.apache.log4j.Level >> import sys.process.stringSeqToProcess >> import java.io.File >> >> And this is the simple code >> >> val properties = new Properties() >> properties.setProperty("bootstrap.servers", bootstrapServers) >> properties.setProperty("zookeeper.connect", zookeeperConnect) >> properties.setProperty("group.id", flinkAppName) >> properties.setProperty("auto.offset.reset", "latest") >> val streamExecEnv = >> StreamExecutionEnvironment.getExecutionEnvironment >> val dataStream 
= streamExecEnv >>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new >> SimpleStringSchema(), properties)) >> val tableEnv = TableEnvironment.getTableEnvironment(dataStream) >> tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, >> 'timeissued, 'price) >> >> And this is the compilation error >> >> info] Compiling 1 Scala source to >> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes... >> [error] >> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138: >> overloaded me
Re: Converting a DataStream into a Table throws error
Hi,

FYI, these are my imports

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.slf4j.LoggerFactory
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import java.util.Calendar
import java.util.Date
import java.text.DateFormat
import java.text.SimpleDateFormat
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import java.io.File

And this is the simple code

val properties = new Properties()
properties.setProperty("bootstrap.servers", bootstrapServers)
properties.setProperty("zookeeper.connect", zookeeperConnect)
properties.setProperty("group.id", flinkAppName)
properties.setProperty("auto.offset.reset", "latest")
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
val tableEnv = TableEnvironment.getTableEnvironment(dataStream)
tableEnv.registerDataStream("table1", dataStream, 'key, 'ticker, 'timeissued, 'price)

And this is the compilation error

[info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:138: overloaded method value getTableEnvironment with alternatives: [error] (executionEnvironment: org.apache.flink.streaming.api.scala.StreamExecutionEnvironment)org.apache.flink.table.api.scala.StreamTableEnvironment [error] (executionEnvironment: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment)org.apache.flink.table.api.java.StreamTableEnvironment [error] (executionEnvironment: org.apache.flink.api.scala.ExecutionEnvironment)org.apache.flink.table.api.scala.BatchTableEnvironment [error] (executionEnvironment: org.apache.flink.api.java.ExecutionEnvironment)org.apache.flink.table.api.java.BatchTableEnvironment [error] cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String]) [error] val tableEnv = TableEnvironment.getTableEnvironment(dataStream) [error] ^ [error] one error found [error] (compile:compileIncremental) Compilation failed [error] Total time: 3 s, completed Aug 1, 2018 11:02:33 PM Completed compiling which is really strange Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 1 Aug 2018 at 13:42, Fabian Hueske wrote: > Hi I think you are mixing Java and Scala dependencies. > > org.apache.flink.streaming.api.datastream.DataStream is the DataStream of > the Java DataStream API. > You should use the DataStream of the Scala DataStream API. 
> > Best, Fabian > > 2018-08-01 14:01 GMT+02:00 Mich Talebzadeh : > >> Hi, >> >> I believed I tried Hequn's suggestion and tried again >> >> import org.apache.flink.table.api.Table >> import org.apache.flink.table.api.TableEnvironment >> >> *import org.apache.flink.table.api.scala._* >> Unfortunately I am still getting the same error! >> >> [info] Compiling 1 Scala source to >> /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes... >> [error] >> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151: >> overloaded method value fromDataStream with alternatives: >> [error] [T](dataStream: >> org.apache.flink.streaming.api.datastream.DataStream[T], fields: >> String)org.apache.flink.table.api.Table >> [error] [T](dataStream: >> org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Tab
Re: Converting a DataStream into a Table throws error
Hi,

I believe I tried Hequn's suggestion and tried again

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
*import org.apache.flink.table.api.scala._*

Unfortunately I am still getting the same error!

[info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:151: overloaded method value fromDataStream with alternatives:
[error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table
[error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error] cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
[error] val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Aug 1, 2018 12:59:44 PM

Completed compiling

Dr Mich Talebzadeh

LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*

http://talebzadehmich.wordpress.com

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Wed, 1 Aug 2018 at 10:03, Timo Walther wrote:

> If these two imports are the only imports that you added, then you did not
> follow Hequn's advice or the link that I sent you.
>
> You need to add the underscore imports to let Scala do its magic.
> > Timo > > > Am 01.08.18 um 10:28 schrieb Mich Talebzadeh: > > Hi Timo, > > These are my two flink table related imports > > import org.apache.flink.table.api.Table > import org.apache.flink.table.api.TableEnvironment > > And these are my dependencies building with SBT > > libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1" > libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6" > libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6" > libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6" > libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6" > libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" > % "1.5.0" > libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" > % "1.5.0" > libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" > libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0" > libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % > "1.5.0" % "provided" > > *libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % > "provided" *libraryDependencies += "org.apache.kafka" %% "kafka" % > "0.11.0.0" > > There appears to be conflict somewhere that cause this error > > [info] Compiling 1 Scala source to > /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes... 
> [error] > /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: > overloaded method value fromDataStream with alternatives: > [error] [T](dataStream: > org.apache.flink.streaming.api.datastream.DataStream[T], fields: > String)org.apache.flink.table.api.Table > [error] [T](dataStream: > org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table > [error] cannot be applied to > (org.apache.flink.streaming.api.datastream.DataStreamSource[String], > Symbol, Symbol, Symbol, Symbol) > [error] val table1: Table = tableEnv.fromDataStream(dataStream, 'key, > 'ticker, 'timeissued, 'price) > [error]^ > [error] one error found > [error] (compile:compileIncremental) Compilation failed > > Thanks > > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >
Re: Converting a DataStream into a Table throws error
Hi Timo,

These are my two flink table related imports

import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment

And these are my dependencies building with SBT

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-java" % "1.5.0" % "provided"
*libraryDependencies += "org.apache.flink" %% "flink-table" % "1.5.0" % "provided"*
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

There appears to be a conflict somewhere that causes this error

[info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: overloaded method value fromDataStream with alternatives: [error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table [error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table [error] cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol) [error] val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price) [error]^ [error] one error found [error] (compile:compileIncremental) Compilation failed Thanks Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 1 Aug 2018 at 09:17, Timo Walther wrote: > Hi Mich, > > I would check you imports again [1]. This is a pure compiler issue that is > unrelated to your actual data stream. Also check your project dependencies. > > Regards, > Timo > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#implicit-conversion-for-scala > > Am 01.08.18 um 09:30 schrieb Mich Talebzadeh: > > > Hi both, > > I added the import as Hequn suggested. 
> > My stream is very simple and consists of 4 values separated by "," as > below > > 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48 > > So this is what I have been trying to do > > Code > > val dataStream = streamExecEnv >.addSource(new FlinkKafkaConsumer011[String](topicsValue, new > SimpleStringSchema(), properties)) > // > // > val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) > val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, > 'timeissued, 'price) > > note those four columns in Table1 definition > > And this is the error being thrown > > [info] Compiling 1 Scala source to > /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes... > [error] > /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: > overloaded method value fromDataStream with alternatives: > [error] [T](dataStream: > org.apache.flink.streaming.api.datastream.DataStream[T], fields: > String)org.apache.flink.table.api.Table > [error] [T](dataStream: > org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table > [error] cannot be applied to > (org.apache.flink.streaming.api.datastream.DataStreamSource[String], > Symbol, Symbol, Symbol, Symbol) > [error] val table1: Table = tableEnv.fromDataStream(dataStream, 'key, > 'ticker, 'ti
Re: Converting a DataStream into a Table throws error
Hi both,

I added the import as Hequn suggested.

My stream is very simple and consists of 4 values separated by "," as below

05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48

So this is what I have been trying to do

Code

val dataStream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
//
//
val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)

note those four columns in Table1 definition

And this is the error being thrown

[info] Compiling 1 Scala source to /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/classes...
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:152: overloaded method value fromDataStream with alternatives:
[error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table
[error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error] cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
[error] val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

I suspect dataStream may not be compatible with this operation?

Regards,

Dr Mich Talebzadeh

LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*

http://talebzadehmich.wordpress.com

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Wed, 1 Aug 2018 at 04:51, Hequn Cheng wrote: > Hi, Mich > > You can try adding "import org.apache.flink.table.api.scala._", so that > the Symbol can be recognized as an Expression. > > Best, Hequn > > On Wed, Aug 1, 2018 at 6:16 AM, Mich Talebzadeh > wrote: > >> Hi, >> >> I am following this example >> >> >> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api >> >> This is my dataStream which is built on a Kafka topic >> >>// >> //Create a Kafka consumer >> // >> val dataStream = streamExecEnv >>.addSource(new FlinkKafkaConsumer011[String](topicsValue, new >> SimpleStringSchema(), properties)) >> // >> // >> val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv) >> val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, >> 'timeissued, 'price) >> >> While compiling it throws this error >> >> [error] >> /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169: >> overloaded method value fromDataStream with alternatives: >> [error] [T](dataStream: >> org.apache.flink.streaming.api.datastream.DataStream[T], fields: >> String)org.apache.flink.table.api.Table >> [error] [T](dataStream: >> org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table >> [error] cannot be applied to >> (org.apache.flink.streaming.api.datastream.DataStreamSource[String], >> Symbol, Symbol, Symbol, Symbol) >> [error] val table1: Table = tableEnv.fromDataStream(dataStream, 'key, >> 'ticker, 'timeissued, 'price) >> [error]^ >> [error] one error found >> [error] (compile:compileIncremental) Compilation failed >> >> The topic is very simple, it is comma separated prices. I tried >> mapFunction and flatMap but neither worked! 
>> >> Thanks, >> >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any loss, damage or destruction of data or any other property which may >> arise from relying on this email's technical content is explicitly >> disclaimed. The author will in no case be liable for any monetary damages >> arising from such loss, damage or destruction. >> >> >> > >
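[Editor's note] A side note on why the underscore import matters in this thread: 'key is a plain scala.Symbol, and the Table API overloads that accept field expressions only become applicable once the implicit Symbol-to-expression conversions from org.apache.flink.table.api.scala._ are in scope. A toy, Flink-free sketch of that mechanism; all names below (Expression, selectFields, TableImplicits) are invented for the illustration, and Symbol("key") is the explicit form of 'key:

```scala
// Toy illustration only -- no Flink types involved. "Expression" and
// "selectFields" are invented stand-ins for the Table API's expression
// type and its varargs overloads.
import scala.language.implicitConversions

final case class Expression(name: String)

object TableImplicits {
  // Stand-in for what `import org.apache.flink.table.api.scala._` provides:
  // a silent lift from Symbol to Expression.
  implicit def symbolToExpression(s: Symbol): Expression = Expression(s.name)
}

def selectFields(fields: Expression*): Seq[String] = fields.map(_.name)

import TableImplicits._

// Without the implicit import this call would not compile, because a bare
// Symbol does not match the Expression* parameter list.
val fieldNames = selectFields(Symbol("key"), Symbol("ticker"), Symbol("timeissued"), Symbol("price"))
```

The same resolution failure is what the "cannot be applied to (..., Symbol, Symbol, Symbol, Symbol)" messages in this thread report.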
Converting a DataStream into a Table throws error
Hi,

I am following this example

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#integration-with-datastream-and-dataset-api

This is my dataStream which is built on a Kafka topic

//
//Create a Kafka consumer
//
val dataStream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
//
//
val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)

While compiling it throws this error

[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:169: overloaded method value fromDataStream with alternatives:
[error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T], fields: String)org.apache.flink.table.api.Table
[error] [T](dataStream: org.apache.flink.streaming.api.datastream.DataStream[T])org.apache.flink.table.api.Table
[error] cannot be applied to (org.apache.flink.streaming.api.datastream.DataStreamSource[String], Symbol, Symbol, Symbol, Symbol)
[error] val table1: Table = tableEnv.fromDataStream(dataStream, 'key, 'ticker, 'timeissued, 'price)
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

The topic is very simple, it is comma separated prices. I tried mapFunction and flatMap but neither worked!

Thanks,

Dr Mich Talebzadeh

LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*

http://talebzadehmich.wordpress.com

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
Re: splitting DataStream throws error
Thanks So the assumption is that one cannot perform split on DataStream[String] directly? Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 30 Jul 2018 at 21:54, Chesnay Schepler wrote: > You define a flatMap function that takes a string, calls String#split on > it and collects the array. > > On 30.07.2018 22:04, Mich Talebzadeh wrote: > > > Hi, > > I have Kafka streaming feeds where a row looks like below where fields are > separated by "," > I can split them easily with split function > > scala> val oneline = > "05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48" > oneline: String = > 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48 > scala> oneline.split(",") > res26: Array[String] = Array(05521df6-4ccf-4b2f-b874-eb27d461b305, IBM, > 2018-07-30T19:51:50, 190.48) > > I can get the individual columns as below > > scala>val key = oneline.split(",").map(_.trim).view(0).toString > key: String = 05521df6-4ccf-4b2f-b874-eb27d461b305 > scala>val key = oneline.split(",").map(_.trim).view(1).toString > key: String = IBM > scala>val key = oneline.split(",").map(_.trim).view(2).toString > key: String = 2018-07-30T19:51:50 > scala>val key = oneline.split(",").map(_.trim).view(3).toFloat > key: Float = 190.48 > > Now when I apply the same to dataStream in flink it fails > > val dataStream = streamExecEnv >.addSource(new FlinkKafkaConsumer011[String](topicsValue, new > SimpleStringSchema(), properties)) > > > *dataStream.split(",") * > 
[error] > /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:154: > type mismatch; > [error] found : String(",") > [error] required: > org.apache.flink.streaming.api.collector.selector.OutputSelector[String] > [error] dataStream.split(",") > [error] ^ > [error] one error found > [error] (compile:compileIncremental) Compilation failed > > What operation do I need to do on dataStream to make this split work? > > Thanks > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > >
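[Editor's note] To make Chesnay's flatMap suggestion concrete: DataStream.split exists for routing records to named outputs via an OutputSelector, not for tokenizing strings, so the String#split call has to happen inside a map/flatMap function. A sketch of that per-record parsing step as a plain Scala function; the field layout is taken from the sample row quoted in this thread, while the Tick name and the Option-based error handling are illustrative choices:

```scala
// Sketch of the per-record parsing one would do inside a Flink flatMap.
// The record layout (key, ticker, timeissued, price) matches the sample
// row in the thread; everything else is illustrative.
case class Tick(key: String, ticker: String, timeissued: String, price: Float)

def parseTick(line: String): Option[Tick] =
  line.split(",").map(_.trim) match {
    case Array(key, ticker, timeissued, price) =>
      // Guard the numeric parse so malformed rows yield None instead of throwing
      scala.util.Try(price.toFloat).toOption.map(Tick(key, ticker, timeissued, _))
    case _ => None
  }

val parsed = parseTick("05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48")
```

Inside a job this would then be applied as something like dataStream.flatMap(line => parseTick(line).toSeq), so malformed records are simply dropped.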
splitting DataStream throws error
Hi,

I have Kafka streaming feeds where a row looks like below where fields are separated by ","
I can split them easily with split function

scala> val oneline = "05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48"
oneline: String = 05521df6-4ccf-4b2f-b874-eb27d461b305,IBM,2018-07-30T19:51:50,190.48
scala> oneline.split(",")
res26: Array[String] = Array(05521df6-4ccf-4b2f-b874-eb27d461b305, IBM, 2018-07-30T19:51:50, 190.48)

I can get the individual columns as below

scala> val key = oneline.split(",").map(_.trim).view(0).toString
key: String = 05521df6-4ccf-4b2f-b874-eb27d461b305
scala> val key = oneline.split(",").map(_.trim).view(1).toString
key: String = IBM
scala> val key = oneline.split(",").map(_.trim).view(2).toString
key: String = 2018-07-30T19:51:50
scala> val key = oneline.split(",").map(_.trim).view(3).toFloat
key: Float = 190.48

Now when I apply the same to dataStream in flink it fails

val dataStream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

*dataStream.split(",")*

[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:154: type mismatch;
[error] found : String(",")
[error] required: org.apache.flink.streaming.api.collector.selector.OutputSelector[String]
[error] dataStream.split(",")
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

What operation do I need to do on dataStream to make this split work?

Thanks

Dr Mich Talebzadeh

LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*

http://talebzadehmich.wordpress.com

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
Re: Working out through individual messages in Flink
Thanks again.

The Hbase connector works fine in Flink

// Start Hbase table stuff
val tableName = "MARKETDATAHBASESPEEDFLINK"
val hbaseConf = HBaseConfiguration.create()
// Connecting to remote Hbase
hbaseConf.set("hbase.master", hbaseHost)
hbaseConf.set("hbase.zookeeper.quorum", zookeeperHost)
hbaseConf.set("hbase.zookeeper.property.clientPort", zooKeeperClientPort)
hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
// create this table with column family
val admin = new HBaseAdmin(hbaseConf)
if (!admin.isTableAvailable(tableName)) {
  println("Creating table " + tableName)
  val tableDesc = new HTableDescriptor(tableName)
  tableDesc.addFamily(new HColumnDescriptor("PRICE_INFO".getBytes()))
  tableDesc.addFamily(new HColumnDescriptor("OPERATION".getBytes()))
  admin.createTable(tableDesc)
} else {
  println("Table " + tableName + " already exists!!")
}
val HbaseTable = new HTable(hbaseConf, tableName)
// End Hbase table stuff

So I just need to split every row into columns and put it into Hbase as follows:

// Save prices to Hbase table
var p = new Put(new String(key).getBytes())
//p.add("PRICE_INFO".getBytes(), "key".getBytes(), new String(ticker).getBytes())
p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
p.add("PRICE_INFO".getBytes(), "TIMEISSUED".getBytes(), new String(timeissued).getBytes())
p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
HbaseTable.put(p)
HbaseTable.flushCommits()

Dr Mich Talebzadeh

LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*

http://talebzadehmich.wordpress.com

*Disclaimer:* Use it at
your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Mon, 30 Jul 2018 at 09:58, Fabian Hueske wrote: > A *Table*Source [1], is a special input connector for Flink's relational > APIs (Table API and SQL) [2]. > You can transform and filter with these APIs as well (it's probably even > easier). In SQL this would be the SELECT and WHERE clauses of a query. > > However, there is no *Table*Sink for HBase and you would need to convert > the Table back to a DataStream [3]. > That's not very difficult since the APIs are integrated with each other. > > Best, Fabian > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html > [3] > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset > > 2018-07-30 10:47 GMT+02:00 Mich Talebzadeh : > >> Thanks Fabian. That was very useful. >> >> How about an operation like below? >> >> // create builder >> val KafkaTableSource = Kafka011JsonTableSource.builder() >> // set Kafka topic >> .forTopic(topicsValue) >> // set Kafka consumer properties >> .withKafkaProperties(properties) >> // set Table schema >> .withSchema(TableSchema.builder() >> .field("key", Types.STRING) >> .field("ticker", Types.STRING) >> .field("timeissued", Types.STRING) >> .field("price", Types.FLOAT) >> .build()) >> >> Will that be OK? 
>> >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility f
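The "split every row into columns" step above is plain string handling that happens before any HBase call. A minimal stand-alone sketch of that parsing, assuming the four-field CSV layout used in this thread (the case class and function names are illustrative, not part of the HBase or Flink APIs):

```scala
// One incoming Kafka record: "uuid,ticker,timeissued,price"
case class PriceRow(key: String, ticker: String, timeissued: String, price: Float)

// Split a CSV line into the columns that would populate the HBase Put.
// Returns None for malformed rows instead of throwing mid-stream.
def parsePriceRow(line: String): Option[PriceRow] = {
  val cols = line.split(',').map(_.trim)
  if (cols.length != 4) None
  else scala.util.Try(cols(3).toFloat).toOption.map { p =>
    PriceRow(cols(0), cols(1), cols(2), p)
  }
}
```

Each successfully parsed PriceRow then maps one-to-one onto the `p.add("PRICE_INFO"...)` calls shown above; returning Option keeps a single malformed row from failing the whole job.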
Re: Working out through individual messages in Flink
Thanks Fabian. That was very useful.

How about an operation like below?

// create builder
val KafkaTableSource = Kafka011JsonTableSource.builder()
  // set Kafka topic
  .forTopic(topicsValue)
  // set Kafka consumer properties
  .withKafkaProperties(properties)
  // set Table schema
  .withSchema(TableSchema.builder()
    .field("key", Types.STRING)
    .field("ticker", Types.STRING)
    .field("timeissued", Types.STRING)
    .field("price", Types.FLOAT)
    .build())

Will that be OK?

Dr Mich Talebzadeh

On Mon, 30 Jul 2018 at 09:19, Fabian Hueske wrote:

> Hi,
>
> Flink processes streams record by record, instead of micro-batching
> records together. Since every record comes by itself, there is no for-each.
> Simple record-by-record transformations can be done with a MapFunction,
> filtering out records with a FilterFunction. You can also implement a
> FlatMapFunction to do both in one step.
>
> Once the stream is transformed and filtered, you can write it to HBase
> with a sink function.
>
> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh:
>
>> Just to clarify, these are the individual prices separated by ','. The
>> below shows three price lines in the topic:
>>
>> UUID,Security,Time,Price
>> 1230a9a9-dc57-40e4-a000-6ea6989c754a,MRW,2018-07-28T20:38:43,241.88
>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>
>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh wrote:
>>
>>> Hi,
>>>
>>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>>
>>> In Spark Streaming I go through the RDD and walk through the rows one
>>> by one and check the prices. If prices are valuable I store them in an
>>> Hbase table:
>>>
>>> val dstream = KafkaUtils.createDirectStream[String, String,
>>>   StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>> dstream.cache()
>>> dstream.foreachRDD
>>> { pricesRDD =>
>>>   // Work on individual messages
>>>   for(line <- pricesRDD.collect.toArray)
>>>   {
>>>     var key = line._2.split(',').view(0).toString
>>>     var ticker = line._2.split(',').view(1).toString
>>>     var timeissued = line._2.split(',').view(2).toString
>>>     var price = line._2.split(',').view(3).toFloat
>>>     val priceToString = line._2.split(',').view(3)
>>>     if (price > 90.0)
>>>     {
>>>       //save to Hbase table
>>>     }
>>>   }
>>> }
>>>
>>> That works fine.
>>>
>>> In Flink I define my source as below:
>>>
>>> val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>> streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> val stream = streamExecEnv
>>>   .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
>>>     SimpleStringSchema(), properties))
>>>
>>> Is there any way I can perform a similar operation in Flink? I need to
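The `withSchema(...)` builder call above only pairs field names with coarse types. As a stand-alone illustration of that contract (not the Flink Table API — the checking logic below is purely hypothetical), one can verify that a parsed record supplies every declared field with a value of the declared type:

```scala
// Field names and coarse types mirroring the withSchema(...) call above
val schema: Seq[(String, String)] =
  Seq("key" -> "STRING", "ticker" -> "STRING", "timeissued" -> "STRING", "price" -> "FLOAT")

// Check that a parsed record supplies every declared field
// with a value of the declared coarse type.
def conformsTo(record: Map[String, Any], schema: Seq[(String, String)]): Boolean =
  schema.forall { case (name, tpe) =>
    record.get(name).exists {
      case _: String => tpe == "STRING"
      case _: Float  => tpe == "FLOAT"
      case _: Double => tpe == "FLOAT" // JSON numbers often arrive as Double
      case _         => false
    }
  }
```

A record missing a field, or carrying a price as a string rather than a number, fails the check — which is essentially what the JSON table source does for you at deserialization time.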
Working out through individual messages in Flink
Hi,

I have a Kafka topic that transmits 100 security prices every 2 seconds.

In Spark Streaming I go through the RDD and walk through the rows one by one and check the prices. If prices are valuable I store them in an Hbase table:

val dstream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
dstream.cache()
dstream.foreachRDD
{ pricesRDD =>
  // Work on individual messages
  for(line <- pricesRDD.collect.toArray)
  {
    var key = line._2.split(',').view(0).toString
    var ticker = line._2.split(',').view(1).toString
    var timeissued = line._2.split(',').view(2).toString
    var price = line._2.split(',').view(3).toFloat
    val priceToString = line._2.split(',').view(3)
    if (price > 90.0)
    {
      //save to Hbase table
    }
  }
}

That works fine.

In Flink I define my source as below:

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new
    SimpleStringSchema(), properties))

Is there any way I can perform a similar operation in Flink? I need to go through every topic load sent and look at the individual rows. For example, what is the equivalent of

for(line <- pricesRDD.collect.toArray)

in Flink?

Thanks,

Dr Mich Talebzadeh
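As the replies in this thread note, the Spark collect-then-loop becomes a per-record map/filter chain in Flink. The record logic itself is ordinary Scala; a sketch over plain collections of what a FlatMapFunction plus FilterFunction would do record by record (the function name and tuple shape are illustrative):

```scala
// Each element is the value part of a Kafka message: "key,ticker,timeissued,price".
// Parse every line, drop malformed ones, keep only prices above the threshold.
def highValuePrices(lines: Seq[String], threshold: Float = 90.0f): Seq[(String, String, String, Float)] =
  lines.flatMap { line =>
    val v = line.split(',')
    if (v.length == 4) scala.util.Try(v(3).toFloat).toOption.map(p => (v(0), v(1), v(2), p))
    else None
  }.filter(_._4 > threshold)
```

In Flink this would read roughly `stream.flatMap(parse).filter(_.price > 90.0f)`; there is no collect step because records flow through the operators one at a time.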
not found: type CustomWatermarkEmitter
Emitter())
[error] ^
[error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:99: type mismatch;
[error]  found   : org.apache.flink.streaming.api.datastream.DataStreamSource[String]
[error]  required: org.apache.flink.streaming.api.functions.source.SourceFunction[?]
[error] env.addSource(myConsumer).print()
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed Jul 29, 2018 9:31:05 AM
Completed compiling Sun Jul 29 09:31:05 BST 2018, Running in **Standalone mode**
Could not build the program from JAR file.

I don't see why it is failing. Appreciate any suggestions.

Regards,

Dr Mich Talebzadeh
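Separate from the type mismatch above, the related build failure seen in this project ("class TypeInformation takes type parameters") is a general Scala rule: a generic class cannot be used raw, but a wildcard is accepted (hence `Array[TypeInformation[_]]`). A self-contained illustration with a stand-in class — `Box` here is hypothetical, not a Flink type:

```scala
// A generic class, analogous to TypeInformation[T] in Flink
class Box[T](val value: T)

// `Array[Box]` would not compile ("class Box takes type parameters");
// `Array[Box[_]]` does, and may hold boxes of mixed element types.
val boxes: Array[Box[_]] = Array(new Box("STRING"), new Box(99.0f))

// A wildcard parameter is also how a function accepts any Box
def describe(b: Box[_]): String = b.value.toString
```

The same pattern applies to `TableSink[_]` mentioned earlier in the thread: when the element type is unknown or mixed, use the wildcard rather than omitting the parameter.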
Re: Does Flink release Hadoop(R) 2.8 work with Hadoop 3?
Hi Vino,

Many thanks. I can confirm that Flink version flink-1.5.0-bin-hadoop28-scala_2.11 works fine with Hadoop 3.1.0. I did not need to build it from source.

Regards,

Dr Mich Talebzadeh

On Sat, 28 Jul 2018 at 07:48, vino yang wrote:

> Hi Mich,
>
> I think this depends on the backward compatibility of the Hadoop client
> API. In theory, there is no problem. Hadoop 2.8 to Hadoop 3.0 is a very
> large upgrade, and I personally recommend using a client version that is
> consistent with the Hadoop cluster. By compiling and packaging from
> source, you can let Flink bundle specific Hadoop versions. Flink has
> shaded Hadoop, see here [1]. You can try to change the Maven version and
> repackage the project.
>
> [1]: https://github.com/apache/flink/tree/master/flink-shaded-hadoop
>
> Thanks, vino.
>
> 2018-07-27 23:31 GMT+08:00 Mich Talebzadeh:
>
>> Hi,
>>
>> I can run Flink without bundled Hadoop fine. I was wondering if Flink
>> with Hadoop 2.8 works with Hadoop 3 as well?
>>
>> Thanks,
Does Flink release Hadoop(R) 2.8 work with Hadoop 3?
Hi,

I can run Flink without bundled Hadoop fine. I was wondering if Flink with Hadoop 2.8 works with Hadoop 3 as well?

Thanks,

Dr Mich Talebzadeh
Re: Real time streaming as a microservice
Hi Deepak,

I will put it there once all the bits and pieces come together. At the moment I am drawing the diagrams. I will let you know. Definitely everyone's contribution is welcome.

Regards,

Dr Mich Talebzadeh

On Sun, 15 Jul 2018 at 09:16, Deepak Sharma wrote:

> Is it on github Mich ?
> I would love to use the flink and spark edition and add some use cases
> from my side.
>
> Thanks
> Deepak
>
> On Sun, Jul 15, 2018, 13:38 Mich Talebzadeh wrote:
>
>> Hi all,
>>
>> I have now managed to deploy both ZooKeeper and Kafka as microservices
>> using docker images.
>>
>> The idea came to me as I wanted to create lightweight processes for both
>> ZooKeeper and Kafka to be used as services for Flink and Spark
>> simultaneously.
>>
>> In this design both Flink and Spark rely on streaming market data
>> messages published through Kafka. My current design is a simple one: one
>> docker for ZooKeeper and another for Kafka.
>>
>> [root@rhes75 ~]# docker ps -a
>> CONTAINER ID  IMAGE             COMMAND                 CREATED       STATUS                 PORTS                                           NAMES
>> 05cf097ac139  ches/kafka        "/start.sh"             9 hours ago   Up 9 hours             0.0.0.0:7203->7203/tcp, 0.0.0.0:9092->9092/tcp  kafka
>> b173e455cc80  jplock/zookeeper  "/opt/zookeeper/bin/…"  10 hours ago  Up 10 hours (healthy)  2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp      zookeeper
>>
>> Note that the docker ports are exposed to the physical host that is
>> running the containers.
>>
>> A test topic is simply created as follows:
>>
>> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181 --replication-factor 1 --partitions 1 --topic test
>>
>> Note that rhes75 is the host that houses the dockers, and port 2181 is
>> the ZooKeeper port used by the ZooKeeper docker and mapped.
>>
>> Spark Streaming uses the speed layer in the Lambda architecture to write
>> selected market data to an Hbase table (Hbase requires connectivity to a
>> ZooKeeper). For Hbase I specified a ZooKeeper instance running on another
>> host, and Hbase works fine.
>>
>> Anyway, I will provide further info and diagrams.
>>
>> Cheers,
>>
>> On Sun, 15 Jul 2018 at 08:40, Mich Talebzadeh wrote:
>>
>>> Thanks, got it sorted.
>>>
>>> Regards,
>>>
>>> On Tue, 10 Jul 2018 at 09:24, Mich Talebzadeh wrote:
>>>
>>>> Thanks R
Re: Real time streaming as a microservice
Hi all,

I have now managed to deploy both ZooKeeper and Kafka as microservices using docker images.

The idea came to me as I wanted to create lightweight processes for both ZooKeeper and Kafka to be used as services for Flink and Spark simultaneously.

In this design both Flink and Spark rely on streaming market data messages published through Kafka. My current design is a simple one: one docker for ZooKeeper and another for Kafka.

[root@rhes75 ~]# docker ps -a
CONTAINER ID  IMAGE             COMMAND                 CREATED       STATUS                 PORTS                                           NAMES
05cf097ac139  ches/kafka        "/start.sh"             9 hours ago   Up 9 hours             0.0.0.0:7203->7203/tcp, 0.0.0.0:9092->9092/tcp  kafka
b173e455cc80  jplock/zookeeper  "/opt/zookeeper/bin/…"  10 hours ago  Up 10 hours (healthy)  2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp      zookeeper

Note that the docker ports are exposed to the physical host that is running the containers.

A test topic is simply created as follows:

${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181 --replication-factor 1 --partitions 1 --topic test

Note that rhes75 is the host that houses the dockers, and port 2181 is the ZooKeeper port used by the ZooKeeper docker and mapped.

Spark Streaming uses the speed layer in the Lambda architecture to write selected market data to an Hbase table (Hbase requires connectivity to a ZooKeeper). For Hbase I specified a ZooKeeper instance running on another host, and Hbase works fine.

Anyway, I will provide further info and diagrams.

Cheers,

Dr Mich Talebzadeh

On Sun, 15 Jul 2018 at 08:40, Mich Talebzadeh wrote:

> Thanks, got it sorted.
>
> Regards,
>
> On Tue, 10 Jul 2018 at 09:24, Mich Talebzadeh wrote:
>
>> Thanks Rahul.
>>
>> This is the outcome of:
>>
>> [root@rhes75 ~]# iptables -t nat -L -n
>> Chain PREROUTING (policy ACCEPT)
>> target      prot opt source               destination
>> DOCKER      all  --  0.0.0.0/0            0.0.0.0/0         ADDRTYPE match dst-type LOCAL
>> Chain INPUT (policy ACCEPT)
>> target      prot opt source               destination
>> Chain OUTPUT (policy ACCEPT)
>> target      prot opt source               destination
>> DOCKER      all  --  0.0.0.0/0           !127.0.0.0/8       ADDRTYPE match dst-type LOCAL
>> Chain POSTROUTING (policy ACCEPT)
>> target      prot opt source               destination
>> MASQUERADE  all  --  172.17.0.0/16        0.0.0.0/0
>> MASQUERADE  all  --  172.18.0.0/16        0.0.0.0/0
>> RETURN      all  --  192.168.122.0/24     224.0.0.0/24
>> RETURN      all  --  192.168.122.0/24     255.255.255.255
>> MASQUERADE  tcp  --  192.168.122.0/24    !192.168.122.0/24  masq ports: 1024-65535
>> MASQUERADE  udp  --  192.168.122.0/24    !192.168.122.0/24  masq ports: 1024-65535
>> MASQUERADE  all  --  192.168.122.0/24    !192.168.122.0/24
>> Chain DOCKER (2 references)
>> target      prot opt source               destination
>> RETURN      all  --  0.0.0.0/0            0.0.0.0/0
>> RETURN      all  --  0.0.0.0/0            0.0.0.0/0
>>
>> So basically I need to connect to the container from another host, as
>> the link points out.
>>
>> My docker is already running.
>>
>> [root@rhes75 ~]# docker ps -a
>> CONTAINER ID  IMAGE   COMMAND  CREATED  STATUS  PORTS  NAMES
>> 8dd84a174834  ubuntu  "
Re: Real time streaming as a microservice
Thanks Jörn. So I gather, as you correctly suggested, microservices do provide value in terms of modularisation. However, there will always "inevitably" be scenarios where the receiving artefact, say Flink, needs communication protocol changes?

Thanks,

Dr Mich Talebzadeh

On Sun, 8 Jul 2018 at 10:25, Jörn Franke wrote:

> That they are loosely coupled does not mean they are independent. For
> instance, you would not be able to replace Kafka with ZeroMQ in your
> scenario. Unfortunately Kafka also sometimes needs to introduce breaking
> changes, and the dependent application needs to upgrade. You will not be
> able to avoid these scenarios in the future (this is only possible if
> microservices don't communicate with each other or if they never need to
> change their communication protocol - pretty impossible). However, there
> are of course ways to reduce the impact: e.g. Kafka could reduce the
> number of breaking changes, or you can develop a very lightweight
> microservice that is very easy to change and that only deals with the
> broker integration for your application.
>
> On 8. Jul 2018, at 10:59, Mich Talebzadeh wrote:
>
>> Hi,
>>
>> I have created the Kafka messaging architecture as a microservice that
>> feeds both Spark Streaming and Flink. Spark Streaming uses micro-batches,
>> meaning "collect and process data", while Flink follows an event-driven
>> architecture (a stateful application that reacts to incoming events by
>> triggering computations, etc.).
>>
>> According to Wikipedia, a microservice is a technique that structures an
>> application as a collection of loosely coupled services. In a
>> microservices architecture, services are fine-grained and the protocols
>> are lightweight.
>>
>> For streaming data, among other things, I have to create and configure a
>> topic (or topics), design a robust ZooKeeper ensemble and create Kafka
>> brokers with scalability and resiliency. Then I can offer the streaming
>> as a microservice to subscribers, among them Spark and Flink. I can
>> upgrade this microservice component in isolation without impacting either
>> Spark or Flink.
>>
>> The problem I face here is the dependency of Flink etc. on the jar files
>> specific to the version of Kafka deployed. For example kafka_2.12-1.1.0
>> is built on Scala 2.12 and Kafka version 1.1.0. To make this work in a
>> Flink 1.5 application, I need to use the correct dependencies in the sbt
>> build. For example:
>>
>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
>> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
>> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
>> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"
>>
>> and the Scala code needs to change:
>>
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
>> …
>> val stream = env
>>   .addSource(new FlinkKafkaConsumer011[String]("md", new SimpleStringSchema(), properties))
>>
>> So in summary some changes need to be made to Flink to be able to
>> interact with the new version of Kafka. And more importantly, can one use
>> an abstract notion of microservice here?
Real time streaming as a microservice
Hi,

I have created the Kafka messaging architecture as a microservice that feeds both Spark Streaming and Flink. Spark Streaming uses micro-batches, meaning "collect and process data", while Flink follows an event-driven architecture (a stateful application that reacts to incoming events by triggering computations, etc.).

According to Wikipedia, a microservice is a technique that structures an application as a collection of loosely coupled services. In a microservices architecture, services are fine-grained and the protocols are lightweight.

For streaming data, among other things, I have to create and configure a topic (or topics), design a robust ZooKeeper ensemble and create Kafka brokers with scalability and resiliency. Then I can offer the streaming as a microservice to subscribers, among them Spark and Flink. I can upgrade this microservice component in isolation without impacting either Spark or Flink.

The problem I face here is the dependency of Flink etc. on the jar files specific to the version of Kafka deployed. For example kafka_2.12-1.1.0 is built on Scala 2.12 and Kafka version 1.1.0. To make this work in a Flink 1.5 application, I need to use the correct dependencies in the sbt build. For example:

libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0"
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0"

and the Scala code needs to change:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
…
val stream = env
  .addSource(new FlinkKafkaConsumer011[String]("md", new SimpleStringSchema(), properties))

So in summary some changes need to be made to Flink to be able to interact with the new version of Kafka. And more importantly, can one use an abstract notion of microservice here?

Dr Mich Talebzadeh
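One way to contain the version coupling described above is the lightweight adapter approach suggested elsewhere in this thread: hide the broker integration behind a small trait, so only one module tracks Kafka/Flink connector versions. A sketch with a stub implementation (the trait and class names are illustrative, not an existing API):

```scala
// The only surface the rest of the application sees
trait MarketDataSource {
  def poll(): Seq[String]
}

// A real implementation would wrap FlinkKafkaConsumer011 (or whichever
// connector version is current); swapping Kafka versions then only
// touches this one class, not every consumer of market data.
class StubSource(records: Seq[String]) extends MarketDataSource {
  def poll(): Seq[String] = records
}

// Downstream logic depends on the trait, not on any Kafka artifact
def tickers(src: MarketDataSource): Seq[String] =
  src.poll().map(_.split(',')(1))
```

The stub also makes the downstream logic unit-testable without a running broker, which is a side benefit of the same decoupling.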
Re: A use-case for Flink and reactive systems
Hi,

What you are saying is I can either use Flink and forget the database layer, or build a Java microservice with a database; mixing Flink with a database doesn't make any sense.

I would have thought that, moving with the microservices concept, Flink handling streaming data from the upstream microservice is an independent entity and a microservice on its own, assuming loosely coupled terminology here. The database tier is another microservice that interacts with your Flink application and can serve other consumers. Indeed, in classic CEP engines like Aleri or StreamBase you may persist your data to an external database for analytics.

HTH

Dr Mich Talebzadeh

On Thu, 5 Jul 2018 at 12:26, Yersinia Ruckeri wrote:

> It helps; at least it's fairly clear now.
> I am not against storing the state in Flink, but as per your first point,
> I need to get it persisted, asynchronously, in an external database too,
> to let other applications/services retrieve the state.
> What you are saying is I can either use Flink and forget the database
> layer, or make a Java microservice with a database. Mixing Flink with a
> database doesn't make any sense.
> My concern with the database is how do you work out the previous state to
> calculate the new one? Is it good and fast? (Moving money from account A
> to B isn't a problem because you have two separate events.)
>
> Moreover, a second PoC I was considering is related to Flink CEP. Let's
> say I am elaborating sensor data. I want to have a rule which works on the
> following principle:
> - If the temperature is more than 40
> - If the temperature yesterday at noon was more than 40
> - If no one used the vents yesterday and two days ago
> then do something/emit some event.
>
> This simple CEP example requires you to mine previous data/states from a
> DB, right? Can Flink be considered for it without an external DB, relying
> only on its internal RocksDB?
>
> Hope I am not generating more confusion here.
>
> Y
>
> On Thu, Jul 5, 2018 at 11:21 AM, Fabian Hueske wrote:
>
>> Hi Yersinia,
>>
>> The main idea of an event-driven application is to hold the state (i.e.,
>> the account data) in the streaming application and not in an external
>> database like Couchbase. This design is very scalable (state is
>> partitioned) and avoids look-ups from the external database because all
>> state interactions are local. Basically, you are moving the database into
>> the streaming application.
>>
>> There are a few things to consider if you maintain the state in the
>> application:
>> - You might need to mirror the state in an external database to make it
>> queryable and available while the streaming application is down.
>> - You need a good design to ensure that your consistency requirements
>> are met in case of a failure (upsert writes can temporarily reset the
>> external state).
>> - The design becomes much more challenging if you need to access the
>> state of two accounts to perform a transaction (subtract from the first
>> and add to the second account) because Flink state is distributed per key
>> and does not support remote lookups.
>>
>> If you do not want to store the state in the Flink application, I agree
>> with Jörn that there's no need for Flink.
>>
>> Hope this helps,
>> Fabian
>>
>> 2018-07-05 10:45 GMT+02:00 Yersinia Ruckeri:
>>
>>> My idea started from here: https://flink.apache.org/usecases.html
>>> The first use case describes what I am trying to realise
>>> (https://flink.apache.org/img/usecases-eventdrivenapps.png).
>>> My application is Flink, listening to incoming events, changing the
>>> state of an object (really an aggregate here) and pushing out another
>>> event. States can be persisted asynchronously; this is OK.
>>>
>>> My point on top of this picture is that the "state" is not just
>>> persisting something, but retrieving the current state, manipulating it
>>> with new information and persisting the updated state.
>>>
>>> Y
>>>
>>> On Wed, Jul 4, 2018 at 10:16 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.co
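The three-condition vent rule quoted above is, at its core, a predicate over the current reading plus two pieces of history, wherever that history is kept (Flink keyed state backed by RocksDB, or an external DB). A plain-Scala sketch of just the rule, with the historical lookups passed in as values (the field names are illustrative, not a CEP API):

```scala
// Current event plus the two historical facts the rule needs.
// In Flink these would come from keyed state; here they are plain values.
case class Reading(temperature: Double,
                   noonTempYesterday: Double,
                   ventsUsedLastTwoDays: Boolean)

// Fire when it is hot now, was hot yesterday at noon,
// and no one used the vents in the last two days.
def shouldAlert(r: Reading): Boolean =
  r.temperature > 40 && r.noonTempYesterday > 40 && !r.ventsUsedLastTwoDays
```

Whether the two historical fields are filled from RocksDB-backed Flink state or mined from an external database is exactly the architectural choice this thread debates; the rule itself is unchanged either way.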
Re: A use-case for Flink and reactive systems
Hi Fabian, On your point below … Basically, you are moving the database into the streaming application. This assumes a finite size for the data the streaming application has to persist. In terms of capacity planning, how does this work? Some applications, such as fraud detection, try to address this by deploying databases like Aerospike <https://www.aerospike.com/>, where the keys are kept in memory and indexed and the values are stored on SSD devices. I was wondering how Flink in general can address this? Regards, Dr Mich Talebzadeh On Thu, 5 Jul 2018 at 11:22, Fabian Hueske wrote: > Hi Yersinia, > > The main idea of an event-driven application is to hold the state (i.e., > the account data) in the streaming application and not in an external > database like Couchbase. > This design is very scalable (state is partitioned) and avoids look-ups > from the external database because all state interactions are local. > Basically, you are moving the database into the streaming application. > > There are a few things to consider if you maintain the state in the > application: > - You might need to mirror the state in an external database to make it > queryable and available while the streaming application is down. > - You need to have a good design to ensure that your consistency > requirements are met in case of a failure (upsert writes can temporarily > reset the external state).
> - The design becomes much more challenging if you need to access the state > of two accounts to perform a transaction (subtract from the first and add > to the second account) because Flink state is distributed per key and does > not support remote lookups. > > If you do not want to store the state in the Flink application, I agree > with Jörn that there's no need for Flink. > > Hope this helps, > Fabian > > 2018-07-05 10:45 GMT+02:00 Yersinia Ruckeri : > >> My idea started from here: https://flink.apache.org/usecases.html >> First use case describes what I am trying to realise ( >> https://flink.apache.org/img/usecases-eventdrivenapps.png) >> My application is Flink, listening to incoming events, changing the state >> of an object (really an aggregate here) and pushing out another event. >> States can be persisted asynchronously; this is ok. >> >> My point on top of this picture is that the "state" is not just >> persisting something, but retrieving the current state, manipulating it with new >> information and persisting the updated state. >> >> >> Y >> >> On Wed, Jul 4, 2018 at 10:16 PM, Mich Talebzadeh < >> mich.talebza...@gmail.com> wrote: >> >>> Looks interesting. >>> >>> As I understand you have a microservice based on ingestion where a topic >>> is defined for streaming messages that include transactional data. These >>> transactions should already exist in your DB. For now we look at DB as part >>> of your microservices and we take a logical view of it. >>> >>> So >>> >>> >>>1. First microservice M1 provides ingestion of a Kafka topic >>>2. Second microservice M2 deploys Flink or Spark Streaming to >>>manipulate the incoming messages. We can look at this later. >>>3. Third microservice M3 consists of the database that provides >>>current records for accounts identified by the account number in your >>>message queue >>>4. M3 will have to validate the incoming account number, update the >>>transaction and provide completion handshake.
Effectively you are >>> providing >>>DPaaS >>> >>> >>> So far we have avoided interfaces among these services. But I gather M1 >>> and M2 are well established. Assuming that Couchbase is your choice of DB I >>> believe it provides JDBC drivers of some sort. It does not have to be >>> Couchbase. You can achieve the same with Hbase as well or MongoDB. Anyhow >>> the only challenge I see here is the interface between your Flink >>> application in M2 and M3 >>> >>> HTH >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>
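Fabian's caveat earlier in this thread — that a two-account transaction is hard because keyed state is partitioned and one key cannot read another key's state — can be illustrated with a common workaround: split the transfer event into two single-key delta events, each applied locally by its own partition. This is a hypothetical, framework-free sketch (all names are invented), and note it does not by itself give atomicity across the two accounts:

```scala
// Illustrative only: a cross-key transfer becomes two per-key events so that
// each keyed partition can apply its delta using only local state.
case class Transfer(from: String, to: String, amount: BigDecimal)
case class AccountDelta(accountId: String, delta: BigDecimal)

def splitTransfer(t: Transfer): Seq[AccountDelta] =
  Seq(
    AccountDelta(t.from, -t.amount), // debit the source account
    AccountDelta(t.to, t.amount)     // credit the destination account
  )
```

The sum of emitted deltas is always zero, but a failure between applying the two deltas still needs to be handled by checkpointing or a compensation step, which is exactly the design difficulty Fabian points out.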
Re: A use-case for Flink and reactive systems
Looks interesting. As I understand it, you have a microservice based on ingestion where a topic is defined for streaming messages that include transactional data. These transactions should already exist in your DB. For now we look at the DB as part of your microservices and we take a logical view of it. So 1. First microservice M1 provides ingestion of a Kafka topic 2. Second microservice M2 deploys Flink or Spark Streaming to manipulate the incoming messages. We can look at this later. 3. Third microservice M3 consists of the database that provides current records for accounts identified by the account number in your message queue 4. M3 will have to validate the incoming account number, update the transaction and provide a completion handshake. Effectively you are providing DPaaS So far we have avoided interfaces among these services. But I gather M1 and M2 are well established. Assuming that Couchbase is your choice of DB, I believe it provides JDBC drivers of some sort. It does not have to be Couchbase. You can achieve the same with HBase as well, or MongoDB. Anyhow the only challenge I see here is the interface between your Flink application in M2 and M3 HTH Dr Mich Talebzadeh On Wed, 4 Jul 2018 at 17:56, Yersinia Ruckeri wrote: > Hi all, > > I am working on a prototype which should include Flink in a reactive > systems software.
The easiest use-case is a traditional bank system where > I have one microservice for transactions and another one for > account/balances. > Both are connected with Kafka. > > Transactions record a transaction and then send, via Kafka, a message > which includes the account identifier and the amount. > On the other microservice I want to have Flink consuming this topic and > updating the balance of my account based on the incoming message. It needs > to pull my data from the DB and make the update. > The DB is Couchbase. > > I spent a few hours online today, but so far I only found that there's no sink > for Couchbase; I need to build one, which shouldn't be a big deal. I haven't > found information on how I can make Flink able to interact with a DB to > retrieve information and store information. > > I guess the case is a good one, as updating state in an event-sourcing > based application is part of the first page of the manual. I am not looking > to just dump a state into a DB, but to interact with the DB: retrieving > data, combining them with the input coming from my queue, and persisting > them (especially if I want to make a second prototype using Flink CEP). > > I probably even read how to do it, but I didn't recognize it. > > Can anybody help me figure out this part better? > > Thanks, > Y. >
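The balance-update flow described in this thread (transaction event in, per-account state updated, balance event out) can be sketched without any framework. In an actual Flink job the `Map` below would be Flink keyed state partitioned by account rather than an explicit value passed around; all names (`TxEvent`, `applyTx`, …) are hypothetical:

```scala
// Framework-free sketch of the keyed-state pattern discussed in the thread:
// the "database" is per-key state held by the application itself; each event
// updates the local balance and emits the new value downstream.
case class TxEvent(accountId: String, amount: BigDecimal)
case class BalanceEvent(accountId: String, balance: BigDecimal)

def applyTx(balances: Map[String, BigDecimal],
            tx: TxEvent): (Map[String, BigDecimal], BalanceEvent) = {
  val newBalance = balances.getOrElse(tx.accountId, BigDecimal(0)) + tx.amount
  (balances.updated(tx.accountId, newBalance),
   BalanceEvent(tx.accountId, newBalance))
}
```

A Couchbase (or HBase) sink would then only mirror `BalanceEvent`s for external queryability, as Fabian suggests, rather than being read on every event.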
Handling back pressure in Flink.
Hi, In Spark one can handle backpressure by setting the Spark conf parameter: sparkConf.set("spark.streaming.backpressure.enabled","true") With backpressure you make the Spark Streaming application stable, i.e. it receives data only as fast as it can process it. In general one needs to ensure that the micro-batch processing time is less than the batch interval, i.e. the rate at which your producer sends data into Kafka. For example this is shown in the Spark GUI for a batch interval of 2 seconds. [image: Spark GUI screenshot] Is there such a procedure in Flink please? Thanks Dr Mich Talebzadeh
Re: run time error java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09
Yes indeed, thanks. It is all working fine, but only writing to a text file. I want to do with Flink what I do with Spark Streaming: write high-value events to HBase on HDFS. Dr Mich Talebzadeh On Wed, 4 Jul 2018 at 09:18, Fabian Hueske wrote: > Looking at the other threads, I assume you solved this issue. > > The problem should have been that FlinkKafkaConsumer09 is not included in > the flink-connector-kafka-0.11 module, because it is the connector for > Kafka 0.9 and not Kafka 0.11.
> > Best, Fabian > > 2018-07-02 11:20 GMT+02:00 Mich Talebzadeh : > >> Hi, >> >> I created a jar file with sbt with this sbt file >> >> cat md_streaming.sbt >> name := "md_streaming" >> version := "1.0" >> scalaVersion := "2.11.8" >> >> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.3" >> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6" >> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6" >> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6" >> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" >> % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" >> % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % >> "1.5.0" >> libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" >> >> and the Scala code is very basic >> >> import java.util.Properties >> import java.util.Arrays >> import org.apache.flink.api.common.functions.MapFunction >> import org.apache.flink.api.java.utils.ParameterTool >> import org.apache.flink.streaming.api.datastream.DataStream >> import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >> import org.apache.flink.streaming.util.serialization.DeserializationSchema >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >> object md_streaming >> { >> def main(args: Array[String]) >> { >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> val properties = new Properties() >> properties.setProperty("bootstrap.servers", "rhes75:9092") >> properties.setProperty("zookeeper.connect", "rhes75:2181") >> properties.setProperty("group.id", 
"md_streaming") >> val stream = env >> .addSource(new FlinkKafkaConsumer09[String]("md", new >> SimpleStringSchema(), properties)) >> .writeAsText("/tmp/md_streaming.txt") >> env.execute("Flink Kafka Example") >> } >> } >> >> It compiles OK as follows >> >> Compiling md_streaming >> [info] Set current project to md_streaming (in build >> file:/home/hduser/dba/bin/flink/md_streaming/) >> [success] Total time: 0 s, completed Jul 2, 2018 10:16:05 AM >> [info] Set current project to md_streaming (in build >> file:/home/hduser/dba/bin/flink/md_streaming/) >> [info] Updating >> {file:/home/hduser/dba/bin/flink/md_streaming/}md_streaming... >> [info] Resolving jline#jline;2.12.1 ... >> [info] Done updating. >> [warn] Scala version was updated by one of library dependencies: >> [warn] * org.scala-lang:scala-library:(2.11.8, 2.11.11, 2.11.6, 2.11.7) >> -> 2.11.12 >> [warn] To force scalaVersion, add the following: >> [warn] ivyScala := ivyScala.value map { _.copy(overrideScalaVers
Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign
Hi Fabian. Thanks. Great contribution! It is working: [info] SHA-1: 98d78b909631e5d30664df6a7a4a3f421d4fd33b [info] Packaging /home/hduser/dba/bin/flink/md_streaming/target/scala-2.11/md_streaming-assembly-1.0.jar ... [info] Done packaging. [success] Total time: 14 s, completed Jul 3, 2018 9:32:25 AM Completed compiling Tue Jul 3 09:32:25 BST 2018 , Running in **Standalone mode** *Starting execution of program* And the test prices generated in the topic are added to the file for each security ad534c19-fb77-4966-86a8-dc411d7a0607,MKS,2018-07-03T09:50:03,319.2 e96e5d96-03fc-4603-899f-5ea5151fc280,IBM,2018-07-03T09:50:07,124.67 a64a51a4-f27c-439b-b9cc-28dc593acab3,MRW,2018-07-03T09:50:07,286.27 2d15089d-3635-4a7e-b2d5-20b92dd67186,MSFT,2018-07-03T09:50:07,22.8 415d1215-18e6-46ca-a771-98c614e8c3fb,ORCL,2018-07-03T09:50:07,32.57 e0dd5832-20bd-4951-a4dd-3a3a10c99a01,SAP,2018-07-03T09:50:07,69.32 4222eea9-d9a7-46e1-8b1e-c2b634170fad,SBRY,2018-07-03T09:50:07,235.22 4f2e0927-29ff-44ff-aa2e-9b16fdcd0024,TSCO,2018-07-03T09:50:07,403.64 49437f8b-5e2b-42e9-b3d7-f95eee9c5432,VOD,2018-07-03T09:50:07,239.08 0f96463e-40a5-47c5-b2c6-7191992ab0b1,BP,2018-07-03T09:50:07,587.75 d0041bf1-a313-4623-a7cc-2ce8204590bb,MKS,2018-07-03T09:50:07,406.02 c443ac53-f762-4fad-b11c-0fd4e98812fb,IBM,2018-07-03T09:50:10,168.52 67f2d372-f918-445e-8dac-7556a2dfd0aa,MRW,2018-07-03T09:50:10,293.2 57372392-53fd-48eb-aa94-c317c75d6545,MSFT,2018-07-03T09:50:10,46.53 c3839c12-be63-416c-a404-8d0333071559,ORCL,2018-07-03T09:50:10,31.57 29eca46c-bd4c-475e-a9c9-7bf105fcc935,SAP,2018-07-03T09:50:10,77.81 89f98ad0-dc34-476f-baa5-fc2fa92aa2d5,SBRY,2018-07-03T09:50:10,239.95 431494f3-1215-48ae-a534-5bf3fbe20f2f,TSCO,2018-07-03T09:50:10,331.12 2203095f-8826-424d-a1e3-fa212194ac35,VOD,2018-07-03T09:50:10,232.05 816ddc9b-f403-4ea9-8d55-c3afd0eae110,BP,2018-07-03T09:50:10,506.4 23c07878-d64d-4d1e-84a4-c14c23357467,MKS,2018-07-03T09:50:10,473.06 kind regards, Dr Mich Talebzadeh On Tue, 3 Jul 2018 at 09:11, Fabian Hueske wrote: > Hi Mich, > > FlinkKafkaConsumer09 is the connector for Kafka 0.9.x. > Have you tried to use FlinkKafkaConsumer011 instead of > FlinkKafkaConsumer09? > > Best, Fabian > > > > 2018-07-02 22:57 GMT+02:00 Mich Talebzadeh : > >> This is becoming very tedious. >> >> As suggested I changed the kafka dependency from >> >> libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" >> to >> >> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0" >> >> and compiled and ran the job again and it failed.
This is the log file >> >> 2018-07-02 21:38:38,656 INFO >> org.apache.kafka.common.utils.AppInfoParser - Kafka >> version : 1.1.0 >> 2018-07-02 21:38:38,656 INFO >> org.apache.kafka.common.utils.AppInfoParser - Kafka >> commitId : fdcf75ea326b8e07 >> 2018-07-02 21:38:38,696 INFO >> org.apache.kafka.clients.Metadata - Cluster ID: >> 3SqEt4DcTruOr_SlQ6fqTQ >> 2018-07-02 21:38:38,698 INFO >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - >> Consumer subtask 0 will start reading the following 1 partitions from the >> committed group offsets in Kafka: [KafkaTopicPartition{topic='md', >> partition=0}] >> 2018-07-02 21:38:38,702 INFO >> org.apache.kafka.clients.consumer.ConsumerConfig - >> ConsumerConfig values: >> auto.commit.interval.ms = 5000 >> auto.offset.reset = latest >> bootstrap.servers = [rhes75:9092] >> check.crcs = true >> client.id = >> connections.max.idle.ms = 54 >> enable.auto.commit = true >> exclude.internal.topics = true >> fetch.max.bytes = 52428800 >> fetch.max.wait.ms = 500 >> fetch.min.bytes = 1 >> group.id = md_streaming >> heartbeat.interval.ms = 3000 >> interceptor.classes = [] >> internal.leave.group.on.close = true >> isolation.level = read_uncommitted >> key.deserializer = class >> org.apache.kafka.common.serialization.ByteArray
Re: Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.
thanks Hequn and Jorn that helped. But I am still getting this error for a simple streaming program at execution! import java.util.Properties import java.util.Arrays import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.util.serialization.DeserializationSchema import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.flink.core.fs.FileSystem object md_streaming { def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "rhes75:9092") //properties.setProperty("zookeeper.connect", "rhes75:2181") properties.setProperty("group.id", "md_streaming") properties.setProperty("auto.offset.reset", "latest") val stream = env .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties)) .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE) env.execute("Flink Kafka Example") } } Completed compiling Starting execution of program The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at md_streaming$.main(md_streaming.scala:32) at md_streaming.main(md_streaming.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020) at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096) Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243) Dr Mich Talebzadeh
Re: Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.
Thanks Hequn. When I use it as suggested, I am getting this error: [error] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:30: not found: value FileSystem [error] .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE) [error] ^ [error] one error found Dr Mich Talebzadeh On Tue, 3 Jul 2018 at 03:16, Hequn Cheng wrote: > Hi Mich, > > It seems the writeMode has not been set correctly. Have you ever tried > >> .writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE); > > > On Mon, Jul 2, 2018 at 10:44 PM, Mich Talebzadeh < > mich.talebza...@gmail.com> wrote: > >> Flink 1.5 >> >> This streaming data is written to a file >> >> val stream = env >> .addSource(new FlinkKafkaConsumer09[String]("md", new >> SimpleStringSchema(), properties)) >> .writeAsText("/tmp/md_streaming.txt") >> env.execute("Flink Kafka Example") >> >> The error states >> >> Caused by: java.io.IOException: File or directory /tmp/md_streaming.txt >> already exists. Existing files and directories are not overwritten in >> NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and >> directories. >> >> Is there any append in writeAsText? I tried OVERWRITE but it did not work.
>> >> Thanks >> >> Dr Mich Talebzadeh >> >> >> > >
Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign
KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:482) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:171) 2018-07-02 21:38:38,709 WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Error while closing Kafka consumer java.lang.NullPointerException at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:286) 2018-07-02 21:38:38,710 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b) switched from RUNNING to FAILED. java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243) 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160dd11e09ba05b). 2018-07-02 21:38:38,713 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (1/1) (bcb46879e709768c9160d Dr Mich Talebzadeh On Mon, 2 Jul 2018 at 20:59, Ted Yu wrote: > From flink-connector-kafka-0.11 dependency, we know the version of Kafka > used is (flink-connectors/flink-connector-kafka-0.11/pom.xml): > > 0.11.0.2 > From Kafka side, you specified 1.1.0 > > I think these versions produce what you experienced. > If you use Kafka 0.11, this problem should go away. > > On Mon, Jul 2, 2018 at 11:24 AM, Mich Talebzadeh < > mich.talebza...@gmail.com> wrote: > >> Hi, >> >> This is the code >> >> import java.util.Properties >> import java.util.Arrays >> import org.apache.flink.api.common.functions.MapFunction >> import org.apache.flink.api.java.utils.ParameterTool >> import org.apache.flink.streaming.api.datastream.DataStream >> import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >> import org.apache.flink.streaming.util.serialization.DeserializationSchema >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema >> import org.apache.kafka.clients.consumer.ConsumerConfig; >> import org.apache.kafka.clients.consumer.ConsumerRecord; >> import org.apache.kafka.clients.consumer.ConsumerRecords; >> import org.apache.kafka.clients.consumer.KafkaConsumer; >> >> object md_streaming >> { >> def main(args: Array[String]) >> { >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> val properties = new Properties() >> properties.setProperty("bootstrap.servers", "rhes75:9092") >> properties.setProperty("zookeeper.connect", "rhes75:2181") >> properties.setProperty("group.id", "md_streaming") >> val stream = env >> .addSource(new FlinkKafkaConsumer09[String]("md", new >> SimpleStringSchema(), properties)) >> .writeAsText("/tmp/md_streaming.txt") >> env.execute("Flink Kafka Example") >> } >> >>
and this is the sbt dependencies >> >> libraryDependencies += "org.apache.flink" %% >> "flink-connector-kafka-0.11" % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" >> % "1.5.0" >> libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" >> libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0" >> libraryDependencies += "org.apache.flink&q
Re: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign
Hi, This is the code import java.util.Properties import java.util.Arrays import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.util.serialization.DeserializationSchema import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; object md_streaming { def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "rhes75:9092") properties.setProperty("zookeeper.connect", "rhes75:2181") properties.setProperty("group.id", "md_streaming") val stream = env .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties)) .writeAsText("/tmp/md_streaming.txt") env.execute("Flink Kafka Example") } and this is the sbt dependencies libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.5.0" libraryDependencies += "org.apache.flink" %% "flink-connector-kafka-base" % "1.5.0" libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.5.0" libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0" libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.5.0" libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" Thanks Dr Mich Talebzadeh On Mon, 2 Jul 2018 at 17:45, Ted Yu wrote: > Here is the signature of assign : > > public void assign(Collection partitions) { > > Looks like RestClusterClient was built against one version of Kafka but > runs against a different version. > > Please check the sbt dependency and the version of Kafka jar on the > classpath. > > Thanks > > On Mon, Jul 2, 2018 at 9:35 AM, Mich Talebzadeh > wrote: > >> Have you seen this error by any chance in flink streaming with Kafka >> please? >> >> org.apache.flink.client.program.ProgramInvocationException: >> java.lang.NoSuchMethodError: >> org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V >> at >> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) >> at >> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) >> at >> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) >> at md_streaming$.main(md_streaming.scala:30) >> at md_streaming.main(md_streaming.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) >> at >>
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) >> at >> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) >> at >> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781) >> at >> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275) >>
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign
Have you seen this error by any chance in flink streaming with Kafka please?

    org.apache.flink.client.program.ProgramInvocationException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at md_streaming$.main(md_streaming.scala:30)
        at md_streaming.main(md_streaming.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
    Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)

Thanks

Dr Mich Talebzadeh
Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.
Flink 1.5. This streaming data is written to a file:

    val stream = env
      .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
      .writeAsText("/tmp/md_streaming.txt")
    env.execute("Flink Kafka Example")

The error states:

    Caused by: java.io.IOException: File or directory /tmp/md_streaming.txt already exists. Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.

Is there any append in writeAsText? I tried OVERWRITE but it did not work.

Thanks

Dr Mich Talebzadeh
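writeAsText has no append mode, but it does take an explicit write mode as a second argument, and that is where the OVERWRITE flag has to be passed. A sketch against the Flink 1.5 DataStream API, assuming `stream` is the DataStream built from the Kafka source above:

```scala
import org.apache.flink.core.fs.FileSystem

// Replace the target file on each run instead of failing in the
// default NO_OVERWRITE mode:
stream.writeAsText("/tmp/md_streaming.txt", FileSystem.WriteMode.OVERWRITE)
```

For genuinely appending/rolling output, the usual route in Flink 1.5 is the BucketingSink from flink-connector-filesystem rather than writeAsText.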
run time error java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

Do I need to pass additional classes? I suspect the jar file is not complete!

Thanks

Dr Mich Talebzadeh
Re: compiling flink job with maven is failing with error: object flink is not a member of package org.apache
Thanks

Just to do a test I have the following under the root directory md_streaming:

    hduser@rhes75: /home/hduser/dba/bin/flink/md_streaming> ltr
    total 24
    drwxr-xr-x 3 hduser hadoop 4096 Jun 30 16:10 src
    drwxr-xr-x 4 hduser hadoop 4096 Jun 30 16:13 ..
    -rw-r--r-- 1 hduser hadoop 7732 Jul  1 22:40 pom.xml
    drwxr-xr-x 4 hduser hadoop 4096 Jul  1 22:43 .
    drwxr-xr-x 4 hduser hadoop 4096 Jul  2 03:35 target

My source scala file is here:

    ltr src/main/scala/md_streaming.scala
    -rw-r--r-- 1 hduser hadoop 1355 Jul  1 21:30 src/main/scala/md_streaming.scala

and I have attached my pom.xml. Now when I run mvn package I get no source:

    hduser@rhes75: /home/hduser/dba/bin/flink/md_streaming> mvn package
    [INFO] Scanning for projects...
    [INFO] Building md_streaming 1.5.0
    [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ md_streaming ---
    [INFO] Using 'UTF-8' encoding to copy filtered resources.
    [INFO] skip non existing resourceDirectory /home/hduser/dba/bin/flink/md_streaming/src/main/resources
    [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ md_streaming ---
    [INFO] Nothing to compile - all classes are up to date
    [INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ md_streaming ---
    [INFO] Using 'UTF-8' encoding to copy filtered resources.
    [INFO] skip non existing resourceDirectory /home/hduser/dba/bin/flink/md_streaming/src/test/resources
    [INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ md_streaming ---
    [INFO] No sources to compile
    [INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ md_streaming ---
    [INFO] No tests to run.
    [INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ md_streaming ---
    [WARNING] JAR will be empty - no content was marked for inclusion!
    [INFO] --- maven-shade-plugin:3.0.0:shade (default) @ md_streaming ---
    [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.7 from the shaded jar.
    [INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
    [INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
    [INFO] Replacing original artifact with shaded artifact.
    [INFO] Replacing /home/hduser/dba/bin/flink/md_streaming/target/md_streaming-1.5.0.jar with /home/hduser/dba/bin/flink/md_streaming/target/md_streaming-1.5.0-shaded.jar
    [INFO] BUILD SUCCESS
    [INFO] Total time: 0.586 s
    [INFO] Finished at: 2018-07-02T03:44:32+01:00
    [INFO] Final Memory: 26M/962M

So I guess my pom.xml is incorrect!

Dr Mich Talebzadeh

On Mon, 2 Jul 2018 at 03:12, zhangminglei <18717838...@163.com> wrote:
> Hi, Mich
>
> From your jar contents, it seems not correct. There is no md_streaming class in your jar. You should put this into your jar file, and the Kafka lib as well. By the way, you need to use maven-shade-plugin or maven-assembly-plugin to help you.
>
> Cheers
> Minglei
>
> On 2 Jul 2018, at 02:18, Mich Talebzadeh wrote:
>
> Hi,
>
> I am still not there.
>
> This is the simple Scala program that I created a jar file for using mvn:
>
>     import java.util.Properties
>     import java.util.Arrays
>     import org.apache.flink.api.common.functions.MapFunction
>     import org.apache.flink.api.java.utils.ParameterTool
>     import org.apache.flink.streaming.api.datastream.DataStream
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>     import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>     import org.apache.flink.streaming.util.serialization.DeserializationSchema
>     object md_streaming {
>       def main(args: Array[String]) {
>         val env = StreamExecutionEnvironment.getExecutionEnvironment
>         val properties = new Properties()
>         //val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes75:9092", "schema.registry.url" ...
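The "Nothing to compile" / "No sources to compile" lines are expected from maven-compiler-plugin: that plugin only looks at src/main/java, so a project whose only source is src/main/scala produces an empty jar unless a Scala compiler plugin is wired into the build. A sketch of the missing piece for the pom (plugin version is illustrative):

```xml
<plugin>
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <version>3.2.2</version>
  <executions>
    <execution>
      <goals>
        <goal>compile</goal>
        <goal>testCompile</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

With this in the `<build><plugins>` section, `mvn package` compiles src/main/scala before the jar and shade steps run.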
Re: compiling flink job with maven is failing with error: object flink is not a member of package org.apache
Hi,

I am still not there.

This is the simple Scala program that I created a jar file for using mvn:

    import java.util.Properties
    import java.util.Arrays
    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.streaming.util.serialization.DeserializationSchema

    object md_streaming {
      def main(args: Array[String]) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val properties = new Properties()
        //val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes75:9092", "schema.registry.url" -> "http://rhes75:8081", "zookeeper.connect" -> "rhes75:2181", "group.id" -> flinkAppName)
        properties.setProperty("bootstrap.servers", "rhes75:9092")
        properties.setProperty("zookeeper.connect", "rhes75:2181")
        properties.setProperty("group.id", "md_streaming")
        val stream = env
          .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
          .writeAsText("/tmp/md_streaming.txt")
        env.execute("Flink Kafka Example")
      }
    }

It compiles OK and creates this jar file:

    jar tvf /home/hduser/dba/bin/flink/md_streaming/target/md_streaming-1.5.0.jar
      157 Sun Jul 01 18:57:46 BST 2018 META-INF/MANIFEST.MF
        0 Sun Jul 01 18:57:46 BST 2018 META-INF/
        0 Sun Jul 01 18:57:46 BST 2018 META-INF/maven/
        0 Sun Jul 01 18:57:46 BST 2018 META-INF/maven/flink/
        0 Sun Jul 01 18:57:46 BST 2018 META-INF/maven/flink/md_streaming/
     7641 Sun Jul 01 18:57:34 BST 2018 META-INF/maven/flink/md_streaming/pom.xml
      102 Sun Jul 01 18:08:54 BST 2018 META-INF/maven/flink/md_streaming/pom.properties

And I try to run it as below:

    bin/flink run /home/hduser/dba/bin/flink/md_streaming/target/md_streaming-1.5.0.jar

The program finished with the following exception:

    org.apache.flink.client.program.ProgramInvocationException: *The program's entry point class 'md_streaming' was not found in the jar file.*
        at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:616)
        at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:199)
        at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:128)
        at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:833)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:201)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
    Caused by: java.lang.ClassNotFoundException: md_streaming
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:613)

Any ideas?

Thanks

Dr Mich Talebzadeh

On Sun, 1 Jul 2018 at 18:35, Mich Talebzadeh wrote:
> apologies Jorn
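The jar listing above contains only META-INF entries and no md_streaming.class, so both symptoms follow: nothing was compiled into the jar, and there is no entry point for `flink run` to load. Once the class is actually compiled in, the shade plugin can also write the Main-Class manifest attribute so the entry point is found without extra flags. A sketch (this transformer goes inside the shade plugin's `<configuration><transformers>` section):

```xml
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  <mainClass>md_streaming</mainClass>
</transformer>
```

Alternatively, the class can be named explicitly at submission time: `bin/flink run -c md_streaming target/md_streaming-1.5.0.jar`.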
Re: compiling flink job with maven is failing with error: object flink is not a member of package org.apache
Thanks Franke. That did the trick!

    [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.7 from the shaded jar.
    [INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
    [INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
    [DEBUG] Processing JAR /home/hduser/dba/bin/flink/md_streaming/target/maven-compiler-plugin-1.5.0.jar
    [INFO] Replacing original artifact with shaded artifact.
    [INFO] Replacing /home/hduser/dba/bin/flink/md_streaming/target/maven-compiler-plugin-1.5.0.jar with /home/hduser/dba/bin/flink/md_streaming/target/maven-compiler-plugin-1.5.0-shaded.jar
    [INFO] BUILD SUCCESS
    [INFO] Total time: 12.868 s
    [INFO] Finished at: 2018-07-01T17:35:33+01:00
    [INFO] Final Memory: 19M/736M

Completed compiling.

Regards,

Dr Mich Talebzadeh

On Sun, 1 Jul 2018 at 17:26, Jörn Franke wrote:
> Shouldn't it be 1.5.0 instead of 1.5?
>
> On 1. Jul 2018, at 18:10, Mich Talebzadeh wrote:
>
> Ok some clumsy work by me not creating the pom.xml file in the flink sub-directory (it was putting it in spark)!
>
> Anyhow this is the current issue I am facing:
>
>     [INFO] BUILD FAILURE
>     [INFO] Total time: 0.857 s
>     [INFO] Finished at: 2018-07-01T17:07:10+01:00
>     [INFO] Final Memory: 22M/962M
>     [ERROR] Failed to execute goal on project maven-compiler-plugin: Could not resolve dependencies for project flink:maven-compiler-plugin:jar:1.5: The following artifacts could not be resolved: org.apache.flink:flink-java:jar:1.5, org.apache.flink:flink-streaming-java_2.11:jar:1.5: Could not find artifact org.apache.flink:flink-java:jar:1.5 in central (https://repo.maven.apache.org/maven2) -> [Help 1]
>
> FYI I removed the ~/.m2 directory to get rid of anything cached!
>
> Thanks
>
> Dr Mich Talebzadeh
>
> On Sun, 1 Jul 2018 at 15:07, zhangminglei <18717838...@163.com> wrote:
>> Hi, Mich
>>
>> [WARNING] Expected all dependencies to require Scala version: 2.10.4
>> [WARNING] spark:scala:1.0 requires scala version: 2.11.7
>> [WARNING] Multiple versions of scala libraries detected!
>>
>> I think you should make your scala version 2.11 first and try again.
>>
>> Cheers
>> Minglei
>>
>> On 1 Jul 2018 at 21:24, Mich Talebzadeh wrote:
>>
>> Hi Minglei,
>>
>> Many thanks
>>
>> My flink version is 1.5
>>
>> This is the pom.xml from GitHub as suggested:
>>
>>     <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>              xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>       <modelVersion>4.0.0</modelVersion>
Re: compiling flink job with maven is failing with error: object flink is not a member of package org.apache
Ok some clumsy work by me not creating the pom.xml file in the flink sub-directory (it was putting it in spark)!

Anyhow this is the current issue I am facing:

    [INFO] BUILD FAILURE
    [INFO] Total time: 0.857 s
    [INFO] Finished at: 2018-07-01T17:07:10+01:00
    [INFO] Final Memory: 22M/962M
    [ERROR] Failed to execute goal on project maven-compiler-plugin: Could not resolve dependencies for project flink:maven-compiler-plugin:jar:1.5: The following artifacts could not be resolved: org.apache.flink:flink-java:jar:1.5, org.apache.flink:flink-streaming-java_2.11:jar:1.5: Could not find artifact org.apache.flink:flink-java:jar:1.5 in central (https://repo.maven.apache.org/maven2) -> [Help 1]
    org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal on project maven-compiler-plugin: Could not resolve dependencies for project flink:maven-compiler-plugin:jar:1.5: The following artifacts could not be resolved: org.apache.flink:flink-java:jar:1.5, org.apache.flink:flink-streaming-java_2.11:jar:1.5: Could not find artifact org.apache.flink:flink-java:jar:1.5 in central (https://repo.maven.apache.org/maven2)

FYI I removed the ~/.m2 directory to get rid of anything cached!

Thanks

Dr Mich Talebzadeh

On Sun, 1 Jul 2018 at 15:07, zhangminglei <18717838...@163.com> wrote:
> Hi, Mich
>
> [WARNING] Expected all dependencies to require Scala version: 2.10.4
> [WARNING] spark:scala:1.0 requires scala version: 2.11.7
> [WARNING] Multiple versions of scala libraries detected!
>
> I think you should make your scala version 2.11 first and try again.
>
> Cheers
> Minglei
>
> On 1 Jul 2018 at 21:24, Mich Talebzadeh wrote:
>
> Hi Minglei,
>
> Many thanks
>
> My flink version is 1.5
>
> This is the pom.xml from GitHub as suggested:
>
>     <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>              xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>       <modelVersion>4.0.0</modelVersion>
>       <groupId>${groupId}</groupId>
>       <artifactId>${artifactId}</artifactId>
>       <version>1.5</version>
>       <packaging>jar</packaging>
>       <name>Flink Quickstart Job</name>
>       <url>http://www.myorganization.org</url>
>       <properties>
>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>         <flink.version>@project.version@</flink.version>
>         <java.version>1.8</java.version>
>         <scala.binary.version>2.11</scala.binary.version>
>       </properties>
>       <repositories>
>         <repository>
>           <id>apache.snapshots</id>
>           <name>Apache Development Snapshot Repository</name>
>           <url>https://repository.apache.org/content/repositories/snapshots/</url>
>           <releases><enabled>false</enabled></releases>
>           <snapshots><enabled>true</enabled></snapshots>
>         </repository>
>       </repositories>
>       <dependencies>
>         <dependency>
>           <groupId>org.apache.flink</groupId>
>           <artifactId>flink-java</artifactId>
>           <version>1.5</version>
>           <scope>provided</scope>
>         </dependency>
>         <dependency>
>           <groupId>org.apache.flink</groupId>
>           <artifactId>flink-streaming-java_2.11</artifactId>
>           <version>1.5</version>
>           <scope>provided</scope>
>         </dependency>
>         <dependency>
>           <groupId>org.slf4j</groupId>
>           <artifactId>slf4j-log4j12</artifactId>
>           <version>1.7.7</version>
>           <scope>runtime</scope>
>         </dependency>
>         <dependency>
>           <groupId>log4j</groupId>
>           <artifactId>log4j</artifactId>
>           <version>1.2.17</version>
>           <scope>runtime</scope>
>         </dependency>
>       </dependencies>
>       <build>
>         <plugins>
>           <plugin>
>             <groupId>org.apache.maven.plugins</groupId>
>             <artifactId>maven-compiler-plugin</artifactId>
>             <version>3.1</version>
>             ...
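The resolution failure comes down to the version string, which is Jörn's point elsewhere in the thread: Flink publishes 1.5.0 (and later 1.5.x patch releases) to Maven Central, never a bare 1.5, so both artifact lookups fail. A sketch of the corrected coordinates:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.5.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.5.0</version>
  <scope>provided</scope>
</dependency>
```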
Re: compiling flink job with maven is failing with error: object flink is not a member of package org.apache
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.5</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.5</version>
      <scope>compile</scope>
    </dependency>

But I am still getting the same errors for input:

    [INFO] Scanning for projects...
    [INFO] Building scala 1.0
    [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ scala ---
    [INFO] Using 'UTF-8' encoding to copy filtered resources.
    [INFO] skip non existing resourceDirectory /home/hduser/dba/bin/flink/md_streaming/src/main/resources
    [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ scala ---
    [INFO] Nothing to compile - all classes are up to date
    [INFO] --- maven-scala-plugin:2.15.2:compile (default) @ scala ---
    [INFO] Checking for multiple versions of scala
    [WARNING] Expected all dependencies to require Scala version: 2.10.4
    [WARNING] spark:scala:1.0 requires scala version: 2.11.7
    [WARNING] Multiple versions of scala libraries detected!
    [INFO] includes = [**/*.java,**/*.scala,]
    [INFO] excludes = []
    [INFO] /home/hduser/dba/bin/flink/md_streaming/src/main/scala:-1: info: compiling
    [INFO] Compiling 1 source files to /home/hduser/dba/bin/flink/md_streaming/target/classes at 1530451461171
    [ERROR] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:3: error: object flink is not a member of package org.apache
    [INFO] import org.apache.flink.api.common.functions.MapFunction
    [INFO]                   ^
    (the same error is reported for each org.apache.flink import, lines 4 through 10)
    [ERROR] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:18: error: not found: value StreamExecutionEnvironment
    [INFO]     val env = StreamExecutionEnvironment.getExecutionEnvironment
    [INFO]               ^
    [ERROR] 9 errors found
    [INFO] BUILD FAILURE

Dr Mich Talebzadeh

On Sun, 1 Jul 2018 at 14:07, zhangminglei <18717838...@163.com> wrote:
> Hi, Mich.
>
>> Is there a basic MVN pom file for flink? The default one from GitHub does not seem to be working!
>
> Please take a look at https://github.com/apache/flink/blob/master/flink-quickstart/flink-quickstart-java/src/main/resources
compiling flink job with maven is failing with error: object flink is not a member of package org.apache
I have done this many times with sbt or maven for spark streaming. Trying to compile a simple program that compiles OK in flink-scala.sh. The imports are as follows:

    import java.util.Properties
    import java.util.Arrays
    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.streaming.util.serialization.DeserializationSchema

With the following jars added to the classpath it compiles:

    flink-connector-kafka-0.9_2.11-1.5.0.jar
    flink-connector-kafka-base_2.11-1.5.0.jar

I guess my pom.xml is incorrect. I have added these two dependencies to the pom.xml file:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.4.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.5.0</version>
    </dependency>

However, I am getting these basic errors:

    [ERROR] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:4: error: object flink is not a member of package org.apache
    [INFO] import org.apache.flink.api.java.utils.ParameterTool
    [INFO]                   ^
    (the same error is reported for each org.apache.flink import, through line 10)
    [ERROR] /home/hduser/dba/bin/flink/md_streaming/src/main/scala/myPackage/md_streaming.scala:18: error: not found: value StreamExecutionEnvironment
    [INFO]     val env = StreamExecutionEnvironment.getExecutionEnvironment
    [INFO]               ^

Is there a basic MVN pom file for flink? The default one from GitHub does not seem to be working!

Thanks

Dr Mich Talebzadeh
Re: error: object connectors is not a member of package org.apache.flink.streaming
Thanks Rong This worked. $FLINK_HOME/bin/start-scala-shell.sh local --addclasspath /home/hduser/jars/flink-connector-kafka-0.9_2.11-1.5.0.jar:/home/hduser/jars/flink-connector-kafka-base_2.11-1.5.0.jar Regards, Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On Sat, 30 Jun 2018 at 17:00, Rong Rong wrote: > Hi Mich, > > Ted is correct, Flink release binary does not include any connectors and > you will have to include the appropriate connector version. This is to > avoid dependency conflicts between different Kafka releases. > > You probably need the specific Kafka connector version jar file as well, > so in your case since you are using the scala shell. The following command > should work: > ./bin/start-scala-shell.sh --addclasspath > ": base_2.11>" > > -- > Rong > > On Sat, Jun 30, 2018 at 1:11 AM Ted Yu wrote: > >> Please add flink-connector-kafka-base_2.11 jar to the classpath. 
>>
>> On Sat, Jun 30, 2018 at 1:06 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Great, Ted. Added that jar file to the classpath.
>>>
>>> Running this code:
>>>
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>> import java.util.Properties
>>>
>>> object Main {
>>>   def main(args: Array[String]) {
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     val properties = new Properties()
>>>     properties.setProperty("bootstrap.servers", "localhost:9092")
>>>     properties.setProperty("zookeeper.connect", "localhost:2181")
>>>     properties.setProperty("group.id", "test")
>>>     val stream = env
>>>       .addSource(new FlinkKafkaConsumer082[String]("md", new SimpleStringSchema(), properties))
>>>       .print
>>>     env.execute("Flink Kafka Example")
>>>   }
>>> }
>>>
>>> I am getting this error now:
>>>
>>> :77: error: Class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase not found - continuing with a stub.
>>>       .addSource(new FlinkKafkaConsumer082[String]("md", new SimpleStringSchema(), properties))
>>>
>>> :77: error: overloaded method value addSource with alternatives:
>>>   [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
>>>   [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
>>> cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082[String])
>>>       .addSource(new FlinkKafkaConsumer082[String]("md", new SimpleStringSchema(), properties))
>>>
>>> Any ideas please?
>>>
>>> Regards,
>>>
>>> Dr Mich Talebzadeh
Displaying topic data with Flink streaming
Hi,

I have a streaming topic called "md" that displays test market data. I have written a simple program to stream data via Kafka into Flink.

Flink version 1.5
Kafka version 2.12

This is the sample program in Scala that compiles OK in start-scala-shell.sh:

import java.util.Properties
import java.util.Arrays
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.DeserializationSchema
//import org.apache.flink.streaming.api.scala._

object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "sampleScala")
    val stream = env
      .addSource(new FlinkKafkaConsumer09[String]("md", new SimpleStringSchema(), properties))
      .print()
    env.execute("Flink Kafka Example")
  }
}

warning: there was one deprecation warning; re-run with -deprecation for details
defined object Main

But I do not see any streaming output. A naive question: how do I execute the above compiled object in this shell?

Thanks

Dr Mich Talebzadeh
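A likely reason no output appears: in a Scala shell, defining `object Main` only compiles it; the `main` method, and hence `env.execute`, never runs until you invoke it yourself. A minimal sketch of the pattern, with a stand-in body in place of the real Flink pipeline (the Flink Scala shell also pre-binds an environment, conventionally `senv`, on which you can build the pipeline directly instead):

```scala
// Defining an object in the shell compiles it but runs nothing.
// The body below is a stand-in for the real job (which would call env.execute).
object Main {
  var executed = false
  def main(args: Array[String]): Unit = {
    // in the real program, the Kafka source, print sink and env.execute go here
    executed = true
  }
}

// Nothing has run yet:
println(Main.executed)  // false

// Explicitly invoke the entry point, exactly as you would in the shell:
Main.main(Array.empty[String])
println(Main.executed)  // true
```

So after `defined object Main`, typing `Main.main(Array.empty[String])` at the prompt is what actually starts the job.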
error: object connectors is not a member of package org.apache.flink.streaming
I am following this Flink Kafka example:
https://stackoverflow.com/questions/31446374/can-anyone-share-a-flink-kafka-example-in-scala

This is my edited program. I am using Flink 1.5 in the flink-scala shell:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import java.util.Properties

But I am getting this error:

scala> import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala._

scala> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
:76: error: object connectors is not a member of package org.apache.flink.streaming
       import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082

Any reason I am getting this error? Are the jar files missing? Can one add jar files as parameters to start-scala-shell.sh local?

Thanks

Dr Mich Talebzadeh
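The resolution shown elsewhere in this archive is to pass the connector jars to the shell with --addclasspath (colon-separated on Linux). A sketch, assuming the jars were downloaded to /home/hduser/jars; the exact artifact names depend on your Flink, Scala and Kafka versions:

```shell
# Hypothetical jar locations; adjust names and versions to your setup.
JAR_DIR=/home/hduser/jars
CP="$JAR_DIR/flink-connector-kafka-0.9_2.11-1.5.0.jar:$JAR_DIR/flink-connector-kafka-base_2.11-1.5.0.jar"

# Start the shell with the connectors on the classpath
# (commented out here since it needs a Flink installation):
# $FLINK_HOME/bin/start-scala-shell.sh local --addclasspath "$CP"

echo "$CP"
```

Note that both the version-specific connector jar and flink-connector-kafka-base are needed, as the replies in this thread point out.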
Re: First try running I am getting org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
As it turned out in the application log, it could not find the YARN configuration; it was not anything to do with the port:

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.conf.YarnConfiguration

I had installed Flink without bundled Hadoop, and my version of Hadoop is 3.1. I went back and downloaded Flink built with Hadoop 2.8, and that worked.

Thanks

Dr Mich Talebzadeh

On Fri, 29 Jun 2018 at 14:57, Hequn Cheng wrote:
> The port should be consistent:
>
> 1> nc -l 2219
> 2> ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2219
>
> On Fri, Jun 29, 2018 at 9:21 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Thanks Hequn.
>>
>> This is the port I started with:
>>
>> hduser@rhes75: /data6/hduser/flink-1.5.0> nc -l 2219
>> hello
>>
>> and as I expected I should collect from port 2219? However, I did what
>> you suggested:
>>
>> ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2199
>> Starting execution of program
>>
>> The program finished with the following exception:
>> org.apache.flink.client.program.ProgramInvocationException: Could not
>> retrieve the execution result.
>>
>> Unfortunately I am getting the same error.
>>
>> Cheers
>>
>> Dr Mich Talebzadeh
>>
>> On Fri, 29 Jun 2018 at 14:11, Hequn Cheng wrote:
>>
>>> Hi Mich,
>>>
>>> Your port is not matching. You start netcat with "nc -l 2219 &", but run
>>> the Flink job with "--port 2199".
>>>
>>> On Fri, Jun 29, 2018 at 8:40 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have installed Flink 1.5 in standalone mode and am trying a basic run
>>>> as per this example:
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html
>>>>
>>>> Started netcat on port 2199:
>>>>
>>>> nc -l 2219 &
>>>>
>>>> Run the example:
>>>>
>>>> ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2199
>>>> Starting execution of program
>>>>
>>>> The program finished with the following exception:
>>>> org.apache.flink.client.program.ProgramInvocationException: Could not
>>>> retrieve the execution result.
>>>> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
>>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>>>> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>> at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
Re: First try running I am getting org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
Thanks Hequn.

This is the port I started with:

hduser@rhes75: /data6/hduser/flink-1.5.0> nc -l 2219
hello

and as I expected I should collect from port 2219? However, I did what you suggested:

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2199
Starting execution of program

The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.

Unfortunately I am getting the same error.

Cheers

Dr Mich Talebzadeh

On Fri, 29 Jun 2018 at 14:11, Hequn Cheng wrote:
> Hi Mich,
>
> Your port is not matching. You start netcat with "nc -l 2219 &", but run
> the Flink job with "--port 2199".
>
> On Fri, Jun 29, 2018 at 8:40 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> I have installed Flink 1.5 in standalone mode and am trying a basic run
>> as per this example:
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html
>>
>> Started netcat on port 2199:
>>
>> nc -l 2219 &
>>
>> Run the example:
>>
>> ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2199
>> Starting execution of program
>>
>> The program finished with the following exception:
>> org.apache.flink.client.program.ProgramInvocationException: Could not
>> retrieve the execution result.
>> at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>> at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
>> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>> at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>> Caused by: org.apache.flink.runtime.client.JobSubmissionException:
>> Failed to submit JobGraph.
>>
>> Appreciate any suggestions.
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
First try running I am getting org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
Hi,

I have installed Flink 1.5 in standalone mode and am trying a basic run as per this example:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html

Started netcat on port 2199:

nc -l 2219 &

Run the example:

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 2199
Starting execution of program

The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at
        org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.

Appreciate any suggestions.

Thanks

Dr Mich Talebzadeh
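The root cause found later in this thread was a simple port mismatch: netcat was started on 2219 while the job was given --port 2199. One way to make the mismatch impossible is to put the port in a single shell variable; a sketch (the nc and flink commands are commented out since they need a running cluster):

```shell
# Single source of truth for the port, so netcat and the Flink job
# cannot diverge.
PORT=2219

# nc -l $PORT &
# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port $PORT

echo "both commands use port $PORT"
```

(Note that with a matching port this particular thread still hit a second, unrelated failure: the missing YARN configuration class discussed above.)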
Re: Testing Kafka interface using Flink interactive shell
Thanks Chiwan. It worked.

Now I have this simple streaming program in Spark Scala that gets streaming data via Kafka. It is pretty simple; please see attached. I am trying to make it work with Flink + Kafka. Any hints will be appreciated.

Thanks

Dr Mich Talebzadeh

On 18 April 2016 at 02:43, Chiwan Park <chiwanp...@apache.org> wrote:
> Hi Mich,
>
> You can add external dependencies to the Scala shell using the
> `--addclasspath` option. There is a more detailed description in the
> documentation [1].
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/scala_shell.html#adding-external-dependencies
>
> Regards,
> Chiwan Park
>
> > On Apr 17, 2016, at 6:04 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> >
> > Hi,
> >
> > In the Spark shell I can load a Kafka jar file through the spark-shell
> > option --jars:
> >
> > spark-shell --master spark://50.140.197.217:7077 --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar
> >
> > This works fine.
> >
> > In Flink I have added the jar file /home/hduser/jars/flink-connector-kafka-0.10.1.jar to the CLASSPATH.
> >
> > However I don't get any support for it within the Flink shell:
> >
> > Scala-Flink> import org.apache.flink.streaming.connectors.kafka
> > :54: error: object connectors is not a member of package
> > org.apache.flink.streaming
> >        import org.apache.flink.streaming.connectors.kafka
> >
> > Any ideas will be appreciated
> >
> > Dr Mich Talebzadeh
> >
> > http://talebzadehmich.wordpress.com

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

object TestStream_assembly {
  def main(args: Array[String]) {
    val conf = new SparkConf().
      setAppName("TestStream_assembly").
      setMaster("local[2]").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val ssc = new StreamingContext(conf, Seconds(55))
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "StreamTest")
    val topic = Set("newtopic")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    messages.cache()
    // Get the lines
    val lines = messages.map(_._2)
    // Check for message
    val showResults = lines.filter(_.contains("Sending messages"))
      .flatMap(line => line.split("\n,"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print(1000)
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}
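As a hint for the Flink port of the Spark job above: the core transformation (keep lines containing "Sending messages", split them, count words) can be expressed as plain Scala and reused with either engine. A sketch, with the Flink wiring left in comments since it needs the Kafka connector jars and a running broker; the topic, broker, and consumer class names below just mirror the Spark version and are assumptions:

```scala
// Pure-Scala version of the job's transformation, testable without any engine.
def countWords(lines: Seq[String]): Map[String, Int] =
  lines.filter(_.contains("Sending messages"))   // same predicate as the Spark job
       .flatMap(_.split("\n,"))                  // same split pattern
       .groupBy(identity)
       .map { case (word, group) => (word, group.size) }

println(countWords(Seq("Sending messages batch 1", "unrelated line")))
// Map(Sending messages batch 1 -> 1)

// Hypothetical Flink wiring, mirroring the Spark kafkaParams above:
// val env = StreamExecutionEnvironment.getExecutionEnvironment
// val props = new java.util.Properties()
// props.setProperty("bootstrap.servers", "rhes564:9092")
// props.setProperty("group.id", "StreamTest")
// env.addSource(new FlinkKafkaConsumer09[String]("newtopic",
//               new SimpleStringSchema(), props))
//    .filter(_.contains("Sending messages"))
//    .print()
// env.execute("TestStream Flink port")
```

The main structural difference from Spark Streaming is that Flink has no micro-batch interval: the DataStream is continuous, so there is no equivalent of `Seconds(55)` unless you add windowing.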
Testing Kafka interface using Flink interactive shell
Hi,

In the Spark shell I can load a Kafka jar file through the spark-shell option --jars:

spark-shell --master spark://50.140.197.217:7077 --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar

This works fine.

In Flink I have added the jar file /home/hduser/jars/flink-connector-kafka-0.10.1.jar to the CLASSPATH. However I don't get any support for it within the Flink shell:

Scala-Flink> import org.apache.flink.streaming.connectors.kafka
:54: error: object connectors is not a member of package org.apache.flink.streaming
       import org.apache.flink.streaming.connectors.kafka
                                         ^

Any ideas will be appreciated.

Dr Mich Talebzadeh
Re: Flink support for Scala
Hi,

I just found out that it does:

Scala-Flink>

Thanks

Dr Mich Talebzadeh

On 16 April 2016 at 17:22, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
> Hi,
>
> Just downloaded Flink, coming from a Spark and Hive background.
>
> My interest in Flink is its support for Complex Event Processing (CEP)
> libraries.
>
> A couple of questions if I may:
>
> 1) Is there a quick start guide for Flink (I am not referring to the simple
> example that comes with the quick start guide)?
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/setup_quickstart.html
>
> 2) Does Flink support the Scala language, and what is its default
> programming language, if any?
>
> Thanks
>
> Dr Mich Talebzadeh
Flink support for Scala
Hi,

Just downloaded Flink, coming from a Spark and Hive background.

My interest in Flink is its support for Complex Event Processing (CEP) libraries.

A couple of questions if I may:

1) Is there a quick start guide for Flink (I am not referring to the simple example that comes with the quick start guide)?

https://ci.apache.org/projects/flink/flink-docs-release-0.8/setup_quickstart.html

2) Does Flink support the Scala language, and what is its default programming language, if any?

Thanks

Dr Mich Talebzadeh