[ 
https://issues.apache.org/jira/browse/IGNITE-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504591#comment-16504591
 ] 

Andrew Mashenkov commented on IGNITE-8697:
------------------------------------------

Static fields looks very suspicous.

Is it possible, Sink object created with one class loader and then transferred 
to thread with another classloader?
In that case static content will not be transfered with the object as it is not 
belongs to object.

Also. Why we have static datastreamer instance here? 
It is a closable object and it looks like can be reused by several threads. And 
once streamer has been closed, it can be used.

> Flink sink throws java.lang.IllegalArgumentException when running in flink 
> cluster mode.
> ----------------------------------------------------------------------------------------
>
>                 Key: IGNITE-8697
>                 URL: https://issues.apache.org/jira/browse/IGNITE-8697
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 2.3, 2.4, 2.5
>            Reporter: Ray
>            Assignee: Roman Shtykh
>            Priority: Blocker
>
> if I submit the Application to the Flink Cluster using Ignite flink sink I 
> get this error
>  
> java.lang.ExceptionInInitializerError
>       at 
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
>       at 
> org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
>       at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
>       at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>       at 
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97)
>       at 
> org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1)
>       at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>       at 
> org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Ouch! Argument is invalid: 
> Cache name must not be null or empty.
>       at 
> org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109)
>       at 
> org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581)
>       at 
> org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284)
>       at 
> org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183)
>       ... 27 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to