Dropping the entire batch when only one entry is invalid seems too
stringent. Thoughts?
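One possible middle ground, sketched here under assumptions: if a batch fails, replay its entries one at a time and drop only the entries that still fail, so a single bad row does not discard the rest. The `Executor` interface and `writeWithFallback` method are hypothetical illustrations. They are not part of CassandraTransactionalStore or the DataStax driver.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFallback {

    /** Hypothetical stand-in for whatever actually executes statements (e.g. a Cassandra session). */
    public interface Executor<T> {
        void executeBatch(List<T> entries) throws Exception;
        void executeOne(T entry) throws Exception;
    }

    /**
     * Tries the whole batch first; on failure, replays each entry individually.
     * Returns the entries that could not be written, so the caller can log or
     * divert them instead of retrying the whole batch forever.
     */
    public static <T> List<T> writeWithFallback(List<T> batch, Executor<T> executor) {
        List<T> rejected = new ArrayList<>();
        try {
            executor.executeBatch(batch);
            return rejected; // fast path: the whole batch succeeded
        } catch (Exception batchFailure) {
            for (T entry : batch) {
                try {
                    executor.executeOne(entry);
                } catch (Exception entryFailure) {
                    rejected.add(entry); // drop only the offending entry
                }
            }
        }
        return rejected;
    }
}
```

Replaying entries individually gives up the batch's all-or-nothing behaviour, which may matter if the store relies on the batch for its exactly-once guarantees.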

Regards
Ananth

> On 6 Dec. 2016, at 5:08 pm, Priyanka Gugale <[email protected]> wrote:
> 
> Hi Max,
> 
> Right now the operator doesn't provide a way to set a retry limit and drop the
> statement/batch in case of exceptions. We should add this support to
> CassandraTransactionalStore. I created a Malhar JIRA ticket to track this:
> https://issues.apache.org/jira/browse/APEXMALHAR-2367
> 
> -Priyanka
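A bounded retry of the kind proposed in the ticket could look roughly like the minimal sketch below. `RetryingWriter`, `maxRetries` and the give-up callback are hypothetical names chosen for illustration; they are not taken from CassandraTransactionalStore or from the ticket.

```java
import java.util.function.Consumer;

public class RetryingWriter<T> {

    /** Hypothetical write action; throws on failure. */
    public interface Write<V> {
        void apply(V value) throws Exception;
    }

    private final int maxRetries;
    private final Consumer<T> onGiveUp; // e.g. log the tuple or send it to an error port

    public RetryingWriter(int maxRetries, Consumer<T> onGiveUp) {
        if (maxRetries < 0) throw new IllegalArgumentException("maxRetries must be >= 0");
        this.maxRetries = maxRetries;
        this.onGiveUp = onGiveUp;
    }

    /**
     * Attempts the write once, plus up to maxRetries more times.
     * Returns true if it eventually succeeded, false if the value was dropped.
     */
    public boolean write(T value, Write<T> action) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                action.apply(value);
                return true;
            } catch (Exception e) {
                // Swallow and retry; a real operator would also log e and possibly back off.
            }
        }
        onGiveUp.accept(value); // give up so later tuples are no longer blocked
        return false;
    }
}
```

Dropping after a fixed number of attempts unblocks the tuples behind a malformed one at the cost of losing it, so the give-up hook is the natural place to log or divert it.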
> 
>> On Tue, Dec 6, 2016 at 12:51 AM, Max Bridgewater <[email protected]> 
>> wrote:
>> Thanks, that's exactly what happened. One of my input events had a null id. 
>> What happened from there is what confused me. But it now makes complete 
>> sense. Because this event was not delivered successfully, Apex kept 
>> retrying. This means that messages that came after the malformed one were 
>> stuck. I wonder if there is a way to limit the number of retries or if this 
>> is left to the application layer.
>> 
>>> On Mon, Dec 5, 2016 at 11:53 AM, Priyanka Gugale <[email protected]> wrote:
>>> Is it possible that your input has a null value for the id field? id seems
>>> to be your primary key, so it cannot accept null values.
>>> 
>>> -Priyanka
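Because `id` is the partition key, Cassandra rejects a null value for it with the `InvalidQueryException` shown in the trace. One way to keep such tuples away from the writer is to check the key upstream, for example right after parsing. The sketch below assumes the key is a `java.util.UUID`; `KeyValidation`, `Split` and `splitByKey` are hypothetical helpers, not Malhar APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;

public class KeyValidation {

    /** Result holder: tuples safe to write, and tuples Cassandra would reject. */
    public static final class Split<T> {
        public final List<T> valid = new ArrayList<>();
        public final List<T> invalid = new ArrayList<>();
    }

    /**
     * Separates tuples whose partition key is null (rejected by Cassandra with
     * "Invalid null value for partition key part ...") from the rest.
     */
    public static <T> Split<T> splitByKey(List<T> tuples, Function<T, UUID> keyOf) {
        Split<T> split = new Split<>();
        for (T t : tuples) {
            if (keyOf.apply(t) == null) {
                split.invalid.add(t);
            } else {
                split.valid.add(t);
            }
        }
        return split;
    }
}
```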
>>> 
>>>> On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater 
>>>> <[email protected]> wrote:
>>>> Folks,
>>>> 
>>>> I am trying to write sample data to Cassandra. The operator keeps dying 
>>>> and being restarted. The failure trace is below. This failure happens even 
>>>> if no data is going through the pipeline.
>>>> 
>>>> Here is how I create the Cassandra operator:
>>>> 
>>>>         // Map Cassandra columns to POJO fields: (column name, POJO field expression, type)
>>>>         List<FieldInfo> fieldInfos = Lists.newArrayList();
>>>>         fieldInfos.add(new FieldInfo("id", "id", null));
>>>>         fieldInfos.add(new FieldInfo("city", "city", null));
>>>>         fieldInfos.add(new FieldInfo("fname", "firstName", null));
>>>>         fieldInfos.add(new FieldInfo("lname", "lastName", null));
>>>> 
>>>>         // Read the Kafka topic from the earliest available offset
>>>>         KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>>>>         in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>>> 
>>>>         // Deserialize the JSON messages into POJOs
>>>>         JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>>>> 
>>>>         // Write the POJOs to Cassandra through a transactional store
>>>>         CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
>>>>         CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>>>>         out.setStore(transactionalStore);
>>>>         out.setFieldInfos(fieldInfos);
>>>>         dag.addOperator("CassandraDataWriter", out);
>>>> 
>>>>         // Wire Kafka -> parser -> Cassandra writer
>>>>         dag.addStream("parse", in.outputPort, parser.in);
>>>>         dag.addStream("data", parser.out, out.input);
>>>> 
>>>> The json parser seems to work well and deserializes Kafka events into 
>>>> POJOs that I then want to write to Cassandra.
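For the `FieldInfo` mappings above to resolve, the POJO emitted by the parser needs properties named `id`, `city`, `firstName` and `lastName`. A minimal sketch of such a class follows. The class name `TestUser` is hypothetical, typing `id` as `java.util.UUID` is an assumption based on the `uuid` column in the schema, and the no-arg constructor with getters/setters is the shape JSON deserializers such as Jackson typically expect.

```java
import java.util.UUID;

/** Hypothetical POJO matching the FieldInfo mappings (id, city, firstName, lastName). */
public class TestUser {
    private UUID id;          // "id" column, the partition key; must not be null when written
    private String city;      // "city" column
    private String firstName; // "fname" column
    private String lastName;  // "lname" column

    public TestUser() {
        // no-arg constructor for JSON deserialization
    }

    public UUID getId() { return id; }
    public void setId(UUID id) { this.id = id; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
}
```

If an incoming JSON event omits `id` or sets it to null, `getId()` returns null, which is the situation behind the exception in the trace.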
>>>> 
>>>> The Cassandra schema is as follows:
>>>> 
>>>> CREATE TABLE testapp.testuser (
>>>>     id uuid PRIMARY KEY,
>>>>     city text,
>>>>     fname text,
>>>>     lname text
>>>> ) WITH bloom_filter_fp_chance = 0.01
>>>>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>>>     AND comment = ''
>>>>     AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>>>>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>>>     AND dclocal_read_repair_chance = 0.1
>>>>     AND default_time_to_live = 0
>>>>     AND gc_grace_seconds = 864000
>>>>     AND max_index_interval = 2048
>>>>     AND memtable_flush_period_in_ms = 0
>>>>     AND min_index_interval = 128
>>>>     AND read_repair_chance = 0.0
>>>>     AND speculative_retry = '99.0PERCENTILE';
>>>> 
>>>> Again, even without sending data, the exception happens. What am I 
>>>> missing? Any hint would be appreciated.
>>>> 
>>>> 2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent: using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196 as the basepath for checkpointing.
>>>> 2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
>>>> com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part id
>>>>    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
>>>>    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>>>>    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>>>>    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
>>>> 
>>>> 
>>> 
>> 
> 
