Hi Nikolay , Denis If i am disable option of streamer_overwrite , then it works fine .
df1.write .format(IgniteDataFrameSettings.FORMAT_IGNITE) .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, configPath) .option(IgniteDataFrameSettings.OPTION_TABLE, "ENTITY_PLAYABLE") .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "gameId,playableId,companyId,version") // .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, "true") .mode(SaveMode.Append) .save() but , after enabling " IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE" i get before mentioned errors intermittently , i guess it may be due to https://stackoverflow.com/questions/5763747/h2-in-memory-database-table-not-found . One more thing , if I create IgniteContext like val configPath = "/Users/harshal/Downloads/Ignite23-project/src/main/resources/META-INF/Ignite23-server.xml" val ic : IgniteContext = new IgniteContext(sc, configPath) like above , i am not able to inject dependencies , so I am doing public static IgniteConfiguration createConfiguration() throws Exception { IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setIgniteInstanceName("Ignite23"); TcpDiscoverySpi discovery = new TcpDiscoverySpi(); TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder(); ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47510")); discovery.setIpFinder(ipFinder); cfg.setDiscoverySpi(discovery); cfg.setCacheConfiguration(new CacheConfiguration[]{cacheEntityPlayableCache()}); return cfg; } public static CacheConfiguration cacheEntityPlayableCache() throws Exception { CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setName("EntityPlayableCache"); ccfg.setCacheMode(CacheMode.PARTITIONED); ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); CacheJdbcPojoStoreFactory cacheStoreFactory = new CacheJdbcPojoStoreFactory(); cacheStoreFactory.setDataSourceFactory(new Factory<DataSource>() { public DataSource create() { return ServerConfigurationFactory.DataSources.INSTANCE_dsPostgreSQL_Rein; } }); cacheStoreFactory.setDialect(new BasicJdbcDialect()); cacheStoreFactory.setTypes(new JdbcType[]{jdbcTypeEntityPlayable(ccfg.getName())}); cacheStoreFactory.setSqlEscapeAll(true); ccfg.setCacheStoreFactory(cacheStoreFactory); ccfg.setReadThrough(true); ccfg.setWriteThrough(true); ArrayList<QueryEntity> qryEntities = new ArrayList(); QueryEntity qryEntity = new QueryEntity(); qryEntity.setKeyType("com.gmail.patil.j.harshal.model.EntityPlayableKey"); qryEntity.setValueType("com.gmail.patil.j.harshal.model.EntityPlayable"); qryEntity.setTableName("entity_playable"); HashSet<String> keyFields = new HashSet(); keyFields.add("gameId"); keyFields.add("playableid"); keyFields.add("companyId"); keyFields.add("version"); qryEntity.setKeyFields(keyFields); LinkedHashMap<String, String> fields = new LinkedHashMap(); fields.put("gameId", "java.lang.Long"); fields.put("playableid", "java.lang.Long"); fields.put("companyId", "java.lang.Long"); fields.put("version", "java.lang.Integer"); fields.put("eventTimestamp", "java.sql.Timestamp"); fields.put("eventTimestampSys", "java.lang.Long"); fields.put("companyIdPartition", "java.lang.Long"); fields.put("partitionkey", "java.lang.Long"); qryEntity.setFields(fields); ArrayList<QueryIndex> indexes = new ArrayList(); QueryIndex index = new QueryIndex(); index.setName("company_id_partition_hash_entity_playable_hash"); index.setIndexType(QueryIndexType.SORTED); LinkedHashMap<String, Boolean> indFlds = new LinkedHashMap(); indFlds.put("companyIdPartition", false); index.setFields(indFlds); indexes.add(index); index = new QueryIndex(); index.setName("companyId_entity_playable_hash"); index.setIndexType(QueryIndexType.SORTED); indFlds = new LinkedHashMap(); indFlds.put("companyId", false); index.setFields(indFlds); indexes.add(index); index = new QueryIndex(); index.setName("gameId_entity_playable_hash"); index.setIndexType(QueryIndexType.SORTED); indFlds = new LinkedHashMap(); indFlds.put("gameId", false); index.setFields(indFlds); indexes.add(index); index = new QueryIndex(); index.setName("company_id_partition_entity_playable_normal"); index.setIndexType(QueryIndexType.SORTED); indFlds = new LinkedHashMap(); indFlds.put("companyIdPartition", false); index.setFields(indFlds); indexes.add(index); index = new QueryIndex(); index.setName("companyId_entity_playable_normal"); index.setIndexType(QueryIndexType.SORTED); indFlds = new LinkedHashMap(); indFlds.put("companyId", false); index.setFields(indFlds); indexes.add(index); index = new QueryIndex(); index.setName("gameId_entity_playable_normal"); index.setIndexType(QueryIndexType.SORTED); indFlds = new LinkedHashMap(); indFlds.put("gameId", false); index.setFields(indFlds); indexes.add(index); qryEntity.setIndexes(indexes); qryEntities.add(qryEntity); ccfg.setQueryEntities(qryEntities); return ccfg; } val cfg = () => ServerConfigurationFactory.createConfiguration() Ignition.start(cfg()) val ic : IgniteContext = new IgniteContext(sc, cfg) which can inject cacheConfigurations . <?xml version="1.0" encoding="UTF-8"?> <!-- This file was generated by Ignite Web Console (03/19/2019, 23:43) --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> <!-- Load external properties file. --> <!--<bean id="placeholderConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">--> <!--<property name="location" value="classpath:secret.properties"/>--> <!--</bean>--> <bean id="dsPostgreSQL_Rein" class="org.postgresql.ds.PGPoolingDataSource"> <property name="url" value="jdbc:postgresql://analyticstrack.caumccqvmegm.ap-southeast-1.rds.amazonaws.com:5432/rein"/> <property name="user" value="postgres"/> <property name="password" value="postgres"/> </bean> <bean class="org.apache.ignite.configuration.IgniteConfiguration"> <property name="igniteInstanceName" value="Ignite23"/> <property name="discoverySpi"> <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> <property name="ipFinder"> <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> <property name="addresses"> <list> <value>127.0.0.1:47500..47510</value> </list> </property> </bean> </property> </bean> </property> <property name="cacheConfiguration"> <list> <bean class="org.apache.ignite.configuration.CacheConfiguration"> <property name="name" value="EntityPlayableCache"/> <property name="cacheMode" value="PARTITIONED"/> <property name="atomicityMode" value="ATOMIC"/> <property name="cacheStoreFactory"> <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory"> <property name="dataSourceBean" value="dsPostgreSQL_Rein"/> <property name="dialect"> <bean class="org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect"> </bean> </property> <property name="sqlEscapeAll" value="true"></property> <property name="batchSize" value="1000"/> <property name="types"> <list> <bean class="org.apache.ignite.cache.store.jdbc.JdbcType"> <property name="cacheName" value="EntityPlayableCache"/> <property name="keyType" value="com.gmail.patil.j.harshal.model.EntityPlayableKey"/> <property name="valueType" value="com.gmail.patil.j.harshal.model.EntityPlayable"/> <property name="databaseSchema" value="public"/> <property name="databaseTable" value="entity_playable"/> <property name="keyFields"> <list> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <constructor-arg> <util:constant static-field="java.sql.Types.BIGINT"/> </constructor-arg> <constructor-arg value="gameId"/> <constructor-arg value="long"/> <constructor-arg value="gameId"/> </bean> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <constructor-arg> <util:constant static-field="java.sql.Types.BIGINT"/> </constructor-arg> <constructor-arg value="playableId"/> <constructor-arg value="long"/> <constructor-arg value="playableId"/> </bean> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <constructor-arg> <util:constant static-field="java.sql.Types.BIGINT"/> </constructor-arg> <constructor-arg value="companyId"/> <constructor-arg value="long"/> <constructor-arg value="companyId"/> </bean> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <constructor-arg> <util:constant static-field="java.sql.Types.INTEGER"/> </constructor-arg> <constructor-arg value="version"/> <constructor-arg value="int"/> <constructor-arg value="version"/> </bean> </list> </property> <property name="valueFields"> <list> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <constructor-arg> <util:constant static-field="java.sql.Types.TIMESTAMP"/> </constructor-arg> <constructor-arg value="eventTimestamp"/> <constructor-arg value="java.sql.Timestamp"/> <constructor-arg value="eventTimestamp"/> </bean> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <constructor-arg> <util:constant static-field="java.sql.Types.BIGINT"/> </constructor-arg> <constructor-arg value="eventTimestampSys"/> <constructor-arg value="java.lang.Long"/> <constructor-arg value="eventTimestampSys"/> </bean> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <constructor-arg> <util:constant static-field="java.sql.Types.BIGINT"/> </constructor-arg> <constructor-arg value="companyIdPartition"/> <constructor-arg value="java.lang.Long"/> <constructor-arg value="companyIdPartition"/> </bean> <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField"> <constructor-arg> <util:constant static-field="java.sql.Types.BIGINT"/> </constructor-arg> <constructor-arg value="partitionkey"/> <constructor-arg value="java.lang.Long"/> <constructor-arg value="partitionkey"/> </bean> </list> </property> </bean> </list> </property> </bean> </property> <property name="readThrough" value="true"/> <property name="writeBehindEnabled" value="true"/> <property name="writeBehindBatchSize" value="1000"/> <property name="writeBehindFlushSize" value="0"/> <property name="queryEntities"> <list> <bean class="org.apache.ignite.cache.QueryEntity"> <property name="keyType" value="com.gmail.patil.j.harshal.model.EntityPlayableKey"/> <property name="valueType" value="com.gmail.patil.j.harshal.model.EntityPlayable"/> <property name="tableName" value="entity_playable"/> <property name="keyFields"> <list> <value>gameId</value> <value>playableId</value> <value>companyId</value> <value>version</value> </list> </property> <property name="fields"> <map> <entry key="gameId" value="java.lang.Long"/> <entry key="playableId" value="java.lang.Long"/> <entry key="companyId" value="java.lang.Long"/> <entry key="version" value="java.lang.Integer"/> <entry key="eventTimestamp" value="java.sql.Timestamp"/> <entry key="eventTimestampSys" value="java.lang.Long"/> <entry key="companyIdPartition" value="java.lang.Long"/> <entry key="partitionkey" value="java.lang.Long"/> </map> </property> <property name="indexes"> <list> <bean class="org.apache.ignite.cache.QueryIndex"> <property name="name" value="company_id_partition_hash_entity_playable_hash"/> <property name="indexType" value="SORTED"/> <property name="fields"> <map> <entry key="companyIdPartition" value="false"/> </map> </property> </bean> <bean class="org.apache.ignite.cache.QueryIndex"> <property name="name" value="companyId_entity_playable_hash"/> <property name="indexType" value="SORTED"/> <property name="fields"> <map> <entry key="companyId" value="false"/> </map> </property> </bean> <bean class="org.apache.ignite.cache.QueryIndex"> <property name="name" value="gameId_entity_playable_hash"/> <property name="indexType" value="SORTED"/> <property name="fields"> <map> <entry key="gameId" value="false"/> </map> </property> </bean> <bean class="org.apache.ignite.cache.QueryIndex"> <property name="name" value="company_id_partition_entity_playable_normal"/> <property name="indexType" value="SORTED"/> <property name="fields"> <map> <entry key="companyIdPartition" value="false"/> </map> </property> </bean> <bean class="org.apache.ignite.cache.QueryIndex"> <property name="name" value="companyId_entity_playable_normal"/> <property name="indexType" value="SORTED"/> <property name="fields"> <map> <entry key="companyId" value="false"/> </map> </property> </bean> <bean class="org.apache.ignite.cache.QueryIndex"> <property name="name" value="gameId_entity_playable_normal"/> <property name="indexType" value="SORTED"/> <property name="fields"> <map> <entry key="gameId" value="false"/> </map> </property> </bean> </list> </property> </bean> </list> </property> </bean> </list> </property> </bean> and i have xml file , can you suggest here , is the issue because I am using xml file while writingdataframe but i have loaded cache from IgniteContext in which I have injected cacheConfiguration through function call . On Tue, Mar 26, 2019 at 4:05 PM Nikolay Izhikov <nizhi...@apache.org> wrote: > Hello, Harshal > > Can you, please, share your Ignite config? > Especially, "*ENTITY_PLAYABLE*" cache definition > > вт, 26 мар. 2019 г. в 05:35, Denis Magda <dma...@apache.org>: > >> Hi, as far as I can guess from the shared details, you should pass the >> IgniteCache name as a SQL schema if SQL metadata was configured via XML or >> annotations. Try this "INSERT INTO cacheName.ENTITY_PLAYABLE". >> >> - >> Denis >> >> >> On Mon, Mar 25, 2019 at 7:18 AM Harshal Patil < >> harshal.pa...@mindtickle.com> wrote: >> >>> Hi , >>> I am running spark 2.3.1 with Ignite 2.7.0 . I have configured Postgres >>> as cachePersistance store . After loading of cache , i can read and convert >>> data from ignite cache to Spark Dataframe . But while writing back to >>> ignite , I get below error >>> >>> class org.apache.ignite.internal.processors.query.IgniteSQLException: *Table >>> "ENTITY_PLAYABLE" not found*; SQL statement: >>> >>> INSERT INTO >>> ENTITY_PLAYABLE(GAMEID,PLAYABLEID,COMPANYID,VERSION,EVENTTIMESTAMP,EVENTTIMESTAMPSYS,COMPANYIDPARTITION,partitionkey) >>> VALUES(?,?,?,?,?,?,?,?) [42102-197] >>> >>> at >>> *org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.streamUpdateQuery* >>> (IgniteH2Indexing.java:1302) >>> >>> at >>> org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2206) >>> >>> at >>> org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2204) >>> >>> at >>> org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36) >>> >>> >>> >>> *Read from Ignite* : >>> >>> >>> loading cache >>> >>> >>> val conf = new SparkConf() >>> conf.setMaster("spark://harshal-patil.local:7077") >>> // conf.setMaster("local[*]") >>> conf.setAppName("IGniteTest") >>> conf.set("spark.executor.heartbeatInterval", "900s") >>> conf.set("spark.network.timeout", "950s") >>> conf.set("spark.default.parallelism", "4") >>> conf.set("spark.cores.max", "4") >>> >>> conf.set("spark.jars","target/pack/lib/spark_ignite_cache_test_2.11-0.1.jar") >>> >>> val cfg = () => ServerConfigurationFactory.createConfiguration() >>> >>> Ignition.start(ServerConfigurationFactory.createConfiguration()) >>> >>> val ic : IgniteContext = new IgniteContext(sc, cfg) >>> >>> ic.ignite().cache("EntityPlayableCache").loadCache(null.asInstanceOf[IgniteBiPredicate[_, >>> _]]) >>> >>> >>> >>> >>> *spark.read* >>> >>> .format(IgniteDataFrameSettings.*FORMAT_IGNITE*) >>> >>> .option(IgniteDataFrameSettings.*OPTION_CONFIG_FILE*, configPath) >>> >>> .option(IgniteDataFrameSettings.*OPTION_TABLE*, >>> "ENTITY_PLAYABLE").load().select(*sum*("partitionkey").alias("sum"), >>> *count*("gameId").as("total")).collect()(0) >>> >>> >>> *Write To Ignite* : >>> >>> >>> *df.write* >>> >>> .format(IgniteDataFrameSettings.*FORMAT_IGNITE*) >>> >>> .option(IgniteDataFrameSettings.*OPTION_CONFIG_FILE*, configPath) >>> >>> >>> .option(IgniteDataFrameSettings.*OPTION_TABLE*, "ENTITY_PLAYABLE") >>> >>> .option(IgniteDataFrameSettings. >>> *OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS*, >>> "gameId,playableId,companyId,version") >>> >>> .option(IgniteDataFrameSettings.*OPTION_STREAMER_ALLOW_OVERWRITE*, >>> "true") >>> >>> .mode(SaveMode.*Append*) >>> >>> .save() >>> >>> >>> >>> I think the problem is with *Spring bean Injection on executer node* , >>> please help , what i am doing wrong . >>> >>> >>> >>>