Thanks, that's exactly what happened. One of my input events had a null id. What happened next is what confused me, but it now makes complete sense: because this event could not be written successfully, Apex kept retrying it, so the messages that came after the malformed one were stuck behind it. I wonder if there is a way to limit the number of retries, or if this is left to the application layer.
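
If this does end up in the application layer, one option would be to divert tuples with a null id before they reach the Cassandra writer, so a single malformed event cannot block the ones behind it. Below is a minimal sketch in plain Java that deliberately uses no Apex API. The names `NullKeyFilter`, `User`, `Result` and `partition` are hypothetical, and the POJO fields are only assumed from the FieldInfo mappings in my original message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical helper, not part of Apex or Malhar: splits incoming tuples
// into those that can be written and those that must be rejected.
final class NullKeyFilter {

    // Assumed POJO shape, mirroring the testapp.testuser columns.
    static final class User {
        final UUID id;
        final String city;
        final String firstName;
        final String lastName;

        User(UUID id, String city, String firstName, String lastName) {
            this.id = id;
            this.city = city;
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    // Holds both outcomes so that rejected tuples can be logged or rerouted.
    static final class Result {
        final List<User> valid = new ArrayList<>();
        final List<User> rejected = new ArrayList<>();
    }

    // A tuple is writable only if it exists and its partition key is non-null.
    static Result partition(List<User> tuples) {
        Result result = new Result();
        for (User user : tuples) {
            if (user != null && user.id != null) {
                result.valid.add(user);
            } else {
                result.rejected.add(user);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<User> tuples = new ArrayList<>();
        tuples.add(new User(UUID.randomUUID(), "Paris", "Ada", "Lovelace"));
        tuples.add(new User(null, "Rome", "Alan", "Turing")); // malformed: null id

        Result result = partition(tuples);
        System.out.println("valid=" + result.valid.size()
                + " rejected=" + result.rejected.size());
    }
}
```

In the DAG, I would expect the same check to live in a small operator between the JSON parser and the Cassandra writer, with the rejected tuples logged or sent to a separate error stream. That placement is a guess on my part, not something I have tested.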

On Mon, Dec 5, 2016 at 11:53 AM, Priyanka Gugale <[email protected]> wrote:

> Is it possible that your input has "null" value for id field? id seems to
> be your primary key, so it cannot accept null values.
>
> -Priyanka
>
> On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater <[email protected]> wrote:
>
>> Folks,
>>
>> I am trying to write sample stuff to Cassandra. The operator keeps dying
>> and being restated. The failure trace is below. This failure happens even
>> if no data is going through the pipeline.
>>
>> Here is how I create the Cassandra operator:
>>
>> List<FieldInfo> fieldInfos = Lists.newArrayList();
>> fieldInfos.add(new FieldInfo("id", "id", null));
>> fieldInfos.add(new FieldInfo("city", "city", null));
>> fieldInfos.add(new FieldInfo("fname", "firstName", null));
>> fieldInfos.add(new FieldInfo("lname", "lastName", null));
>>
>> KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
>> in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>> JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>> CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
>> CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>> out.setStore(transactionalStore);
>> out.setFieldInfos(fieldInfos);
>> dag.addOperator("CassandraDataWriter", out);
>> dag.addStream("parse", in.outputPort, parser.in);
>> dag.addStream("data", parser.out, out.input);
>>
>> The json parser seems to work well and deserializes Kafka events into
>> POJOs that I then want to write to Cassandra.
>>
>> The Cassandra schema is as follows:
>>
>> CREATE TABLE testapp.testuser (
>>     id uuid PRIMARY KEY,
>>     city text,
>>     fname text,
>>     lname text
>> ) WITH bloom_filter_fp_chance = 0.01
>>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>     AND comment = ''
>>     AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>     AND dclocal_read_repair_chance = 0.1
>>     AND default_time_to_live = 0
>>     AND gc_grace_seconds = 864000
>>     AND max_index_interval = 2048
>>     AND memtable_flush_period_in_ms = 0
>>     AND min_index_interval = 128
>>     AND read_repair_chance = 0.0
>>     AND speculative_retry = '99.0PERCENTILE';
>>
>> Again, even without sending data, the exception happens. What am I
>> missing? Any hint would be appreciated.
>>
>> 2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent:
>> using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196
>> as the basepath for checkpointing.
>> 2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer:
>> Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]]
>> stopped running due to an exception.
>> com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part id
>>     at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
>>     at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>>     at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>>     at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
