Hello there,

My use case is relatively simple: I want to be able to save an RDD in Spark
to two different caches in Ignite, inside the same Spark Context.

When I try to save an RDD to a single IgniteCache, everything works well:

  case class Sensor_Att(
    @(QuerySqlField @field)(index = false) active: String,
    @(QuerySqlField @field)(index = false) `type`: String,
    @(QuerySqlField @field)(index = true) name: String
  )

  val sqlContext: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("DataProcessing")
    .getOrCreate()
  val sc: SparkContext = sqlContext.sparkContext
  val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

  val ignitec: IgniteContext = new IgniteContext(sc, () => new IgniteConfiguration()
    .setClientMode(false)
    .setLocalHost("127.0.0.1")
    .setActiveOnStart(true)
    .setCacheConfiguration(new CacheConfiguration[String, Sensor_Att]()
      .setIndexedTypes(classOf[String], classOf[Sensor_Att])
      .setName("sensorData")
      .setCacheMode(CacheMode.PARTITIONED)
      .setAtomicityMode(CacheAtomicityMode.ATOMIC))
    .setDiscoverySpi(new TcpDiscoverySpi()
      .setLocalAddress("127.0.0.1")
      .setLocalPort(47090)
      .setIpFinder(new TcpDiscoveryMulticastIpFinder().setAddresses(new util.ArrayList[String])))
    .setCommunicationSpi(new TcpCommunicationSpi().setLocalPort(47000)))


  val cachedRDD: IgniteRDD[String, Sensor_Att] = ignitec.fromCache("Data1")
  val RDD_with_key: RDD[(String, Sensor_Att)] = df_RDD_NEW_CLASS.map(x => (x.name, x))
  cachedRDD.savePairs(RDD_with_key)
  val df = cachedRDD.sql("select * from Sensor_Att")
  df.show()


If, however, I try to add a second IgniteContext, indexing the same class, and
then save an RDD to its cache, like so:

(code above...)
  val ignitec = ...
  val ignitec2: IgniteContext = new IgniteContext(sc, () => new IgniteConfiguration()
    .setClientMode(false)
    .setLocalHost("127.0.0.1")
    .setActiveOnStart(true)
    .setCacheConfiguration(new CacheConfiguration[String, Sensor_Att]()
      .setIndexedTypes(classOf[String], classOf[Sensor_Att])
      .setName("historicsensorData")
      .setCacheMode(CacheMode.PARTITIONED)
      .setAtomicityMode(CacheAtomicityMode.ATOMIC))
    .setDiscoverySpi(new TcpDiscoverySpi()
      .setLocalAddress("127.0.0.1")
      .setLocalPort(47091)
      .setIpFinder(new TcpDiscoveryMulticastIpFinder().setAddresses(new util.ArrayList[String])))
    .setCommunicationSpi(new TcpCommunicationSpi().setLocalPort(47007)))

(code above....)
  val df = cachedRDD.sql("select * from Sensor_Att")
  df.show()

  val cachedRDD2: IgniteRDD[String, Sensor_Att] = ignitec.fromCache("Data2")
  cachedRDD2.savePairs(RDD_with_key)
  val df2 = cachedRDD2.sql("select * from Sensor_Att")
  df2.show()


I get the following error:

javax.cache.CacheException: class org.apache.ignite.internal.processors.query.IgniteSQLException: Failed to parse query: select * from Sensor_Att
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:807)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:765)
        at org.apache.ignite.spark.IgniteRDD.sql(IgniteRDD.scala:147)
        at sensorApp.SensorDataProcessing$.sensorApp$SensorDataProcessing$$data_proces
        (...)


It looks as though I can't derive a second IgniteContext from the same
SparkContext: apparently the "Data2" cache is never created, so the SQL
query has no table to parse against.
Do you have any suggestions about this?
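For reference, this is the alternative I am considering: registering both
caches on a single IgniteContext and calling fromCache twice. This is only a
rough sketch, assuming setCacheConfiguration accepts multiple
CacheConfiguration instances; the cache names "Data1"/"Data2" just mirror the
names used in my code above.

```scala
// Hypothetical sketch: one Ignite node configured with both caches up front,
// so a single IgniteContext can serve both IgniteRDDs.
def cacheCfg(name: String): CacheConfiguration[String, Sensor_Att] =
  new CacheConfiguration[String, Sensor_Att]()
    .setName(name)
    .setIndexedTypes(classOf[String], classOf[Sensor_Att])
    .setCacheMode(CacheMode.PARTITIONED)
    .setAtomicityMode(CacheAtomicityMode.ATOMIC)

val singleCtx: IgniteContext = new IgniteContext(sc, () => new IgniteConfiguration()
  .setClientMode(false)
  .setLocalHost("127.0.0.1")
  // assumption: setCacheConfiguration takes varargs of CacheConfiguration
  .setCacheConfiguration(cacheCfg("Data1"), cacheCfg("Data2")))

val liveRDD: IgniteRDD[String, Sensor_Att]     = singleCtx.fromCache("Data1")
val historicRDD: IgniteRDD[String, Sensor_Att] = singleCtx.fromCache("Data2")
```

Would this be the intended way to share one SparkContext across two caches?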

Thank you.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
