[
https://issues.apache.org/jira/browse/IGNITE-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16552657#comment-16552657
]
Andrew Mashenkov commented on IGNITE-8697:
------------------------------------------
Hi [~samaitra],
I've looked at the PR and have some notes.
Is it possible to use same Ignite instance with different Flink Sink objects?
If so, then Ignite instance can be closed with one Sink while another Sink make
some progress.
I'd suggest to try to use Ignition.ignite() if it is already exists and do not
close Ignite instance if it was not created with Flink Sink.
Otherwise, if no local Ignite found in current JVM, then start named Ignite
instance and track number of Sinks that use Ignite instance to close Ignite
properly.
Also, you can add some synchronization on private static Object to linearlize
Ignite instance creation\closing. Thus, try-catch wrapper for Ignition.start()
call can be removed.
Streamer usage look ok as it is created and closed within same Sink instance.
> Flink sink throws java.lang.IllegalArgumentException when running in flink
> cluster mode.
> ----------------------------------------------------------------------------------------
>
> Key: IGNITE-8697
> URL: https://issues.apache.org/jira/browse/IGNITE-8697
> Project: Ignite
> Issue Type: Bug
> Affects Versions: 2.3, 2.4, 2.5
> Reporter: Ray
> Priority: Blocker
>
> if I submit the Application to the Flink Cluster using Ignite flink sink I
> get this error
>
> java.lang.ExceptionInInitializerError
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
> at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
> at
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97)
> at
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid:
> Cache name must not be null or empty.
> at
> org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109)
> at
> org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581)
> at
> org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284)
> at
> org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183)
> ... 27 more
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)