Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
Hello Saikat,

Thanks for the fix. I validated it by running my WordCount application in both standalone mode and cluster mode, and the data can now be inserted. However, I found another problem: the data written into Ignite is not correct.

My application counts word occurrences in the following sentences:

"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,"

The count of the word "to" should be 9, but when I check the result in Ignite, the value for every word is 1. Clearly that is wrong.

The reproducer program is the same one I attached to the JIRA ticket. Please let me know if you can reproduce this issue.

-- 
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
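One possible explanation, not confirmed in this thread: keyBy(0).sum(1) is a rolling aggregation, so it emits an updated (word, count) record every time a word is seen rather than one final count per word. The reproducer also calls igniteSink.setAllowOverwrite(false). If the Ignite data streamer then skips keys that already exist in the cache (which is how I read its documentation for allowOverwrite), only the first record for each word, with count 1, would survive. The plain-Java sketch below simulates both behaviours with a HashMap. It uses neither Flink nor Ignite, the class and method names are hypothetical, and Map.putIfAbsent is only a stand-in for the streamer's no-overwrite mode.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingCountSketch {

    // The four input lines from the WordCount reproducer.
    static final String[] LINES = {
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,"
    };

    // Mimics flatMap/filter/map + keyBy(0).sum(1): emits (word, runningCount) for every word seen.
    static List<Map.Entry<String, Integer>> rollingCounts(String[] lines) {
        Map<String, Integer> state = new HashMap<>();
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue;
                }
                int count = state.merge(word, 1, Integer::sum);
                emitted.add(new AbstractMap.SimpleEntry<>(word, count));
            }
        }
        return emitted;
    }

    // Writes the records into a map acting as the cache, with or without overwriting existing keys.
    static Map<String, Integer> applyToCache(List<Map.Entry<String, Integer>> records, boolean allowOverwrite) {
        Map<String, Integer> cache = new HashMap<>();
        for (Map.Entry<String, Integer> record : records) {
            if (allowOverwrite) {
                cache.put(record.getKey(), record.getValue()); // latest value wins
            } else {
                cache.putIfAbsent(record.getKey(), record.getValue()); // first value wins
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = rollingCounts(LINES);
        System.out.println("allowOverwrite=true:  to=" + applyToCache(records, true).get("to"));
        System.out.println("allowOverwrite=false: to=" + applyToCache(records, false).get("to"));
    }
}
```

If this is the cause, calling setAllowOverwrite(true), as the main method posted elsewhere in this thread does, should leave the latest running count for each word in the cache.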
Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
Hi Ray, Andrew,

As discussed, I have fixed the issue with IgniteSink when running in cluster mode. Please review the PR below and share your feedback.

PR: https://github.com/apache/ignite/pull/4398
Review: https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat

On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra wrote:
> [...]
Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
Hi Ray,

Thank you for validating the changes. I see that the IgniteSink works as desired in cluster mode. In standalone mode, however, we get the exception class org.apache.ignite.IgniteException: Default Ignite instance has already been started.

Please take a look at this sample application, which I used to run the sink with Flink in cluster mode: https://github.com/samaitra/streamers

I am considering changing the IgniteSink to run in client mode, similar to the way the Flink connectors for Redis and Flume were implemented in Apache Bahir: https://github.com/apache/bahir-flink

I will share an update soon.

Regards,
Saikat

On Sun, Jul 15, 2018 at 10:07 PM, Ray wrote:
> [...]
Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
Hello Saikat,

I tried your newest code and wrote a simple word count application to test the sink. It appears there are still problems. Here's my code:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.configuration.Configuration
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration

import scala.collection.JavaConverters._

object WordCount {

  def main(args: Array[String]) {

    val ignite = Ignition.start("ignite.xml")
    val cacheConfig = new CacheConfiguration[Any, Any]()
    ignite.destroyCache("aaa")
    cacheConfig.setName("aaa")
    cacheConfig.setSqlSchema("PUBLIC")
    ignite.createCache(cacheConfig)
    ignite.close()

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")

    igniteSink.setAllowOverwrite(false)
    igniteSink.setAutoFlushFrequency(1)

    igniteSink.open(new Configuration)

    // get input data
    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text
      // split up the lines in pairs (2-tuples) containing: (word,1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .sum(1)
      // Convert to key/value format before ingesting to Ignite
      .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
      .addSink(igniteSink)

    try
      env.execute("Streaming WordCount1")
    catch {
      case e: Exception =>
        // Exception handling.
    } finally igniteSink.close()

  }
}

I tried running this application in IDEA, and the error log snippet is as follows:

07/16/2018 11:05:30 aggregation -> Map -> Sink: Unnamed(4/8) switched to FAILED
class org.apache.ignite.IgniteException: Default Ignite instance has already been started.
    at org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
    at org.apache.ignite.Ignition.start(Ignition.java:355)
    at IgniteSink.open(IgniteSink.java:135)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
    at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
    at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
    at org.apache.ignite.Ignition.start(Ignition.java:352)
    ... 7 more
07/16/2018 11:05:30 Job execution switched to status FAILING.
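The trace shows Ignition.start being called while the default (unnamed) Ignite instance is already running in the same JVM. When the job runs from the IDE, Flink's local environment executes all eight sink subtasks (Unnamed(4/8) in the log) inside that one JVM, and main also calls igniteSink.open(...) directly, so more than one start call can land in a single JVM. A common way around this is to share one instance per JVM and count its users. The sketch below shows only that reference-counting pattern in plain Java; SharedClient is a hypothetical name, not part of Ignite or Flink, and in a real sink the factory would wrap whatever starts the Ignite node (the Ignite interface itself extends AutoCloseable).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hypothetical per-JVM holder: the first acquire() creates the shared resource,
 * the last release() closes it. All state is guarded by the object's monitor.
 */
public final class SharedClient<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private T instance;
    private int userCount;

    public SharedClient(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Intended for each subtask's open(): creates the resource only for the first user. */
    public synchronized T acquire() {
        if (userCount == 0) {
            instance = factory.get();
        }
        userCount++;
        return instance;
    }

    /** Intended for each subtask's close(): closes the resource when the last user leaves. */
    public synchronized void release() {
        if (userCount == 0) {
            throw new IllegalStateException("release() called without a matching acquire()");
        }
        userCount--;
        if (userCount == 0) {
            T toClose = instance;
            instance = null;
            try {
                toClose.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close shared resource", e);
            }
        }
    }

    public synchronized int userCount() {
        return userCount;
    }

    public static void main(String[] args) {
        AtomicInteger starts = new AtomicInteger();
        AtomicInteger stops = new AtomicInteger();

        // Stand-in for starting an Ignite node: counts factory calls and close() calls.
        SharedClient<AutoCloseable> shared = new SharedClient<>(() -> {
            starts.incrementAndGet();
            AutoCloseable handle = stops::incrementAndGet;
            return handle;
        });

        // Eight "subtasks" open and close in the same JVM, as in the local run with parallelism 8.
        for (int i = 0; i < 8; i++) {
            shared.acquire();
        }
        for (int i = 0; i < 8; i++) {
            shared.release();
        }
        System.out.println("starts=" + starts.get() + ", stops=" + stops.get()); // prints starts=1, stops=1
    }
}
```

With this pattern, only the first subtask's open() would start the node and only the last close() would stop it, so no second start call reaches Ignition in the same JVM.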
Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
Hello Ray,

I have fixed the issue related to the Flink IgniteSink. Please take a look at the changes:
https://github.com/samaitra/ignite/blob/IGNITE-8697/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java

I am working on the test cases and will raise the PR soon.

Regards,
Saikat

On Tue, Jun 5, 2018 at 1:13 AM, Ray wrote:
> [...]
Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
Yes, the cache already exists before my Flink application runs.

The issue can be reproduced when you submit the Flink application to a Flink cluster.
Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
Hi,

When the IgniteSink data streamer starts, it checks whether the cache name is already present in the grid before the streaming process can begin. Can you please confirm that the cache was created before the data sink process was executed?

Regards,
Saikat

On Mon, Jun 4, 2018 at 9:24 PM, Ray wrote:
> [...]
Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
I think it's a code bug in the Flink sink; I had the same problem some time ago. I think it's caused by the compiler optimizing the order of variable initialization in a multithreaded environment (Flink cluster mode). In this case, the variable "cacheName" is not initialized when it is used, because the compiler reorders the variable initialization in a multithreaded environment.

I have created a ticket in JIRA and assigned it to the author of the Flink sink:
https://issues.apache.org/jira/browse/IGNITE-8697
Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
Hi,

It runs fine in a single VM, but if I build the uber-jar (containing the application and all required Ignite dependencies) and submit it to the Flink cluster (with the master and worker nodes each in a different VM), the cache configuration isn't loaded:

2018-06-01 22:05:30,665 INFO org.apache.ignite.internal.IgniteKernal - Config URL: n/a

It seems that both fields in IgniteSink:

    /** Ignite grid configuration file. */
    private static String igniteCfgFile;

    /** Cache name. */
    private static String cacheName;

are null. Maybe the sink isn't initialized correctly after serialization from the master to the worker node? I'm not familiar with the Kryo serialization used by Apache Flink, but maybe static fields are ignored?

If I create my own copy of IgniteSink with hardcoded values:

    private static class Holder {
        private static final Ignite IGNITE = Ignition.start("flink-config.xml");
        private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer("DemoCache");
    }

then the cache configuration is loaded and everything works as expected:

2018-06-01 22:28:12,985 INFO org.apache.ignite.internal.IgniteKernal - Config URL: /tmp/blobStore-56228c43-2ed8-46d3-bbf1-631c3ef778b3/job_132b1e78c18e363739fede5fded4214b/blob_p-a85926793530ee5cec5e7c9372ddbfc22020a0d0-d4b791f47ff24933e8d6ff8ccc4fa665!/flink-config.xml

- Burt
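Burt's guess about static fields can be checked without Flink: standard Java serialization writes only an object's instance state and never writes static fields, so a value held in a static field on the submitting side is not carried inside the serialized sink object. The sketch below uses hypothetical class and field names. It serializes an object holding one static and one instance field, then sets the static field to null to mimic a fresh worker JVM where that assignment never happened, and deserializes the bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class StaticFieldDemo {

    // Hypothetical stand-in for a sink with one static and one instance "cache name".
    static class DemoSink implements Serializable {
        private static final long serialVersionUID = 1L;

        static String staticCacheName;      // class state: not written by serialization
        final String instanceCacheName;     // object state: written and restored

        DemoSink(String cacheName) {
            staticCacheName = cacheName;
            this.instanceCacheName = cacheName;
        }
    }

    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();   // does not run the DemoSink constructor
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // "Client side": the constructor sets both fields, then the object is shipped as bytes.
        byte[] shipped = serialize(new DemoSink("DemoCache"));

        // "Worker side": a fresh JVM has never assigned the static field.
        DemoSink.staticCacheName = null;

        DemoSink onWorker = (DemoSink) deserialize(shipped);
        System.out.println("instance field: " + onWorker.instanceCacheName); // DemoCache
        System.out.println("static field:   " + DemoSink.staticCacheName);   // null
    }
}
```

If that is what happens in IgniteSink, the usual Flink approach is to keep settings such as the cache name and configuration file path in ordinary instance fields, which travel with the serialized sink, and to create non-serializable resources such as the Ignite instance and data streamer in open() on the worker, holding them in transient fields.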
Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
It is working fine here. I tweaked the main method a bit:

public static void main(String[] args) throws Exception {
    System.setProperty("IGNITE_QUIET", "false");

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.getConfig().enableSysoutLogging();

    IgniteSink> igniteSink = new IgniteSink<>("DemoCache", "flink-config.xml");
    igniteSink.setAllowOverwrite(true);
    igniteSink.setAutoFlushFrequency(10);
    igniteSink.start();

    System.out.println("\n\nSTARTED SUCCESSFULLY\n\n");

    //DataStream text = env.socketTextStream("localhost", 12200);
    //DataStream> datastream = text.flatMap(new Splitter());
    //datastream.addSink(igniteSink);
    //env.execute("Demo Streamer");

    igniteSink.stop();
}

I added the following dependency:

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-log4j</artifactId>
    <version>${ignite.version}</version>
</dependency>

The log:

>>> +--+
>>> Ignite ver. 2.5.0#20180523-sha1:86e110c750a340dc9be2d396415f0b80d7ed8813
>>> +--+
>>> OS name: Windows 7 6.1 amd64
>>> CPU(s): 4
>>> Heap: 3.5GB
>>> VM name: 13908@WPU8L0031201
>>> Local node [ID=C91AF628-B784-4CB9-8FF9-AA2E0F04D303, order=1, clientMode=false]
>>> Local node addresses: [WPU8L0031201.ad.ing.net/0:0:0:0:0:0:0:1, WPU8L0031201.ad.ing.net/127.0.0.1, WPU8L0031201.ad.ing.net/192.168.1.69, /192.168.56.1, /192.168.99.1]
>>> Local ports: TCP:10800 TCP:11211 TCP:47100 TCP:47500
17:27:42,928 INFO org.apache.ignite.internal.managers.discovery.GridDiscoveryManager - Topology snapshot [ver=1, servers=1, clients=0, CPUs=4, offheap=5.1GB, heap=3.5GB]
17:27:42,928 INFO org.apache.ignite.internal.managers.discovery.GridDiscoveryManager - ^-- Node [id=C91AF628-B784-4CB9-8FF9-AA2E0F04D303, clusterState=ACTIVE]
17:27:42,928 INFO org.apache.ignite.internal.managers.discovery.GridDiscoveryManager - Data Regions Configured:
17:27:42,930 INFO org.apache.ignite.internal.managers.discovery.GridDiscoveryManager - ^-- default [initSize=256,0 MiB, maxSize=3,1 GiB, persistenceEnabled=false]
17:27:42,930 INFO org.apache.ignite.internal.managers.discovery.GridDiscoveryManager - ^-- Data_Region [initSize=1,0 GiB, maxSize=2,0 GiB, persistenceEnabled=false]

STARTED SUCCESSFULLY

17:27:43,105 INFO org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestProtocol - Command protocol successfully stopped: TCP binary
17:27:43,159 INFO org.apache.ignite.internal.processors.cache.GridCacheProcessor - Stopped cache [cacheName=DemoCache]
17:27:43,159 INFO org.apache.ignite.internal.processors.cache.GridCacheProcessor - Stopped cache [cacheName=ignite-sys-cache]
17:27:43,238 INFO org.apache.ignite.internal.IgniteKernal -
>>> +-+
>>> Ignite ver. 2.5.0#20180523-sha1:86e110c750a340dc9be2d396415f0b80d7ed8813 stopped OK
>>> +-+
>>> Grid uptime: 00:00:00.306
Apache Flink Sink + Ignite: Ouch! Argument is invalid
Hi,

I'm trying to run an Apache Flink program with the flink-ignite sink. Everything works fine if I start the application from my IDE, but if I submit the application to the Flink cluster, I get this error:

java.lang.ExceptionInInitializerError
    at org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
    at org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
    at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97)
    at org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid: Cache name must not be null or empty.
    at org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109)
    at org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581)
    at org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284)
    at org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183)
    ... 27 more

But the cache name is set:
https://github.com/bpark/flink-ignite-demo/blob/master/src/main/java/com/github/bpark/InstrumentStreamer.java

and configured:
https://github.com/bpark/flink-ignite-demo/blob/master/src/main/resources/flink-config.xml

ignite-flink, ignite-spring, ignite-indexing and ignite-core are included in the shaded JAR file. The example project is located here: https://github.com/bpark/flink-ignite-demo

Any idea what's wrong here?

- Burt