Hello there,
My use case is relatively simple: I want to be able to save an RDD in Spark
to two different caches in Ignite, inside the same Spark Context.
When I try to save an RDD to a single IgniteCache, everything works well:
case class Sensor_Att(
@(QuerySqlField @field)(index = false) active:
String,
@(QuerySqlField @field)(index = false) `type`:
String,
@(QuerySqlField @field)(index = true) name:
String,
)
val sqlContext: SparkSession =
SparkSession.builder().master("local[*]").appName("DataProcessing").getOrCreate()
val sc: SparkContext = sqlContext.sparkContext
val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
val ignitec:IgniteContext = new IgniteContext(sc,()=>new
IgniteConfiguration().setClientMode(false).setLocalHost("127.0.0.1").setActiveOnStart(true).
setCacheConfiguration(new
CacheConfiguration[String,Sensor_Att]().setIndexedTypes(classOf[String],classOf[Sensor_Att]).setName("sensorData").
setCacheMode(CacheMode.PARTITIONED).setAtomicityMode(CacheAtomicityMode.ATOMIC)).setDiscoverySpi(new
TcpDiscoverySpi().
setLocalAddress("127.0.0.1").setLocalPort(47090).setIpFinder(new
TcpDiscoveryMulticastIpFinder().
setAddresses(new util.ArrayList[String]))).setCommunicationSpi(new
TcpCommunicationSpi().setLocalPort(47000)))
val cachedRDD:IgniteRDD[String,Sensor_Att]=ignitec.fromCache("Data1")
val RDD_with_key: RDD[(String, Sensor_Att)]
=df_RDD_NEW_CLASS.map(x=>(x.name,x))
cachedRDD.savePairs(RDD_with_key)
val df=cachedRDD.sql("select * from Sensor_Att")
df.show()
If however I try to add a second IgniteContext, using the same class as an
index, and try to save an RDD to its cache, like so:
(code above...)
val ignitec=...
val ignitec2:IgniteContext = new IgniteContext(sc,()=>new
IgniteConfiguration().setClientMode(false).setLocalHost("127.0.0.1").setActiveOnStart(true).
setCacheConfiguration(new
CacheConfiguration[String,Sensor_Att]().setIndexedTypes(classOf[String],classOf[Sensor_Att]).setName("historicsensorData").
setCacheMode(CacheMode.PARTITIONED).setAtomicityMode(CacheAtomicityMode.ATOMIC)).setDiscoverySpi(new
TcpDiscoverySpi().
setLocalAddress("127.0.0.1").setLocalPort(47091).setIpFinder(new
TcpDiscoveryMulticastIpFinder().
setAddresses(new util.ArrayList[String]))).setCommunicationSpi(new
TcpCommunicationSpi().setLocalPort(47007)))
(code above....)
val df=cachedRDD.sql("select * from Sensor_Att")
df.show()
val cachedRDD2:IgniteRDD[String,Sensor_Att]=ignitec.fromCache("Data2")
cachedRDD2.savePairs(RDD_with_key)
val df2=cachedRDD2.sql("select * from Sensor_Att")
df2.show()
I get the following error:
javax.cache.CacheException: class
org.apache.ignite.internal.processors.query.IgniteSQLException: Failed to
parse query: select * from Sensor_Att
at
org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:807)
at
org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:765)
at org.apache.ignite.spark.IgniteRDD.sql(IgniteRDD.scala:147)
at
sensorApp.SensorDataProcessing$.sensorApp$SensorDataProcessing$$data_proces
(...)
It seems that I can't derive a second IgniteContext from the same
SparkContext, because it seems that the "Data2" cache was not created.
Do you have any suggestions about this?
Thank you.
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/