Hello,

This looks like a known issue:
https://issues.apache.org/jira/browse/IGNITE-8788. Unfortunately, it has not
been resolved yet.
I would suggest the following workaround:

1. Create your own CacheStoreFactory:

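import org.apache.ignite.cache.store.cassandra.CassandraCacheStore;
import org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory;
import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
import org.springframework.core.io.ClassPathResource;

public class CustomCassandraCacheStoreFactory extends CassandraCacheStoreFactory {
    // Classpath location of the persistence settings, e.g. "cassandra.xml".
    private final String persistenceSettingsXml;

    public CustomCassandraCacheStoreFactory(String persistenceSettingsXml) {
        this.persistenceSettingsXml = persistenceSettingsXml;
    }

    // Build the settings on the node that creates the store, from its own classpath.
    @Override public CassandraCacheStore create() {
        setPersistenceSettings(
            new KeyValuePersistenceSettings(new ClassPathResource(persistenceSettingsXml)));
        return super.create();
    }
}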



2. Configure the Ignite cache as follows:

CacheConfiguration<String, Integer> configuration = new CacheConfiguration<>();

configuration.setName("test-cache");
configuration.setCacheMode(CacheMode.PARTITIONED);
...

String persistenceSettingsXml = new ClassPathResource("cassandra.xml").getPath();
CassandraCacheStoreFactory cacheStoreFactory =
    new CustomCassandraCacheStoreFactory(persistenceSettingsXml);
cacheStoreFactory.setDataSource(dataSource);

configuration.setCacheStoreFactory(cacheStoreFactory);
configuration.setWriteThrough(true);
configuration.setWriteBehindEnabled(true);
...

This assumes that `cassandra.xml` is available on the classpath of every node.
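
To make the shape of the workaround easier to see, here is a minimal JDK-only sketch. The `Settings` and `Factory` classes are hypothetical stand-ins, not Ignite classes, and plain Java serialization stands in for whatever marshalling the cluster actually uses. The point it illustrates is that the factory carries only a String and builds the settings object inside `create()`, after it has been deserialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazySettingsSketch {
    /** Hypothetical stand-in for a settings object built from a resource name. */
    static final class Settings {
        final String source;

        Settings(String source) {
            this.source = source;
        }
    }

    /** Hypothetical stand-in for the custom factory: only the String is serialized. */
    static final class Factory implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String resourceName;

        Factory(String resourceName) {
            this.resourceName = resourceName;
        }

        /** Builds the settings locally, at the moment the store is created. */
        Settings create() {
            return new Settings(resourceName);
        }
    }

    /** Serializes and deserializes the factory, imitating a transfer to another node. */
    static Factory roundTrip(Factory factory) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(factory);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Factory) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Factory remote = roundTrip(new Factory("cassandra.xml"));
        Settings settings = remote.create();
        System.out.println(settings.source);
    }
}
```

In this sketch the settings object never passes through serialization at all; only the resource name does, which is the same division of labour as in CustomCassandraCacheStoreFactory above.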

Thanks,
S.


On Mon, Feb 11, 2019 at 20:15, Chris Genrich <chris.genr...@aexp.com> wrote:

> I’ve created a small project to demonstrate issues I’ve had with Cassandra
> write behind configuration: https://github.com/cgenrich/ignite-cassandra
>
>
>
> My intention is to use a partitioned ignite cache that will persist
> asynchronously to Cassandra.  When using a single node cluster, the data is
> sent to Cassandra as expected.
>
>
>
> I’d appreciate any suggested changes to the configuration:
>
>
>
> DataSource ds = new DataSource();
> ds.setPort(9042);
> ds.setContactPoints("localhost");
> ds.setReconnectionPolicy(new ConstantReconnectionPolicy(1000));
> ds.setAuthProvider(new PlainTextAuthProvider(null, null));
> ds.setReadConsistency("ONE");
> ds.setWriteConsistency("ONE");
> ds.setLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
>
> Ignition.getOrStart(localCluster(new IgniteConfiguration()))
>     .getOrCreateCache(new CacheConfiguration<>(WRITEBEHIND)
>         .setCacheMode(CacheMode.PARTITIONED)
>         .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(Duration.ONE_HOUR))
>         .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
>         .setBackups(1)
>         .setReadThrough(false)
>         .setWriteThrough(true)
>         .setWriteBehindEnabled(true)
>         .setWriteBehindFlushFrequency(100)
>         .setCopyOnRead(false)
>         .setCacheStoreFactory(
>             new CassandraCacheStoreFactory<>().setDataSource(ds).setPersistenceSettings(
>                 new KeyValuePersistenceSettings(new ClassPathResource("cassandra.xml")))));
>
>
>
> <?xml version="1.0" encoding="UTF-8"?>
> <persistence keyspace="ks" table="tbl">
>     <keyPersistence class="com.example.ignite.cassandra.CompositeKey" strategy="POJO">
>         <partitionKey>
>             <field name="first" column="first" />
>         </partitionKey>
>         <clusterKey>
>             <field name="first" column="first" />
>             <field name="second" column="second" />
>         </clusterKey>
>     </keyPersistence>
>     <valuePersistence class="com.example.ignite.cassandra.CompositeValue" strategy="POJO">
>         <field name="third" column="third" />
>         <field name="fourth" column="fourth" />
>         <field name="fifth" column="fifth" />
>         <field name="sixth" column="sixth" />
>         <field name="seventh" column="seventh" />
>         <field name="eighth" column="eighth" />
>     </valuePersistence>
> </persistence>
>
>
>
>
>
> With larger clusters I’ve encountered issues such as the following errors:
>
>
>
> 2019-02-11 09:59:44 ERROR GridDhtPartitionsExchangeFuture:161 - Failed to
> initialize cache(s) (will try to rollback). GridDhtPartitionExchangeId
> [topVer=AffinityTopologyVersion [topVer=8, minorTopVer=1],
> discoEvt=DiscoveryCustomEvent [customMsg=DynamicCacheChangeBatch
> [id=1bbff7dd861-4da0b1ed-6b7a-418f-9d65-d57552e74a30,
> reqs=[DynamicCacheChangeRequest [cacheName=writebehind, hasCfg=true,
> nodeId=813dd53e-5e05-4e4d-9f63-5a41634937c0, clientStartOnly=false,
> stop=false, destroy=false, disabledAfterStartfalse]],
> exchangeActions=ExchangeActions [startCaches=[writebehind],
> stopCaches=null, startGrps=[writebehind], stopGrps=[], resetParts=null,
> stateChangeRequest=null], startCaches=false],
> affTopVer=AffinityTopologyVersion [topVer=8, minorTopVer=1],
> super=DiscoveryEvent [evtNode=TcpDiscoveryNode
> [id=813dd53e-5e05-4e4d-9f63-5a41634937c0, addrs=[0:0:0:0:0:0:0:1%lo0,
> 10.68.228.62, 127.0.0.1], sockAddrs=[/0:0:0:0:0:0:0:1%lo0:5021, /
> 127.0.0.1:5021], discPort=5021, order=1, intOrder=1,
> lastExchangeTime=1549904383332, loc=false,
> ver=2.7.0#20181130-sha1:256ae401, isClient=false], topVer=8,
> nodeId8=eca7beb4, msg=null, type=DISCOVERY_CUSTOM_EVT,
> tstamp=1549904384518]], nodeId=813dd53e, evt=DISCOVERY_CUSTOM_EVT]
>
> java.lang.NullPointerException
>        at org.apache.ignite.cache.store.cassandra.persistence.PojoField.calculatedField(PojoField.java:155)
>        at org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.prepareLoadStatements(PersistenceController.java:297)
>        at org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.<init>(PersistenceController.java:85)
>        at org.apache.ignite.cache.store.cassandra.CassandraCacheStore.<init>(CassandraCacheStore.java:106)
>        at org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory.create(CassandraCacheStoreFactory.java:59)
>        at org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory.create(CassandraCacheStoreFactory.java:34)
>        at org.apache.ignite.internal.processors.cache.GridCacheProcessor.createCache(GridCacheProcessor.java:1558)
>        at org.apache.ignite.internal.processors.cache.GridCacheProcessor.prepareCacheStart(GridCacheProcessor.java:2146)
>        at org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager.processCacheStartRequests(CacheAffinitySharedManager.java:898)
>        at org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager.onCacheChangeRequest(CacheAffinitySharedManager.java:798)
>        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.onCacheChangeRequest(GridDhtPartitionsExchangeFuture.java:1231)
>        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.init(GridDhtPartitionsExchangeFuture.java:738)
>        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0(GridCachePartitionExchangeManager.java:2667)
>        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:2539)
>        at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120)
>        at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
>
>
> SEVERE: Failed to process 1 of 1 elements, during BULK_WRITE operation with Cassandra
> class org.apache.ignite.IgniteException: Failed to execute Cassandra BULK_WRITE operation
>        at org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:268)
>        at org.apache.ignite.cache.store.cassandra.CassandraCacheStore.writeAll(CassandraCacheStore.java:355)
>        at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.updateStore(GridCacheWriteBehindStore.java:816)
>        at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.applyBatch(GridCacheWriteBehindStore.java:726)
>        at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.access$2400(GridCacheWriteBehindStore.java:76)
>        at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.flushCacheCoalescing(GridCacheWriteBehindStore.java:1147)
>        at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.body(GridCacheWriteBehindStore.java:1018)
>        at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
>        at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>        at org.apache.ignite.cache.store.cassandra.persistence.PojoField.getValueFromObject(PojoField.java:167)
>        at org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.bindValues(PersistenceController.java:450)
>        at org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.bindKeyValue(PersistenceController.java:202)
>        at org.apache.ignite.cache.store.cassandra.CassandraCacheStore$4.bindStatement(CassandraCacheStore.java:369)
>        at org.apache.ignite.cache.store.cassandra.CassandraCacheStore$4.bindStatement(CassandraCacheStore.java:355)
>        at org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:230)
>        ... 8 more
>
>
>
>
>
> Client node when trying to view the Size of the Ignite Cache:
>
> SEVERE: Critical system error detected. Will be handled accordingly to
> configured handler [hnd=class o.a.i.failure.StopNodeOrHaltFailureHandler,
> failureCtx=FailureContext [type=SYSTEM_WORKER_TERMINATION,
> err=java.lang.AssertionError]]
>
> java.lang.AssertionError
>        at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.<init>(GridCacheWriteBehindStore.java:914)
>        at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.start(GridCacheWriteBehindStore.java:342)
>        at org.apache.ignite.internal.processors.cache.store.GridCacheStoreManagerAdapter.start0(GridCacheStoreManagerAdapter.java:205)
>        at org.apache.ignite.internal.processors.cache.store.CacheOsStoreManager.start0(CacheOsStoreManager.java:64)
>        at org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter.start(GridCacheManagerAdapter.java:50)
>        at org.apache.ignite.internal.processors.cache.GridCacheProcessor.startCache(GridCacheProcessor.java:1188)
>        at org.apache.ignite.internal.processors.cache.GridCacheProcessor.prepareCacheStart(GridCacheProcessor.java:1964)
>        at org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager.processClientCacheStartRequests(CacheAffinitySharedManager.java:438)
>        at org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager.processClientCachesChanges(CacheAffinitySharedManager.java:622)
>        at org.apache.ignite.internal.processors.cache.GridCacheProcessor.processCustomExchangeTask(GridCacheProcessor.java:376)
>        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.processCustomTask(GridCachePartitionExchangeManager.java:2235)
>        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body0(GridCachePartitionExchangeManager.java:2371)
>        at org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager$ExchangeWorker.body(GridCachePartitionExchangeManager.java:2299)
>        at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
>        at java.lang.Thread.run(Thread.java:748)
>