Hi Max,

Right now the operator doesn't provide a way to set a retry limit or to drop
the failing statement/batch when an exception occurs. We should add this
support to CassandraTransactionalStore. I created a Malhar JIRA ticket to
track this:
https://issues.apache.org/jira/browse/APEXMALHAR-2367
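
Until the operator supports this, one possible workaround at the application
layer is to drop records with a null key before they reach the store, and to
cap the number of write attempts. Below is a minimal plain-Java sketch of that
idea. It does not use the Malhar API: NullKeyGuard, User, hasValidKey and
writeWithRetryLimit are hypothetical names for illustration only, and the
writer passed in is a stand-in for the real Cassandra insert.

```java
import java.util.UUID;
import java.util.function.Consumer;

final class NullKeyGuard {

    /** Hypothetical POJO mirroring the columns of testapp.testuser. */
    static final class User {
        UUID id;
        String city;
        String firstName;
        String lastName;
    }

    /** True only when the record carries a non-null primary key. */
    static boolean hasValidKey(User user) {
        return user != null && user.id != null;
    }

    /**
     * Calls writer up to maxAttempts times. Returns true on the first
     * success, false if every attempt threw (the caller then drops the record).
     */
    static <T> boolean writeWithRetryLimit(T record, Consumer<T> writer, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                writer.accept(record);
                return true;
            } catch (RuntimeException e) {
                // Assumption: every runtime failure is treated as retryable here.
            }
        }
        return false;
    }

    public static void main(String[] args) {
        User bad = new User();        // id left null, like the malformed event
        User good = new User();
        good.id = UUID.randomUUID();

        for (User u : new User[] {bad, good}) {
            if (!hasValidKey(u)) {
                System.out.println("skipping record with null id");
                continue;
            }
            // Stand-in writer; a real one would execute the Cassandra insert.
            boolean written = writeWithRetryLimit(u, r -> System.out.println("wrote " + r.id), 3);
            System.out.println("written=" + written);
        }
    }
}
```

In an Apex application the same check could live in a small filtering step
placed between the parser and the Cassandra output operator, so that a single
malformed event no longer blocks the events behind it.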

-Priyanka

On Tue, Dec 6, 2016 at 12:51 AM, Max Bridgewater <[email protected]>
wrote:

> Thanks, that's exactly what happened. One of my input events had a null id.
> What happened from there is what confused me, but it now makes complete
> sense: because this event could not be written successfully, Apex kept
> retrying, so the messages that came after the malformed one were stuck. I
> wonder if there is a way to limit the number of retries, or if this is
> left to the application layer.
>
> On Mon, Dec 5, 2016 at 11:53 AM, Priyanka Gugale <[email protected]>
> wrote:
>
>> Is it possible that your input has a null value for the id field? id seems
>> to be your primary key, so it cannot accept null values.
>>
>> -Priyanka
>>
>> On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater <
>> [email protected]> wrote:
>>
>>> Folks,
>>>
>>> I am trying to write sample stuff to Cassandra. The operator keeps dying
>>> and being restarted. The failure trace is below. This failure happens even
>>> if no data is going through the pipeline.
>>>
>>> Here is how I create the Cassandra operator:
>>>
>>>         List<FieldInfo> fieldInfos = Lists.newArrayList();
>>>         fieldInfos.add(new FieldInfo("id", "id", null));
>>>         fieldInfos.add(new FieldInfo("city", "city", null));
>>>         fieldInfos.add(new FieldInfo("fname", "firstName", null));
>>>         fieldInfos.add(new FieldInfo("lname", "lastName", null));
>>>
>>>         KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn",
>>>             new KafkaSinglePortInputOperator());
>>>         in.setInitialOffset(
>>>             AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>>         JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>>>         CassandraTransactionalStore transactionalStore =
>>>             new CassandraTransactionalStore();
>>>         CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>>>         out.setStore(transactionalStore);
>>>         out.setFieldInfos(fieldInfos);
>>>         dag.addOperator("CassandraDataWriter", out);
>>>         dag.addStream("parse", in.outputPort, parser.in);
>>>         dag.addStream("data", parser.out, out.input);
>>>
>>> The json parser seems to work well and deserializes Kafka events into
>>> POJOs that I then want to write to Cassandra.
>>>
>>> The Cassandra schema is as follows:
>>>
>>> CREATE TABLE testapp.testuser (
>>>     id uuid PRIMARY KEY,
>>>     city text,
>>>     fname text,
>>>     lname text
>>> ) WITH bloom_filter_fp_chance = 0.01
>>>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>>     AND comment = ''
>>>     AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>>>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>>     AND dclocal_read_repair_chance = 0.1
>>>     AND default_time_to_live = 0
>>>     AND gc_grace_seconds = 864000
>>>     AND max_index_interval = 2048
>>>     AND memtable_flush_period_in_ms = 0
>>>     AND min_index_interval = 128
>>>     AND read_repair_chance = 0.0
>>>     AND speculative_retry = '99.0PERCENTILE';
>>>
>>> Again, even without sending data, the exception happens. What am I
>>> missing? Any hint would be appreciated.
>>>
>>> 2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent: using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196 as the basepath for checkpointing.
>>> 2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
>>> com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part id
>>> at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
>>> at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>>> at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>>> at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
>>>
>>>
>>>
>>
>
