[ https://issues.apache.org/jira/browse/IGNITE-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16545977#comment-16545977 ]
Ray commented on IGNITE-8697: ----------------------------- [~samaitra] I tried your newest code and wrote a simple word count application to test the sink. It appears there's still problems. Here's my code. import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.extensions._ import org.apache.flink.configuration.Configuration import org.apache.ignite.Ignition import org.apache.ignite.configuration.CacheConfiguration import scala.collection.JavaConverters._ object WordCount { def main(args: Array[String]) { val ignite = Ignition.start("ignite.xml") val cacheConfig = new CacheConfiguration[Any, Any]() ignite.destroyCache("aaa") cacheConfig.setName("aaa") cacheConfig.setSqlSchema("PUBLIC") ignite.createCache(cacheConfig) ignite.close() // set up the execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml") igniteSink.setAllowOverwrite(false) igniteSink.setAutoFlushFrequency(1) igniteSink.open(new Configuration) // get input data val text = env.fromElements( "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,") val counts = text // split up the lines in pairs (2-tuples) containing: (word,1) .flatMap(_.toLowerCase.split("\\W+")) .filter(_.nonEmpty) .map((_, 1)) // group by the tuple field "0" and sum up tuple field "1" .keyBy(0) .sum(1) // Convert to key/value format before ingesting to Ignite .mapWith \{ case (k: String, v: Int) => Map(k -> v).asJava } .addSink(igniteSink) try env.execute("Streaming WordCount1") catch { case e: Exception => // Exception handling. } finally igniteSink.close() } } I tried running this application in Idea and the error log snippet is as follows 07/16/2018 11:05:30 aggregation -> Map -> Sink: Unnamed(4/8) switched to FAILED class org.apache.ignite.IgniteException: Default Ignite instance has already been started. at org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990) at org.apache.ignite.Ignition.start(Ignition.java:355) at IgniteSink.open(IgniteSink.java:135) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:745) Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started. at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134) at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693) at org.apache.ignite.Ignition.start(Ignition.java:352) ... 7 more 07/16/2018 11:05:30 Job execution switched to status FAILING. > Flink sink throws java.lang.IllegalArgumentException when running in flink > cluster mode. > ---------------------------------------------------------------------------------------- > > Key: IGNITE-8697 > URL: https://issues.apache.org/jira/browse/IGNITE-8697 > Project: Ignite > Issue Type: Bug > Affects Versions: 2.3, 2.4, 2.5 > Reporter: Ray > Priority: Blocker > > if I submit the Application to the Flink Cluster using Ignite flink sink I > get this error > > java.lang.ExceptionInInitializerError > at > org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201) > at > org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175) > at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165) > at > org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97) > at > org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1) > at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid: > Cache name must not be null or empty. > at > org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109) > at > org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581) > at > org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284) > at > org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183) > ... 27 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)