Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-07-23 Thread Ray
Hello Saikat,

Thanks for the fix.
I validated this fix by running my WordCount application in both standalone
mode and cluster mode.
The data can be inserted.

But I found another problem here.
The data written into Ignite is not correct.
My application counts word occurrences in the following sentences.
 "To be, or not to be,--that is the question:--",
 "Whether 'tis nobler in the mind to suffer",
 "The slings and arrows of outrageous fortune",
 "Or to take arms against a sea of troubles,"

The count of the word "to" should be 9.
But when I check the result in Ignite, the value for every word is 1.
Clearly it's wrong.
The reproducer program is the same as the one I attached to the JIRA ticket.

Please let me know if you can reproduce this issue.
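
A possible explanation, not verified against the reproducer: the WordCount program in this thread configures the sink with setAllowOverwrite(false), and a keyed running sum emits an updated total for every incoming element. If the data streamer skips keys that already exist in the cache when overwriting is disabled, only the first emitted value (1) would ever be stored. The sketch below simulates that interaction in plain Java. The class name OverwriteDemo, the method ingest, and the in-memory map standing in for the cache are all hypothetical; no Flink or Ignite classes are involved.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation only; the "cache" is an in-memory map (assumption).
class OverwriteDemo {

    // Emits a running count per word, the way a keyed streaming sum emits
    // an updated total for every incoming element, and writes it to the cache.
    static Map<String, Integer> ingest(List<String> words, boolean allowOverwrite) {
        Map<String, Integer> runningCounts = new LinkedHashMap<>();
        Map<String, Integer> cache = new LinkedHashMap<>();
        for (String word : words) {
            int count = runningCounts.merge(word, 1, Integer::sum);
            if (allowOverwrite) {
                cache.put(word, count);         // later updates replace earlier ones
            } else {
                cache.putIfAbsent(word, count); // first value written for a key wins
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        System.out.println("allowOverwrite=false: " + ingest(words, false));
        System.out.println("allowOverwrite=true:  " + ingest(words, true));
    }
}
```

If this is what happens, calling setAllowOverwrite(true) on the sink would be the first thing to try.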



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-07-21 Thread Saikat Maitra
Hi Ray, Andrew

As discussed, I have fixed the issue with IgniteSink when running in cluster
mode.

Please review the PR below and share feedback.

PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat





Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-07-16 Thread Saikat Maitra
Hi Ray,

Thank you for validating the changes. In cluster mode, the IgniteSink works
as desired when I check it. In standalone mode, I can see that we get the
exception class org.apache.ignite.IgniteException:
Default Ignite instance has already been started.
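
The stack trace in this thread shows IgniteSink.open calling Ignition.start, and the failing task is subtask 4 of 8. When all sink subtasks run in one JVM (for example inside an IDE), each open() call would try to start the same default instance, and the WordCount program additionally calls open() from main. The sketch below simulates that situation with a hypothetical per-JVM registry (NodeRegistryDemo, start, and getOrStart are illustration names, not Ignite code) and contrasts a strict start with an idempotent one. Ignite's own Ignition.getOrStart appears to serve the idempotent role, but that is an assumption to check against the Ignite Javadoc.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a per-JVM node registry; not Ignite code.
class NodeRegistryDemo {
    private static final ConcurrentMap<String, Object> NODES = new ConcurrentHashMap<>();

    // Strict start: fails when a node with this name already exists in the JVM.
    static Object start(String name) {
        Object node = new Object();
        if (NODES.putIfAbsent(name, node) != null) {
            throw new IllegalStateException("Instance has already been started: " + name);
        }
        return node;
    }

    // Idempotent start: returns the existing node or creates it exactly once.
    static Object getOrStart(String name) {
        return NODES.computeIfAbsent(name, n -> new Object());
    }

    public static void main(String[] args) {
        start("default");     // e.g. open() called from main
        try {
            start("default"); // e.g. a sink subtask opening in the same JVM
        } catch (IllegalStateException e) {
            System.out.println("second start failed: " + e.getMessage());
        }
        // With an idempotent start, every caller shares one node.
        System.out.println(getOrStart("default") == getOrStart("default"));
    }
}
```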

Please take a look at this sample application,
https://github.com/samaitra/streamers, which I used to run it with Flink in
cluster mode.

I am considering whether I should change the IgniteSink to run in client
mode, similar to the way the Flink connectors for Redis and Flume were
implemented in Apache Bahir:

https://github.com/apache/bahir-flink

I will share an update soon.

Regards,
Saikat




Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-07-15 Thread Ray
Hello Saikat,

I tried your newest code and wrote a simple word count application to test
the sink.
It appears there are still problems.
Here's my code.



import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.configuration.Configuration
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration

import scala.collection.JavaConverters._


object WordCount {

  def main(args: Array[String]): Unit = {

    // Recreate the target cache, then stop this temporary node.
    val ignite = Ignition.start("ignite.xml")
    val cacheConfig = new CacheConfiguration[Any, Any]()
    ignite.destroyCache("aaa")
    cacheConfig.setName("aaa")
    cacheConfig.setSqlSchema("PUBLIC")
    ignite.createCache(cacheConfig)
    ignite.close()

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")

    igniteSink.setAllowOverwrite(false)
    igniteSink.setAutoFlushFrequency(1)

    igniteSink.open(new Configuration)

    // get input data
    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text
      // split up the lines in pairs (2-tuples) containing: (word,1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .sum(1)
      // Convert to key/value format before ingesting to Ignite
      .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
      .addSink(igniteSink)

    try
      env.execute("Streaming WordCount1")
    catch {
      case e: Exception =>
        // Exception handling.
    } finally igniteSink.close()

  }
}

I tried running this application in IntelliJ IDEA, and the error log snippet
is as follows:

07/16/2018 11:05:30 aggregation -> Map -> Sink: Unnamed(4/8) switched to FAILED
class org.apache.ignite.IgniteException: Default Ignite instance has already been started.
    at org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
    at org.apache.ignite.Ignition.start(Ignition.java:355)
    at IgniteSink.open(IgniteSink.java:135)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
    at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
    at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
    at org.apache.ignite.Ignition.start(Ignition.java:352)
    ... 7 more

07/16/2018 11:05:30 Job execution switched to status FAILING.





Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-07-15 Thread Saikat Maitra
Hello Ray,

I have fixed the issue related to the Flink IgniteSink. Please take a look at
the changes:
https://github.com/samaitra/ignite/blob/IGNITE-8697/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java

I am working on the test cases and will raise the PR soon.

Regards
Saikat



Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-06-04 Thread Ray
Yes, the cache is already created before running my flink application.

The issue can be reproduced when you submit your flink application to your
flink cluster.






Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-06-04 Thread Saikat Maitra
Hi,

When the Ignite sink data streamer starts, it checks that the cache name is
already present in the grid before the streaming process can begin.

Can you please confirm that the cache was created before the data sink
process was executed?

Regards,
Saikat



Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-06-04 Thread Ray
I think it's a bug in the Flink sink code.
I had this same problem some time ago.
I think it's caused by compiler optimization of variable initialization in a
multithreaded environment (Flink cluster mode).
In this case, the variable "cacheName" is not yet initialized when it is used,
because the compiler reorders variable initialization in a multithreaded
environment.

I have created a ticket in JIRA and assigned it to the author of the Flink sink.

https://issues.apache.org/jira/browse/IGNITE-8697





Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-06-01 Thread Burt Parkers
Hi,

It runs fine in a single VM, but if I build the uber-jar (which contains the
application and all required Ignite dependencies) and submit it to the
Flink cluster (with master and worker nodes each in different VMs), the
cache configuration isn't loaded:

2018-06-01 22:05:30,665 INFO  org.apache.ignite.internal.IgniteKernal  - Config URL: n/a

It seems that both fields in IgniteSink:

/** Ignite grid configuration file. */
private static String igniteCfgFile;

/** Cache name. */
private static String cacheName;

are null.

Maybe the Sink isn't initialized correctly after serialization from the
master to the worker node? I'm not familiar with the Kryo Serialization
used by Apache Flink, but maybe static fields are ignored?
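
On the static-field question: as far as I know, Flink ships user functions such as sinks to the workers with standard Java serialization, and Java serialization writes only instance state. A static field belongs to the class, so on a worker JVM where the constructor never ran it keeps its default value (null). The sketch below demonstrates this with plain JDK classes. StaticFieldDemo and shipToWorker are hypothetical names, and clearing the static field by hand is only a way to mimic a freshly loaded class in another JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sink-like class; cacheName mirrors the field name quoted above.
class StaticFieldDemo implements Serializable {
    private static final long serialVersionUID = 1L;

    static String cacheName;        // static: belongs to the class, never serialized
    final String instanceCacheName; // instance field: part of the serialized state

    StaticFieldDemo(String name) {
        cacheName = name;
        instanceCacheName = name;
    }

    // Serializes and deserializes the object, clearing the static field in
    // between to mimic a worker JVM where the constructor never ran.
    static StaticFieldDemo shipToWorker(StaticFieldDemo sink)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        }
        cacheName = null; // simulate a freshly loaded class on another JVM
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (StaticFieldDemo) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        StaticFieldDemo copy = shipToWorker(new StaticFieldDemo("DemoCache"));
        System.out.println("static field:   " + StaticFieldDemo.cacheName);
        System.out.println("instance field: " + copy.instanceCacheName);
    }
}
```

Under this reading, turning the two fields into non-static instance fields would let them travel with the serialized sink.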

If I create my own copy of IgniteSink with hardcoded values

private static class Holder {
    private static final Ignite IGNITE = Ignition.start("flink-config.xml");
    private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer("DemoCache");
}

then the cache configuration is loaded and everything is working as
expected:

2018-06-01 22:28:12,985 INFO  org.apache.ignite.internal.IgniteKernal  - Config URL: /tmp/blobStore-56228c43-2ed8-46d3-bbf1-631c3ef778b3/job_132b1e78c18e363739fede5fded4214b/blob_p-a85926793530ee5cec5e7c9372ddbfc22020a0d0-d4b791f47ff24933e8d6ff8ccc4fa665!/flink-config.xml

- Burt


Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-06-01 Thread Humphrey
It is working fine here.

Tweaked the main method a bit:
public static void main(String[] args) throws Exception {
System.setProperty("IGNITE_QUIET", "false");

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().enableSysoutLogging();

IgniteSink> igniteSink = new
IgniteSink<>("DemoCache", "flink-config.xml");
igniteSink.setAllowOverwrite(true);
igniteSink.setAutoFlushFrequency(10);

igniteSink.start();

System.out.println("\n\nSTARTED SUCCESSFULLY\n\n");
//DataStream<String> text = env.socketTextStream("localhost", 12200);

//DataStream> datastream = text.flatMap(new Splitter());
//datastream.addSink(igniteSink);

//env.execute("Demo Streamer");
igniteSink.stop();
}

I added the following dependency:

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-log4j</artifactId>
    <version>${ignite.version}</version>
</dependency>


The log:

>>> +--+
>>> Ignite ver. 2.5.0#20180523-sha1:86e110c750a340dc9be2d396415f0b80d7ed8813
>>> +--+
>>> OS name: Windows 7 6.1 amd64
>>> CPU(s): 4
>>> Heap: 3.5GB
>>> VM name: 13908@WPU8L0031201
>>> Local node [ID=C91AF628-B784-4CB9-8FF9-AA2E0F04D303, order=1,
>>> clientMode=false]
>>> Local node addresses: [WPU8L0031201.ad.ing.net/0:0:0:0:0:0:0:1,
>>> WPU8L0031201.ad.ing.net/127.0.0.1, WPU8L0031201.ad.ing.net/192.168.1.69,
>>> /192.168.56.1, /192.168.99.1]
>>> Local ports: TCP:10800 TCP:11211 TCP:47100 TCP:47500 

17:27:42,928 INFO 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager  -
Topology snapshot [ver=1, servers=1, clients=0, CPUs=4, offheap=5.1GB,
heap=3.5GB]
17:27:42,928 INFO 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager  -   ^--
Node [id=C91AF628-B784-4CB9-8FF9-AA2E0F04D303, clusterState=ACTIVE]
17:27:42,928 INFO 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager  - Data
Regions Configured:
17:27:42,930 INFO 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager  -   ^--
default [initSize=256,0 MiB, maxSize=3,1 GiB, persistenceEnabled=false]
17:27:42,930 INFO 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager  -   ^--
Data_Region [initSize=1,0 GiB, maxSize=2,0 GiB, persistenceEnabled=false]


STARTED SUCCESSFULLY


17:27:43,105 INFO 
org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestProtocol 
- Command protocol successfully stopped: TCP binary
17:27:43,159 INFO 
org.apache.ignite.internal.processors.cache.GridCacheProcessor  - Stopped
cache [cacheName=DemoCache]
17:27:43,159 INFO 
org.apache.ignite.internal.processors.cache.GridCacheProcessor  - Stopped
cache [cacheName=ignite-sys-cache]
17:27:43,238 INFO  org.apache.ignite.internal.IgniteKernal  
- 

>>> +-+
>>> Ignite ver. 2.5.0#20180523-sha1:86e110c750a340dc9be2d396415f0b80d7ed8813
>>> stopped OK
>>> +-+
>>> Grid uptime: 00:00:00.306





Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-06-01 Thread Burt Parkers
Hi,


I'm trying to run an Apache Flink program with the flink-ignite sink.
Everything works fine if I start the application from my IDE, but if I
submit the application to the Flink cluster I get this error:


java.lang.ExceptionInInitializerError
    at org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
    at org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
    at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97)
    at org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid: Cache name must not be null or empty.
    at org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109)
    at org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581)
    at org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284)
    at org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183)
    ... 27 more


but the cache name is set:


https://github.com/bpark/flink-ignite-demo/blob/master/src/main/java/com/github/bpark/InstrumentStreamer.java


and configured:


https://github.com/bpark/flink-ignite-demo/blob/master/src/main/resources/flink-config.xml


ignite-flink, ignite-spring, ignite-indexing and ignite-core are included
in the shaded jar file.


The example project is located here:
https://github.com/bpark/flink-ignite-demo


Any idea what's wrong here?
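
For readers of the trace above: the top-level ExceptionInInitializerError is how the JVM reports any exception thrown from a static initializer, so the real problem is the "Caused by" IllegalArgumentException raised while the Holder class was being initialized. The sketch below reproduces that shape with plain JDK code. HolderInitDemo, validate, and STREAMER_TARGET are hypothetical names chosen for illustration; this is not the IgniteSink source.

```java
// Hypothetical reconstruction of the failure shape; not the IgniteSink source.
class HolderInitDemo {
    static String cacheName; // never assigned in this JVM, so it stays null

    static String validate(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Cache name must not be null or empty.");
        }
        return name;
    }

    // Lazy holder: its static initializer runs on first access to the class.
    static class Holder {
        static final String STREAMER_TARGET = validate(cacheName);
    }

    // Returns the error raised by touching Holder, or null if it initialized.
    static Throwable firstAccess() {
        try {
            String target = Holder.STREAMER_TARGET;
            return null;
        } catch (LinkageError e) {
            // ExceptionInInitializerError on the first access;
            // NoClassDefFoundError on any later access to the failed class.
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable t = firstAccess();
        System.out.println(t.getClass().getName());
        System.out.println("Caused by: " + t.getCause());
    }
}
```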


- Burt