Dropping the entire batch when only one entry is invalid seems too stringent. Thoughts?
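A minimal sketch of the alternative, assuming a bounded number of batch attempts followed by a per-entry fallback, so that only the entries that still fail are dropped. `EntryWriter` and `BoundedRetryWriter` are hypothetical names, not part of Malhar's `CassandraTransactionalStore`. In a real operator, `writeBatch`/`writeOne` would wrap the Cassandra session calls. The sketch also assumes the writes are idempotent (plain CQL INSERTs are upserts), so replaying the entries of a rejected batch one by one is safe.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical abstraction over the store (not a Malhar API). */
interface EntryWriter<T> {
  /** Writes all entries as one batch; throws if the batch is rejected. */
  void writeBatch(List<T> batch) throws Exception;

  /** Writes a single entry; throws if the entry is rejected. */
  void writeOne(T entry) throws Exception;
}

/**
 * Hypothetical sketch: retry a failed batch a bounded number of times, then
 * fall back to writing its entries individually and drop only the ones that fail.
 */
final class BoundedRetryWriter<T> {
  private final EntryWriter<T> writer;
  private final int maxBatchAttempts;

  BoundedRetryWriter(EntryWriter<T> writer, int maxBatchAttempts) {
    if (maxBatchAttempts < 1) {
      throw new IllegalArgumentException("maxBatchAttempts must be >= 1");
    }
    this.writer = writer;
    this.maxBatchAttempts = maxBatchAttempts;
  }

  /** Writes the batch and returns the entries that had to be dropped. */
  List<T> write(List<T> batch) {
    for (int attempt = 1; attempt <= maxBatchAttempts; attempt++) {
      try {
        writer.writeBatch(batch);
        return new ArrayList<>(); // whole batch accepted, nothing dropped
      } catch (Exception ignored) {
        // Transient errors (e.g. timeouts) may succeed on a later attempt;
        // a null partition key fails on every attempt.
      }
    }
    // Retry limit reached: isolate the bad entries instead of dropping everything.
    List<T> dropped = new ArrayList<>();
    for (T entry : batch) {
      try {
        writer.writeOne(entry);
      } catch (Exception e) {
        dropped.add(entry); // candidate for an error port or a dead-letter log
      }
    }
    return dropped;
  }
}
```

A fuller version would probably classify errors first: a permanent one such as `InvalidQueryException` gains nothing from retries, whereas timeouts might.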
Regards,
Ananth

> On 6 Dec. 2016, at 5:08 pm, Priyanka Gugale wrote:
>
> Hi Max,
>
> Right now the operator doesn't provide a way to set a retry limit and drop the
> statement / batch in case of exceptions. We should add this support to
> CassandraTransactionalStore. Created a Malhar JIRA ticket to track this:
> https://issues.apache.org/jira/browse/APEXMALHAR-2367
>
> -Priyanka
>
>> On Tue, Dec 6, 2016 at 12:51 AM, Max Bridgewater wrote:
>> Thanks, that's exactly what happened. One of my input events had the id set to null.
>> What happened from there is what confused me, but it now makes complete
>> sense. Because this event was not delivered successfully, Apex kept
>> retrying. This means that messages that came after the malformed one were
>> stuck. I wonder if there is a way to limit the number of retries or if this
>> is left to the application layer.
>>
>>> On Mon, Dec 5, 2016 at 11:53 AM, Priyanka Gugale wrote:
>>> Is it possible that your input has a null value for the id field? id seems to
>>> be your primary key, so it cannot accept null values.
>>>
>>> -Priyanka
>>>
>>>> On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater wrote:
>>>> Folks,
>>>>
>>>> I am trying to write sample stuff to Cassandra. The operator keeps dying
>>>> and being restarted. The failure trace is below. This failure happens even
>>>> if no data is going through the pipeline.
>>>>
>>>> Here is how I create the Cassandra operator:
>>>>
>>>>     List<FieldInfo> fieldInfos = Lists.newArrayList();
>>>>     fieldInfos.add(new FieldInfo("id", "id", null));
>>>>     fieldInfos.add(new FieldInfo("city", "city", null));
>>>>     fieldInfos.add(new FieldInfo("fname", "firstName", null));
>>>>     fieldInfos.add(new FieldInfo("lname", "lastName", null));
>>>>
>>>>     KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>>>>     in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>>>     JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>>>>     CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
>>>>     CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>>>>     out.setStore(transactionalStore);
>>>>     out.setFieldInfos(fieldInfos);
>>>>     dag.addOperator("CassandraDataWriter", out);
>>>>     dag.addStream("parse", in.outputPort, parser.in);
>>>>     dag.addStream("data", parser.out, out.input);
>>>>
>>>> The json parser seems to work well and deserializes Kafka events into
>>>> POJOs that I then want to write to Cassandra.
>>>>
>>>> The Cassandra schema is as follows:
>>>>
>>>>     CREATE TABLE testapp.testuser (
>>>>         id uuid PRIMARY KEY,
>>>>         city text,
>>>>         fname text,
>>>>         lname text
>>>>     ) WITH bloom_filter_fp_chance = 0.01
>>>>         AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>>>         AND comment = ''
>>>>         AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>>>>         AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>>>         AND dclocal_read_repair_chance = 0.1
>>>>         AND default_time_to_live = 0
>>>>         AND gc_grace_seconds = 864000
>>>>         AND max_index_interval = 2048
>>>>         AND memtable_flush_period_in_ms = 0
>>>>         AND min_index_interval = 128
>>>>         AND read_repair_chance = 0.0
>>>>         AND speculative_retry = '99.0PERCENTILE';
>>>>
>>>> Again, even without sending data, the exception happens. What am I
>>>> missing? Any hint would be appreciated.
>>>>
>>>>     2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent: using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196 as the basepath for checkpointing.
>>>>     2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
>>>>     com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part id
>>>>         at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
>>>>         at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>>>>         at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>>>>         at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
>>>>
>>>
>>
>
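The question of whether this is left to the application layer can be illustrated with a small upstream check. A sketch, assuming a hypothetical `TestUser` POJO with the `id`, `city`, `firstName` and `lastName` fields that the `FieldInfo` list above maps (with `id` typed as `java.util.UUID` to match the `uuid` column). Tuples whose `id` is null are set aside before they reach the Cassandra writer, so the "Invalid null value for partition key part id" error cannot block the stream. In the DAG above, this logic could sit in a small operator between `jsonParser` and `CassandraDataWriter`. The Apex port wiring is left out so the sketch runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical POJO mirroring the fields mapped by the FieldInfo list in the thread. */
class TestUser {
  final UUID id;          // maps to the "id uuid PRIMARY KEY" column
  final String city;
  final String firstName; // maps to "fname"
  final String lastName;  // maps to "lname"

  TestUser(UUID id, String city, String firstName, String lastName) {
    this.id = id;
    this.city = city;
    this.firstName = firstName;
    this.lastName = lastName;
  }
}

/**
 * Routes each parsed tuple either to "valid" (safe to write) or to "invalid"
 * (Cassandra would reject it because the partition key is null).
 */
final class PartitionKeyFilter {
  final List<TestUser> valid = new ArrayList<>();
  final List<TestUser> invalid = new ArrayList<>();

  void process(TestUser user) {
    if (user != null && user.id != null) {
      valid.add(user);   // in an operator: emit on the port feeding CassandraDataWriter
    } else {
      invalid.add(user); // in an operator: log or emit on an error port instead of writing
    }
  }
}
```

Filtering upstream keeps the writer simple and complements a retry limit in the store: the filter catches rejections that are predictable from the tuple alone, and the retry limit handles everything else.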
