[ 
https://issues.apache.org/jira/browse/IGNITE-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16545977#comment-16545977
 ] 

Ray edited comment on IGNITE-8697 at 7/17/18 3:07 AM:
------------------------------------------------------

[~samaitra]

I tried your newest code and wrote a simple word count application to test 
 the sink. 
 It appears there's still problems. 
 Here's my code. 

import org.apache.flink.api.scala._ 
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
 import org.apache.flink.streaming.api.scala.extensions._ 
 import org.apache.flink.configuration.Configuration 
 import org.apache.ignite.Ignition 
 import org.apache.ignite.configuration.CacheConfiguration 

import scala.collection.JavaConverters._ 

object WordCount { 

        def main(args: Array[String])

{                 

val env = StreamExecutionEnvironment.getExecutionEnvironment

val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")

igniteSink.setAllowOverwrite(false)
igniteSink.setAutoFlushFrequency(1)

igniteSink.open(new Configuration)


// get input data
val text = env.fromElements(
 "To be, or not to be,--that is the question:--",
 "Whether 'tis nobler in the mind to suffer",
 "The slings and arrows of outrageous fortune",
 "Or to take arms against a sea of troubles,")


val counts = text
 // split up the lines in pairs (2-tuples) containing: (word,1)
 .flatMap(_.toLowerCase.split("\\W+"))
 .filter(_.nonEmpty)
 .map((_, 1))
 // group by the tuple field "0" and sum up tuple field "1"
 .keyBy(0)
 .sum(1)
 // Convert to key/value format before ingesting to Ignite
 .mapWith \{ case (k: String, v: Int) => Map(k -> v).asJava }
 .addSink(igniteSink)

try
 env.execute("Streaming WordCount1")
catch {
 case e: Exception =>

 // Exception handling.
} finally igniteSink.close()

        } 
 } 

I tried running this application in Idea and the error log snippet is as 
 follows 

07/16/2018 11:05:30 aggregation -> Map -> Sink: Unnamed(4/8) switched to 
 FAILED 
 class org.apache.ignite.IgniteException: Default Ignite instance has already 
 been started. 
         at 
 
org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
 
         at org.apache.ignite.Ignition.start(Ignition.java:355) 
         at IgniteSink.open(IgniteSink.java:135) 
         at 
 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 
         at 
 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
 
         at 
 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
 
         at 
 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) 
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) 
         at java.lang.Thread.run(Thread.java:745) 
 Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite 
 instance has already been started. 
         at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134) 
         at 
 
org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069) 
         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955) 
         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854) 
         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724) 
         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693) 
         at org.apache.ignite.Ignition.start(Ignition.java:352) 
         ... 7 more 

07/16/2018 11:05:30 Job execution switched to status FAILING. 


was (Author: ldz):
[~samaitra]

I tried your newest code and wrote a simple word count application to test 
the sink. 
It appears there's still problems. 
Here's my code. 



import org.apache.flink.api.scala._ 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
import org.apache.flink.streaming.api.scala.extensions._ 
import org.apache.flink.configuration.Configuration 
import org.apache.ignite.Ignition 
import org.apache.ignite.configuration.CacheConfiguration 

import scala.collection.JavaConverters._ 


object WordCount { 

        def main(args: Array[String]) { 

                val ignite = Ignition.start("ignite.xml") 
                val cacheConfig = new CacheConfiguration[Any, Any]() 
                ignite.destroyCache("aaa") 
                cacheConfig.setName("aaa") 
                cacheConfig.setSqlSchema("PUBLIC") 
                ignite.createCache(cacheConfig) 
                ignite.close() 


                // set up the execution environment 
                val env = StreamExecutionEnvironment.getExecutionEnvironment 

                val igniteSink = new IgniteSink[java.util.Map[String, 
Int]]("aaa", 
"ignite.xml") 

                igniteSink.setAllowOverwrite(false) 
                igniteSink.setAutoFlushFrequency(1) 

                igniteSink.open(new Configuration) 


                // get input data 
                val text = env.fromElements( 
                        "To be, or not to be,--that is the question:--", 
                        "Whether 'tis nobler in the mind to suffer", 
                        "The slings and arrows of outrageous fortune", 
                        "Or to take arms against a sea of troubles,") 


                val counts = text 
                        // split up the lines in pairs (2-tuples) containing: 
(word,1) 
                        .flatMap(_.toLowerCase.split("\\W+")) 
                        .filter(_.nonEmpty) 
                        .map((_, 1)) 
                        // group by the tuple field "0" and sum up tuple field 
"1" 
                        .keyBy(0) 
                        .sum(1) 
                        // Convert to key/value format before ingesting to 
Ignite 
                        .mapWith \{ case (k: String, v: Int) => Map(k -> 
v).asJava } 
                        .addSink(igniteSink) 

                try 
                        env.execute("Streaming WordCount1") 
                catch { 
                        case e: Exception => 

                        // Exception handling. 
                } finally igniteSink.close() 

        } 
} 

I tried running this application in Idea and the error log snippet is as 
follows 

07/16/2018 11:05:30 aggregation -> Map -> Sink: Unnamed(4/8) switched to 
FAILED 
class org.apache.ignite.IgniteException: Default Ignite instance has already 
been started. 
        at 
org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
 
        at org.apache.ignite.Ignition.start(Ignition.java:355) 
        at IgniteSink.open(IgniteSink.java:135) 
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
 
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
 
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) 
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) 
        at java.lang.Thread.run(Thread.java:745) 
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite 
instance has already been started. 
        at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134) 
        at 
org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069) 
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955) 
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854) 
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724) 
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693) 
        at org.apache.ignite.Ignition.start(Ignition.java:352) 
        ... 7 more 

07/16/2018 11:05:30 Job execution switched to status FAILING. 

> Flink sink throws java.lang.IllegalArgumentException when running in flink 
> cluster mode.
> ----------------------------------------------------------------------------------------
>
>                 Key: IGNITE-8697
>                 URL: https://issues.apache.org/jira/browse/IGNITE-8697
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 2.3, 2.4, 2.5
>            Reporter: Ray
>            Priority: Blocker
>
> if I submit the Application to the Flink Cluster using Ignite flink sink I 
> get this error
>  
> java.lang.ExceptionInInitializerError
>       at 
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
>       at 
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
>       at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
>       at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>       at 
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97)
>       at 
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1)
>       at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>       at 
> org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid: 
> Cache name must not be null or empty.
>       at 
> org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109)
>       at 
> org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581)
>       at 
> org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284)
>       at 
> org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183)
>       ... 27 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to