Hi Chris, Please try set the cache configuration for IgniteConfiguration as showed below:
CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>(WRITEBEHIND).setCacheMode(CacheMode.PARTITIONED) .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(Duration.ONE_HOUR)) .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC).setBackups(1) .setReadThrough(false).setWriteThrough(true).setWriteBehindEnabled(true) .setWriteBehindFlushFrequency(100).setCopyOnRead(false).setCacheStoreFactory( new CassandraCacheStoreFactory<>().setDataSource(ds).setPersistenceSettings( new KeyValuePersistenceSettings(new ClassPathResource("cassandra.xml")))); Ignition.getOrStart(localCluster(new IgniteConfiguration().setCacheConfiguration(cacheCfg))); Thanks, Pavel пн, 11 февр. 2019 г. в 20:15, Chris Genrich <chris.genr...@aexp.com>: > I’ve created a small project to demonstrate issues I’ve had with Cassandra > write behind configuration: https://github.com/cgenrich/ignite-cassandra > > > > My intention is to use a partitioned ignite cache that will persist > asynchronously to Cassandra. When using a single node cluster, the data is > sent to Cassandra as expected. > > > > I’d appreciate any suggested changes to the configuration: > > > > DataSource ds = *new* DataSource(); > > ds.setPort(9042); > > ds.setContactPoints("localhost"); > > ds.setReconnectionPolicy(*new* > ConstantReconnectionPolicy(1000)); > > ds.setAuthProvider(*new* PlainTextAuthProvider(*null*, > *null*)); > > ds.setReadConsistency("ONE"); > > ds.setWriteConsistency("ONE"); > > ds.setLoadBalancingPolicy(*new* TokenAwarePolicy(*new* > RoundRobinPolicy())); > > Ignition.*getOrStart*(*localCluster*(*new* > IgniteConfiguration())) > > .getOrCreateCache(*new* CacheConfiguration<>( > *WRITEBEHIND*).setCacheMode(CacheMode.*PARTITIONED*) > > > .setExpiryPolicyFactory(TouchedExpiryPolicy.*factoryOf*(Duration. > *ONE_HOUR*)) > > > .setWriteSynchronizationMode(CacheWriteSynchronizationMode.*FULL_SYNC* > ).setBackups(1) > > .setReadThrough(*false* > ).setWriteThrough(*true*).setWriteBehindEnabled(*true*) > > > .setWriteBehindFlushFrequency(100).setCopyOnRead(*false* > ).setCacheStoreFactory( > > *new* > CassandraCacheStoreFactory<>().setDataSource(ds).setPersistenceSettings( > > *new* > KeyValuePersistenceSettings(*new* ClassPathResource("cassandra.xml"))))); > > > > <?xml version=*"1.0"* encoding=*"UTF-8"*?> > > <persistence keyspace=*"ks"* table=*"tbl"*> > > <keyPersistence class=*"com.example.ignite.cassandra.CompositeKey"* > strategy=*"POJO"*> > > <partitionKey> > > <field name=*"first"* column=*"first"* /> > > </partitionKey> > > <clusterKey> > > <field name=*"first"* column=*"first"* /> > > <field name=*"second"* column=*"second"* /> > > </clusterKey> > > </keyPersistence> > > <valuePersistence class= > *"com.example.ignite.cassandra.CompositeValue"* strategy=*"POJO"*> > > <field name=*"third"* column=*"third"* /> > > <field name=*"fourth"* column=*"fourth"* /> > > <field name=*"fifth"* column=*"fifth"* /> > > <field name=*"sixth"* column=*"sixth"* /> > > <field name=*"seventh"* column=*"seventh"* /> > > <field name=*"eighth"* column=*"eighth"* /> > > </valuePersistence> > > </persistence> > > > > > > With larger clusters I’ve encountered issues such as the following errors: > > > > 2019-02-11 09:59:44 ERROR GridDhtPartitionsExchangeFuture:161 - Failed to > initialize cache(s) (will try to rollback). GridDhtPartitionExchangeId > [topVer=AffinityTopologyVersion [topVer=8, minorTopVer=1], > discoEvt=DiscoveryCustomEvent [customMsg=DynamicCacheChangeBatch > [id=1bbff7dd861-4da0b1ed-6b7a-418f-9d65-d57552e74a30, > reqs=[DynamicCacheChangeRequest [cacheName=writebehind, hasCfg=true, > nodeId=813dd53e-5e05-4e4d-9f63-5a41634937c0, clientStartOnly=false, > stop=false, destroy=false, disabledAfterStartfalse]], > exchangeActions=ExchangeActions [startCaches=[writebehind], > stopCaches=null, startGrps=[writebehind], stopGrps=[], resetParts=null, > stateChangeRequest=null], startCaches=false], > affTopVer=AffinityTopologyVersion [topVer=8, minorTopVer=1], > super=DiscoveryEvent [evtNode=TcpDiscoveryNode > [id=813dd53e-5e05-4e4d-9f63-5a41634937c0, addrs=[0:0:0:0:0:0:0:1%lo0, > 10.68.228.62, 127.0.0.1], sockAddrs=[/0:0:0:0:0:0:0:1%lo0:5021, / > 127.0.0.1:5021], discPort=5021, order=1, intOrder=1, > lastExchangeTime=1549904383332, loc=false, > ver=2.7.0#20181130-sha1:256ae401, isClient=false], topVer=8, > nodeId8=eca7beb4, msg=null, type=DISCOVERY_CUSTOM_EVT, > tstamp=1549904384518]], nodeId=813dd53e, evt=DISCOVERY_CUSTOM_EVT] > > java.lang.NullPointerException > > at > org.apache.ignite.cache.store.cassandra.persistence.PojoField.calculatedField(PojoField.java:155) > > at > org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.prepareLoadStatements(PersistenceController.java:297) > > at > org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.<init>(PersistenceController.java:85) > > at > org.apache.ignite.cache.store.cassandra.CassandraCacheStore.<init>(CassandraCacheStore.java:106) > > at > org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory.create(CassandraCacheStoreFactory.java:59) > > at > org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory.create(CassandraCacheStoreFactory.java:34) > > at > org.apache.ignite.internal.processors.cache.GridCacheProcessor.createCache(GridCacheProcessor.java:1558) > > at > org.apache.ignite.internal.processors.cache.GridCacheProcessor.prepareCacheStart(GridCacheProcessor.java:2146) > > at > org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager.processCacheStartRequests(CacheAffinitySharedManager.java:898) > > at > org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager.onCacheChangeRequest(CacheAffinitySharedManager.java:798) > > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onCacheChangeRequest(GridDhtPartitionsExchangeFuture.java:1231) > > at > org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:738) > > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0(GridCachePartitionExchangeManager.java:2667) > > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:2539) > > at > org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120) > > at java.lang.Thread.run(Thread.java:748) > > > > > > > > SEVERE: Failed to process 1 of 1 elements, during BULK_WRITE operation > with Cassandra > > class org.apache.ignite.IgniteException: Failed to execute Cassandra > BULK_WRITE operation > > at > org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:268) > > at > org.apache.ignite.cache.store.cassandra.CassandraCacheStore.writeAll(CassandraCacheStore.java:355) > > at > org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.updateStore(GridCacheWriteBehindStore.java:816) > > at > org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.applyBatch(GridCacheWriteBehindStore.java:726) > > at > org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.access$2400(GridCacheWriteBehindStore.java:76) > > at > org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.flushCacheCoalescing(GridCacheWriteBehindStore.java:1147) > > at > org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.body(GridCacheWriteBehindStore.java:1018) > > at > org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: java.lang.NullPointerException > > at > org.apache.ignite.cache.store.cassandra.persistence.PojoField.getValueFromObject(PojoField.java:167) > > at > org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.bindValues(PersistenceController.java:450) > > at > org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.bindKeyValue(PersistenceController.java:202) > > at > org.apache.ignite.cache.store.cassandra.CassandraCacheStore$4.bindStatement(CassandraCacheStore.java:369) > > at > org.apache.ignite.cache.store.cassandra.CassandraCacheStore$4.bindStatement(CassandraCacheStore.java:355) > > at > org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:230) > > ... 8 more > > > > > > Client node when trying to view the Size of the Ignite Cache: > > SEVERE: Critical system error detected. Will be handled accordingly to > configured handler [hnd=class o.a.i.failure.StopNodeOrHaltFailureHandler, > failureCtx=FailureContext [type=SYSTEM_WORKER_TERMINATION, > err=java.lang.AssertionError]] > > java.lang.AssertionError > > at > org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.<init>( > *GridCacheWriteBehindStore.java:914*) > > at > org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.start( > *GridCacheWriteBehindStore.java:342*) > > at > org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.start0( > *GridCacheStoreManagerAdapter.java:205*) > > at > org.apache.ignite.internal.processors.cache.store.CacheOsStoreManager.start0( > *CacheOsStoreManager.java:64*) > > at > org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter.start( > *GridCacheManagerAdapter.java:50*) > > at > org.apache.ignite.internal.processors.cache.GridCacheProcessor.startCache( > *GridCacheProcessor.java:1188*) > > at > org.apache.ignite.internal.processors.cache.GridCacheProcessor.prepareCacheStart( > *GridCacheProcessor.java:1964*) > > at > org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager.processClientCacheStartRequests( > *CacheAffinitySharedManager.java:438*) > > at > org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager.processClientCachesChanges( > *CacheAffinitySharedManager.java:622*) > > at > org.apache.ignite.internal.processors.cache.GridCacheProcessor.processCustomExchangeTask( > *GridCacheProcessor.java:376*) > > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.processCustomTask( > *GridCachePartitionExchangeManager.java:2235*) > > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0( > *GridCachePartitionExchangeManager.java:2371*) > > at > org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body( > *GridCachePartitionExchangeManager.java:2299*) > > at org.apache.ignite.internal.util.worker.GridWorker.run( > *GridWorker.java:110*) > > at java.lang.Thread.run(*Thread.java:748*) > > > > > American Express made the following annotations > > ------------------------------ > > > "This message and any attachments are solely for the intended recipient > and may contain confidential or privileged information. If you are not the > intended recipient, any disclosure, copying, use, or distribution of the > information > > included in this message and any attachments is prohibited. If you have > received this communication in error, please notify us by reply e-mail and > immediately and permanently delete this message and any attachments. Thank > you." > > > American Express a ajouté le commentaire suivant le > > Ce courrier et toute pièce jointe qu'il contient sont réservés au seul > destinataire indiqué et peuvent renfermer des renseignements confidentiels > et privilégiés. Si vous n'êtes pas le destinataire prévu, toute > divulgation, duplication, utilisation ou distribution du courrier ou de > toute pièce jointe est interdite. Si vous avez reçu cette communication par > erreur, veuillez nous en aviser par courrier et détruire immédiatement le > courrier et les pièces jointes. Merci. > > ------------------------------ > -- Regards Pavel Vinokurov