I was wondering whether there could be different strategies to choose from, depending on the scenario. One could be to drop the events in that window. (I'm trying to avoid the word "batch", since as I understand it Apex doesn't do batch ;))
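A minimal plain-Java sketch of what selectable strategies could look like. Everything here is hypothetical: `WindowFailurePolicies`, `Policy` and `writeWindow` are illustrative names, not part of Apex, Malhar or CassandraTransactionalStore. The sketch assumes the writer sends one window's events as a single batch that either succeeds or fails as a whole, and it simulates the store with a `Consumer` that throws when a batch contains a null id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not an Apex/Malhar API: lets the user choose what happens
// when the batch write for one streaming window keeps failing.
public class WindowFailurePolicies {

  public enum Policy {
    FAIL,         // rethrow; the operator fails and the window is replayed
    DROP_WINDOW,  // give up on every event in the window
    DROP_INVALID  // write events one by one and drop only those that fail
  }

  // Writes one window as a single batch, retrying up to maxRetries extra times.
  // Returns the events that were dropped (empty if the batch succeeded).
  public static <T> List<T> writeWindow(List<T> window, Consumer<List<T>> batchWriter,
                                        Policy policy, int maxRetries) {
    if (maxRetries < 0) {
      throw new IllegalArgumentException("maxRetries must be >= 0");
    }
    RuntimeException lastError = null;
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        batchWriter.accept(window);
        return Collections.emptyList();
      } catch (RuntimeException e) {
        lastError = e;
      }
    }
    switch (policy) {
      case DROP_WINDOW:
        return new ArrayList<>(window);
      case DROP_INVALID: {
        List<T> dropped = new ArrayList<>();
        for (T event : window) {
          try {
            batchWriter.accept(Collections.singletonList(event));
          } catch (RuntimeException e) {
            dropped.add(event);
          }
        }
        return dropped;
      }
      default: // FAIL
        throw lastError;
    }
  }

  public static void main(String[] args) {
    List<String> written = new ArrayList<>();
    // Simulated store: rejects any batch containing a null id, much like the
    // "Invalid null value for partition key part id" error in this thread.
    Consumer<List<String>> store = batch -> {
      if (batch.contains(null)) {
        throw new IllegalStateException("null partition key");
      }
      written.addAll(batch);
    };
    List<String> dropped = writeWindow(Arrays.asList("a", null, "b"), store,
        Policy.DROP_INVALID, 2);
    System.out.println(dropped);  // prints "[null]"
    System.out.println(written);  // prints "[a, b]"
  }
}
```

With `DROP_INVALID` only the null-id event is lost, `DROP_WINDOW` discards the whole window, and `FAIL` corresponds to the behaviour described in this thread, where the operator fails and the window is replayed. Note that a retry limit only helps with transient errors; a null key fails on every attempt, so the drop policy decides the outcome.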
In my case, I would have preferred to update the upstream operator to filter out events with null IDs, or to add a dedicated filter operator. Would either of these have been a viable way out?

On Tue, Dec 6, 2016 at 4:35 AM, Ananth G wrote:
> Dropping the entire batch even if one entry is invalid seems too
> stringent. Thoughts?
>
> Regards
> Ananth
>
> On 6 Dec. 2016, at 5:08 pm, Priyanka Gugale wrote:
>
> Hi Max,
>
> Right now the operator doesn't provide a way to set a retry limit and drop
> the statement / batch in case of exceptions. We should add this support to
> CassandraTransactionalStore.
> Created a Malhar JIRA ticket to track this:
> https://issues.apache.org/jira/browse/APEXMALHAR-2367
>
> -Priyanka
>
> On Tue, Dec 6, 2016 at 12:51 AM, Max Bridgewater wrote:
>
>> Thanks, that's exactly what happened. One of my input events had Id as
>> null. What happened from there is what confused me, but it now makes
>> complete sense. Because this event was not delivered successfully, Apex
>> kept retrying. This means that messages that came after the malformed one
>> were stuck. I wonder if there is a way to limit the number of retries or if
>> this is left to the application layer.
>>
>> On Mon, Dec 5, 2016 at 11:53 AM, Priyanka Gugale wrote:
>>
>>> Is it possible that your input has a "null" value for the id field? id seems
>>> to be your primary key, so it cannot accept null values.
>>>
>>> -Priyanka
>>>
>>> On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater wrote:
>>>
>>>> Folks,
>>>>
>>>> I am trying to write sample stuff to Cassandra. The operator keeps
>>>> dying and being restarted. The failure trace is below. This failure happens
>>>> even if no data is going through the pipeline.
>>>>
>>>> Here is how I create the Cassandra operator:
>>>>
>>>>     // Map each Cassandra column (first argument) to a POJO field (second argument)
>>>>     List<FieldInfo> fieldInfos = Lists.newArrayList();
>>>>     fieldInfos.add(new FieldInfo("id", "id", null));
>>>>     fieldInfos.add(new FieldInfo("city", "city", null));
>>>>     fieldInfos.add(new FieldInfo("fname", "firstName", null));
>>>>     fieldInfos.add(new FieldInfo("lname", "lastName", null));
>>>>
>>>>     KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn",
>>>>         new KafkaSinglePortInputOperator());
>>>>     // Start consuming from the earliest offset still retained in the topic
>>>>     in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>>>     JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>>>>     CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
>>>>     CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>>>>     out.setStore(transactionalStore);
>>>>     out.setFieldInfos(fieldInfos);
>>>>     dag.addOperator("CassandraDataWriter", out);
>>>>     dag.addStream("parse", in.outputPort, parser.in);
>>>>     dag.addStream("data", parser.out, out.input);
>>>>
>>>> The JSON parser seems to work well and deserializes Kafka events into
>>>> POJOs that I then want to write to Cassandra.
>>>>
>>>> The Cassandra schema is as follows:
>>>>
>>>>     CREATE TABLE testapp.testuser (
>>>>         id uuid PRIMARY KEY,
>>>>         city text,
>>>>         fname text,
>>>>         lname text
>>>>     ) WITH bloom_filter_fp_chance = 0.01
>>>>         AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>>>         AND comment = ''
>>>>         AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>>>>         AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>>>         AND dclocal_read_repair_chance = 0.1
>>>>         AND default_time_to_live = 0
>>>>         AND gc_grace_seconds = 864000
>>>>         AND max_index_interval = 2048
>>>>         AND memtable_flush_period_in_ms = 0
>>>>         AND min_index_interval = 128
>>>>         AND read_repair_chance = 0.0
>>>>         AND speculative_retry = '99.0PERCENTILE';
>>>>
>>>> Again, even without sending data, the exception happens. What am I
>>>> missing? Any hint would be appreciated.
>>>>
>>>> 2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent:
>>>> using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196 as the basepath for checkpointing.
>>>> 2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer:
>>>> Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
>>>> com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part id
>>>>     at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
>>>>     at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>>>>     at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>>>>     at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
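The upstream filter proposed at the start of this reply could look roughly like the following plain-Java sketch. `NullIdFilter` and `TestUser` are hypothetical names: `TestUser` only mirrors the columns of `testapp.testuser`, and its `id` is assumed to be a `java.util.UUID`. In an Apex DAG this check would presumably live in a small operator between `jsonParser` and `CassandraDataWriter`, emitting valid tuples on one output port and rejected ones on another, rather than in a standalone class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.function.Predicate;

// Hypothetical sketch, not an Apex/Malhar API: splits parsed tuples into those
// Cassandra can accept and those with a null partition key.
public class NullIdFilter {

  // Hypothetical POJO mirroring the columns of testapp.testuser.
  public static class TestUser {
    public final UUID id;
    public final String city;
    public final String firstName;
    public final String lastName;

    public TestUser(UUID id, String city, String firstName, String lastName) {
      this.id = id;
      this.city = city;
      this.firstName = firstName;
      this.lastName = lastName;
    }
  }

  // id is the primary key, so Cassandra rejects rows where it is null.
  static final Predicate<TestUser> HAS_PARTITION_KEY = u -> u != null && u.id != null;

  // Returns the tuples that are safe to write; the rest are appended to
  // rejected so they can be logged or sent to a separate stream.
  public static List<TestUser> filter(List<TestUser> tuples, List<TestUser> rejected) {
    List<TestUser> accepted = new ArrayList<>();
    for (TestUser u : tuples) {
      if (HAS_PARTITION_KEY.test(u)) {
        accepted.add(u);
      } else {
        rejected.add(u);
      }
    }
    return accepted;
  }

  public static void main(String[] args) {
    List<TestUser> rejected = new ArrayList<>();
    List<TestUser> accepted = filter(Arrays.asList(
        new TestUser(UUID.randomUUID(), "city-a", "first-a", "last-a"),
        new TestUser(null, "city-b", "first-b", "last-b")), rejected);
    // prints "1 accepted, 1 rejected"
    System.out.println(accepted.size() + " accepted, " + rejected.size() + " rejected");
  }
}
```

Routing rejected tuples to a separate stream, instead of silently discarding them, keeps malformed events visible for later inspection while letting the rest of the window reach Cassandra.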
