[ https://issues.apache.org/jira/browse/IGNITE-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16552657#comment-16552657 ]
Andrew Mashenkov edited comment on IGNITE-8697 at 7/23/18 10:57 AM: -------------------------------------------------------------------- Hi [~samaitra], I've looked at the PR and have some notes. Is it possible to use same Ignite instance with different Flink Sink objects? If so, then Ignite instance can be closed with one Sink while another Sink make some progress. I'd suggest to try to use Ignition.ignite() if it is already exists and do not close Ignite instance as it was not created with Flink Sink. Otherwise, if no local Ignite found in current JVM, then start named Ignite instance and track number of Sinks that use Ignite instance to close Ignite properly. Also, you can add some synchronization on private static Object to linearlize Ignite instance creation\closing. Thus, try-catch wrapper for Ignition.start() call can be removed. Streamer usage look ok as it is created and closed within same Sink instance. was (Author: amashenkov): Hi [~samaitra], I've looked at the PR and have some notes. Is it possible to use same Ignite instance with different Flink Sink objects? If so, then Ignite instance can be closed with one Sink while another Sink make some progress. I'd suggest to try to use Ignition.ignite() if it is already exists and do not close Ignite instance if it was not created with Flink Sink. Otherwise, if no local Ignite found in current JVM, then start named Ignite instance and track number of Sinks that use Ignite instance to close Ignite properly. Also, you can add some synchronization on private static Object to linearlize Ignite instance creation\closing. Thus, try-catch wrapper for Ignition.start() call can be removed. Streamer usage look ok as it is created and closed within same Sink instance. > Flink sink throws java.lang.IllegalArgumentException when running in flink > cluster mode. > ---------------------------------------------------------------------------------------- > > Key: IGNITE-8697 > URL: https://issues.apache.org/jira/browse/IGNITE-8697 > Project: Ignite > Issue Type: Bug > Affects Versions: 2.3, 2.4, 2.5 > Reporter: Ray > Priority: Blocker > > if I submit the Application to the Flink Cluster using Ignite flink sink I > get this error > > java.lang.ExceptionInInitializerError > at > org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201) > at > org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175) > at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165) > at > org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97) > at > org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1) > at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid: > Cache name must not be null or empty. > at > org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109) > at > org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581) > at > org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284) > at > org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183) > ... 27 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)