Hi Max,

Right now the operator doesn't provide a way to set a retry limit or to drop the failing statement / batch when an exception occurs. We should add this support to CassandraTransactionalStore. I created a Malhar JIRA ticket to track this: https://issues.apache.org/jira/browse/APEXMALHAR-2367
-Priyanka

On Tue, Dec 6, 2016 at 12:51 AM, Max Bridgewater <[email protected]> wrote:

> Thanks, that's exactly what happened. One of my input events had id as
> null. What happened from there is what confused me. But it now makes
> complete sense. Because this event was not delivered successfully, Apex
> kept retrying. This means that messages that came after the malformed one
> were stuck. I wonder if there is a way to limit the number of retries or if
> this is left to the application layer.
>
> On Mon, Dec 5, 2016 at 11:53 AM, Priyanka Gugale <[email protected]>
> wrote:
>
>> Is it possible that your input has a "null" value for the id field? id
>> seems to be your primary key, so it cannot accept null values.
>>
>> -Priyanka
>>
>> On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater <[email protected]> wrote:
>>
>>> Folks,
>>>
>>> I am trying to write sample stuff to Cassandra. The operator keeps dying
>>> and being restarted. The failure trace is below. This failure happens even
>>> if no data is going through the pipeline.
>>>
>>> Here is how I create the Cassandra operator:
>>>
>>>     List<FieldInfo> fieldInfos = Lists.newArrayList();
>>>     fieldInfos.add(new FieldInfo("id", "id", null));
>>>     fieldInfos.add(new FieldInfo("city", "city", null));
>>>     fieldInfos.add(new FieldInfo("fname", "firstName", null));
>>>     fieldInfos.add(new FieldInfo("lname", "lastName", null));
>>>
>>>     KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn",
>>>         new KafkaSinglePortInputOperator());
>>>     in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>>     JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>>>     CassandraTransactionalStore transactionalStore =
>>>         new CassandraTransactionalStore();
>>>     CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>>>     out.setStore(transactionalStore);
>>>     out.setFieldInfos(fieldInfos);
>>>     dag.addOperator("CassandraDataWriter", out);
>>>     dag.addStream("parse", in.outputPort, parser.in);
>>>     dag.addStream("data", parser.out, out.input);
>>>
>>> The json parser seems to work well and deserializes Kafka events into
>>> POJOs that I then want to write to Cassandra.
>>>
>>> The Cassandra schema is as follows:
>>>
>>>     CREATE TABLE testapp.testuser (
>>>         id uuid PRIMARY KEY,
>>>         city text,
>>>         fname text,
>>>         lname text
>>>     ) WITH bloom_filter_fp_chance = 0.01
>>>         AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>>         AND comment = ''
>>>         AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>>>         AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>>         AND dclocal_read_repair_chance = 0.1
>>>         AND default_time_to_live = 0
>>>         AND gc_grace_seconds = 864000
>>>         AND max_index_interval = 2048
>>>         AND memtable_flush_period_in_ms = 0
>>>         AND min_index_interval = 128
>>>         AND read_repair_chance = 0.0
>>>         AND speculative_retry = '99.0PERCENTILE';
>>>
>>> Again, even without sending data, the exception happens. What am I
>>> missing? Any hint would be appreciated.
>>>
>>> 2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent:
>>> using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196
>>> as the basepath for checkpointing.
>>> 2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer:
>>> Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]]
>>> stopped running due to an exception.
>>> com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null
>>> value for partition key part id
>>>     at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
>>>     at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>>>     at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>>>     at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
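
On the retry question raised in this thread: until the operator itself supports a retry limit, one option is to bound retries in application code. The following is only a rough sketch of that idea in plain Java, not Apex or Malhar API. The class BoundedRetryWriter, its write and getDropped methods, and the maxAttempts parameter are all hypothetical names made up for illustration, and the Consumer merely stands in for whatever call actually writes to the store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Apex or Malhar): tries a write a bounded
 * number of times and then sets the record aside instead of retrying forever.
 */
class BoundedRetryWriter<T> {
    private final Consumer<T> writer;   // stand-in for the real store write
    private final int maxAttempts;      // assumed configuration knob
    private final List<T> dropped = new ArrayList<>();

    BoundedRetryWriter(Consumer<T> writer, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.writer = writer;
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if the write succeeded, false if the record was dropped. */
    boolean write(T record) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                writer.accept(record);
                return true;
            } catch (RuntimeException e) {
                // Swallow the failure and fall through to the next attempt.
            }
        }
        // All attempts failed: remember the record so later ones are not blocked.
        dropped.add(record);
        return false;
    }

    /** Records that exhausted their attempts, for logging or later inspection. */
    List<T> getDropped() {
        return dropped;
    }
}
```

With something like this, a malformed event (for instance one with a null id) would be set aside after maxAttempts failures and the events behind it could proceed. Whether dropping is acceptable depends on the application, so the dropped records are kept rather than discarded silently.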
