[ 
https://issues.apache.org/jira/browse/IGNITE-10861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jeado ko updated IGNITE-10861:
------------------------------
    Description: 
I get the following error when I create multiple Ignite sinks in Flink 1.7.0:
{code:java}
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141)
at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700)
at org.apache.ignite.Ignition.start(Ignition.java:348)
{code}
This is my Flink job code:
{code:java}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.ignite.sink.flink.IgniteSink

import scala.collection.JavaConverters._

object IgniteSinkTestJob extends App {

  val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
//  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache", 
"ignite-test.xml")
  igniteSink.setAllowOverwrite(true)
  igniteSink.setAutoFlushFrequency(10)
//  igniteSink.open(new Configuration)

  val igniteSink2 = new IgniteSink[java.util.Map[String, String]]("testCache2", 
"ignite-test.xml")
  igniteSink2.setAllowOverwrite(true)
  igniteSink2.setAutoFlushFrequency(10)
//  igniteSink2.open(new Configuration)

  val igniteSink3 = new IgniteSink[java.util.Map[String, String]]("testCache3", 
"ignite-test.xml")
  igniteSink3.setAllowOverwrite(true)
  igniteSink3.setAutoFlushFrequency(10)
//  igniteSink3.open(new Configuration)

  val source = env.fromCollection(
    Array(
      Map("key1" -> "hello1"),
      Map("key1" -> "hello11"),
      Map("key1" -> "hello144"),
      Map("key1" -> "hello1155"),
      Map("key2" -> "hello2"),
      Map("key2" -> "hello3"),
      Map("key3" -> "hello23"),
      Map("key3" -> "hello25")
    ).map(_.asJava)
  )

  source
    .filter(v => v.containsKey("key1"))
    .setParallelism(2)
    .addSink(igniteSink)
      .name("sink1")
      .setParallelism(1)
  source.filter(v => v.containsKey("key2"))
    .setParallelism(2)
    .addSink(igniteSink2)
      .name("sink2")
      .setParallelism(1)
  source.filter(v => v.containsKey("key3"))
    .setParallelism(2)
    .addSink(igniteSink3)
      .name("sink3")
      .setParallelism(1)

  env.execute("test ignite sink")

}
{code}
Even if I uncomment the calls that open each Ignite sink before executing the
pipeline, I still get the same error.

  was:
I get the following error when I create multiple Ignite sinks in Flink 1.7.0:
{code:java}
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141)
at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700)
at org.apache.ignite.Ignition.start(Ignition.java:348)
{code}
This is my Flink job code:
{code:java}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.ignite.sink.flink.IgniteSink

import scala.collection.JavaConverters._

object IgniteSinkTestJob extends App {

  val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
//  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache", 
"ignite-test.xml")
  igniteSink.setAllowOverwrite(true)
  igniteSink.setAutoFlushFrequency(10)
//  igniteSink.open(new Configuration)

  val igniteSink2 = new IgniteSink[java.util.Map[String, String]]("testCache2", 
"ignite-test.xml")
  igniteSink2.setAllowOverwrite(true)
  igniteSink2.setAutoFlushFrequency(10)
//  igniteSink2.open(new Configuration)

  val igniteSink3 = new IgniteSink[java.util.Map[String, String]]("testCache3", 
"ignite-test.xml")
  igniteSink3.setAllowOverwrite(true)
  igniteSink3.setAutoFlushFrequency(10)
//  igniteSink3.open(new Configuration)

  val source = env.fromCollection(
    Array(
      Map("key1" -> "hello1"),
      Map("key1" -> "hello11"),
      Map("key1" -> "hello144"),
      Map("key1" -> "hello1155"),
      Map("key2" -> "hello2"),
      Map("key2" -> "hello3"),
      Map("key3" -> "hello23"),
      Map("key3" -> "hello25")
    ).map(_.asJava)
  )

  source
    .filter(v => v.containsKey("key1"))
    .setParallelism(2)
    .addSink(igniteSink)
      .name("sink1")
      .setParallelism(1)
  source.filter(v => v.containsKey("key2"))
    .setParallelism(2)
    .addSink(igniteSink2)
      .name("sink2")
      .setParallelism(1)
  source.filter(v => v.containsKey("key3"))
    .setParallelism(2)
    .addSink(igniteSink3)
      .name("sink3")
      .setParallelism(1)

  env.execute("test ignite sink")

}
{code}


> Using multiple Ignite Sink got Ignite instance has already been started Error
> -----------------------------------------------------------------------------
>
>                 Key: IGNITE-10861
>                 URL: https://issues.apache.org/jira/browse/IGNITE-10861
>             Project: Ignite
>          Issue Type: Bug
>          Components: streaming
>    Affects Versions: 2.7
>            Reporter: jeado ko
>            Priority: Major
>
> I get the following error when I create multiple Ignite sinks in Flink 1.7.0:
> {code:java}
> Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
> at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141)
> at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731)
> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700)
> at org.apache.ignite.Ignition.start(Ignition.java:348)
> {code}
> This is my Flink job code:
> {code:java}
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala._
> import org.apache.ignite.sink.flink.IgniteSink
> import scala.collection.JavaConverters._
> object IgniteSinkTestJob extends App {
>   val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
> //  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache", 
> "ignite-test.xml")
>   igniteSink.setAllowOverwrite(true)
>   igniteSink.setAutoFlushFrequency(10)
> //  igniteSink.open(new Configuration)
>   val igniteSink2 = new IgniteSink[java.util.Map[String, 
> String]]("testCache2", "ignite-test.xml")
>   igniteSink2.setAllowOverwrite(true)
>   igniteSink2.setAutoFlushFrequency(10)
> //  igniteSink2.open(new Configuration)
>   val igniteSink3 = new IgniteSink[java.util.Map[String, 
> String]]("testCache3", "ignite-test.xml")
>   igniteSink3.setAllowOverwrite(true)
>   igniteSink3.setAutoFlushFrequency(10)
> //  igniteSink3.open(new Configuration)
>   val source = env.fromCollection(
>     Array(
>       Map("key1" -> "hello1"),
>       Map("key1" -> "hello11"),
>       Map("key1" -> "hello144"),
>       Map("key1" -> "hello1155"),
>       Map("key2" -> "hello2"),
>       Map("key2" -> "hello3"),
>       Map("key3" -> "hello23"),
>       Map("key3" -> "hello25")
>     ).map(_.asJava)
>   )
>   source
>     .filter(v => v.containsKey("key1"))
>     .setParallelism(2)
>     .addSink(igniteSink)
>       .name("sink1")
>       .setParallelism(1)
>   source.filter(v => v.containsKey("key2"))
>     .setParallelism(2)
>     .addSink(igniteSink2)
>       .name("sink2")
>       .setParallelism(1)
>   source.filter(v => v.containsKey("key3"))
>     .setParallelism(2)
>     .addSink(igniteSink3)
>       .name("sink3")
>       .setParallelism(1)
>   env.execute("test ignite sink")
> }
> {code}
> Even if I uncomment the calls that open each Ignite sink before executing the
> pipeline, I still get the same error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to