[ 
https://issues.apache.org/jira/browse/IGNITE-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553886#comment-16553886
 ] 

Saikat Maitra commented on IGNITE-8697:
---------------------------------------

Hi [~amashenkov]

Thank you for reviewing and sharing feedback, Flink Sink can be used in 
standalone mode or in cluster mode. In standalone mode we may reuse Ignite 
instance but in cluster mode each jvm have separate ignite instances.

We purposefully remove the static fields as they may not guarantee for reuse of 
the same Ignite instance in cluster mode. Flink by default serialize everything 
and send to each cluster member.

I can make the change to use Ignition.ignite() and if no Ignite instance is 
found then I can use Ignition.start() but my understanding is it will throw 
IgniteIllegalStateException when no Ignite instance is running and we will need 
to catch that and call Ignition.start().

 

Please let me know your feedback.

 

Regards,

Saikat

 

> Flink sink throws java.lang.IllegalArgumentException when running in flink 
> cluster mode.
> ----------------------------------------------------------------------------------------
>
>                 Key: IGNITE-8697
>                 URL: https://issues.apache.org/jira/browse/IGNITE-8697
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 2.3, 2.4, 2.5
>            Reporter: Ray
>            Priority: Blocker
>
> if I submit the Application to the Flink Cluster using Ignite flink sink I 
> get this error
>  
> java.lang.ExceptionInInitializerError
>       at 
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
>       at 
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
>       at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
>       at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>       at 
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97)
>       at 
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1)
>       at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>       at 
> org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid: 
> Cache name must not be null or empty.
>       at 
> org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109)
>       at 
> org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581)
>       at 
> org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284)
>       at 
> org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183)
>       ... 27 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to