[ https://issues.apache.org/jira/browse/IGNITE-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jeado ko updated IGNITE-10861: ------------------------------ Description: I got following error when I create multiple sink in Flink 1.7.0 {code:java} Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started. at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141) at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700) at org.apache.ignite.Ignition.start(Ignition.java:348) {code} and this is my flink job code {code:java} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.ignite.sink.flink.IgniteSink import scala.collection.JavaConverters._ object IgniteSinkTestJob extends App { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache", "ignite-test.xml") igniteSink.setAllowOverwrite(true) igniteSink.setAutoFlushFrequency(10) // igniteSink.open(new Configuration) val igniteSink2 = new IgniteSink[java.util.Map[String, String]]("testCache2", "ignite-test.xml") igniteSink2.setAllowOverwrite(true) igniteSink2.setAutoFlushFrequency(10) // igniteSink2.open(new Configuration) val igniteSink3 = new IgniteSink[java.util.Map[String, String]]("testCache3", "ignite-test.xml") igniteSink3.setAllowOverwrite(true) igniteSink3.setAutoFlushFrequency(10) // igniteSink3.open(new Configuration) val source = env.fromCollection( Array( Map("key1" -> "hello1"), Map("key1" -> "hello11"), Map("key1" -> "hello144"), Map("key1" -> "hello1155"), Map("key2" -> "hello2"), Map("key2" -> "hello3"), Map("key3" -> "hello23"), Map("key3" -> "hello25") ).map(_.asJava) ) source .filter(v => v.containsKey("key1")) .setParallelism(2) .addSink(igniteSink) .name("sink1") .setParallelism(1) source.filter(v => v.containsKey("key2")) .setParallelism(2) .addSink(igniteSink2) .name("sink2") .setParallelism(1) source.filter(v => v.containsKey("key3")) .setParallelism(2) .addSink(igniteSink3) .name("sink3") .setParallelism(1) env.execute("test ignite sink") } {code} And if I remove comment to open Ignite sink before execute pipeline, I still got a same error. was: I got following error when I create multiple sink in Flink 1.7.0 {code:java} Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started. at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141) at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700) at org.apache.ignite.Ignition.start(Ignition.java:348) {code} and this is my flink job code {code:java} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.ignite.sink.flink.IgniteSink import scala.collection.JavaConverters._ object IgniteSinkTestJob extends App { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache", "ignite-test.xml") igniteSink.setAllowOverwrite(true) igniteSink.setAutoFlushFrequency(10) // igniteSink.open(new Configuration) val igniteSink2 = new IgniteSink[java.util.Map[String, String]]("testCache2", "ignite-test.xml") igniteSink2.setAllowOverwrite(true) igniteSink2.setAutoFlushFrequency(10) // igniteSink2.open(new Configuration) val igniteSink3 = new IgniteSink[java.util.Map[String, String]]("testCache3", "ignite-test.xml") igniteSink3.setAllowOverwrite(true) igniteSink3.setAutoFlushFrequency(10) // igniteSink3.open(new Configuration) val source = env.fromCollection( Array( Map("key1" -> "hello1"), Map("key1" -> "hello11"), Map("key1" -> "hello144"), Map("key1" -> "hello1155"), Map("key2" -> "hello2"), Map("key2" -> "hello3"), Map("key3" -> "hello23"), Map("key3" -> "hello25") ).map(_.asJava) ) source .filter(v => v.containsKey("key1")) .setParallelism(2) .addSink(igniteSink) .name("sink1") .setParallelism(1) source.filter(v => v.containsKey("key2")) .setParallelism(2) .addSink(igniteSink2) .name("sink2") .setParallelism(1) source.filter(v => v.containsKey("key3")) .setParallelism(2) .addSink(igniteSink3) .name("sink3") .setParallelism(1) env.execute("test ignite sink") } {code} > Using multiple Ignite Sink got Ignite instance has already been started Error > ----------------------------------------------------------------------------- > > Key: IGNITE-10861 > URL: https://issues.apache.org/jira/browse/IGNITE-10861 > Project: Ignite > Issue Type: Bug > Components: streaming > Affects Versions: 2.7 > Reporter: jeado ko > Priority: Major > > I got following error when I create multiple sink in Flink 1.7.0 > {code:java} > Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite > instance has already been started. > at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141) > at > org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076) > at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962) > at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861) > at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731) > at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700) > at org.apache.ignite.Ignition.start(Ignition.java:348) > {code} > and this is my flink job code > {code:java} > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment > import org.apache.flink.streaming.api.scala._ > import org.apache.ignite.sink.flink.IgniteSink > import scala.collection.JavaConverters._ > object IgniteSinkTestJob extends App { > val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache", > "ignite-test.xml") > igniteSink.setAllowOverwrite(true) > igniteSink.setAutoFlushFrequency(10) > // igniteSink.open(new Configuration) > val igniteSink2 = new IgniteSink[java.util.Map[String, > String]]("testCache2", "ignite-test.xml") > igniteSink2.setAllowOverwrite(true) > igniteSink2.setAutoFlushFrequency(10) > // igniteSink2.open(new Configuration) > val igniteSink3 = new IgniteSink[java.util.Map[String, > String]]("testCache3", "ignite-test.xml") > igniteSink3.setAllowOverwrite(true) > igniteSink3.setAutoFlushFrequency(10) > // igniteSink3.open(new Configuration) > val source = env.fromCollection( > Array( > Map("key1" -> "hello1"), > Map("key1" -> "hello11"), > Map("key1" -> "hello144"), > Map("key1" -> "hello1155"), > Map("key2" -> "hello2"), > Map("key2" -> "hello3"), > Map("key3" -> "hello23"), > Map("key3" -> "hello25") > ).map(_.asJava) > ) > source > .filter(v => v.containsKey("key1")) > .setParallelism(2) > .addSink(igniteSink) > .name("sink1") > .setParallelism(1) > source.filter(v => v.containsKey("key2")) > .setParallelism(2) > .addSink(igniteSink2) > .name("sink2") > .setParallelism(1) > source.filter(v => v.containsKey("key3")) > .setParallelism(2) > .addSink(igniteSink3) > .name("sink3") > .setParallelism(1) > env.execute("test ignite sink") > } > {code} > And if I remove comment to open Ignite sink before execute pipeline, I still > got a same error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)