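Is it possible that your input has a null value for the id field? id is the primary key of testapp.testuser, so Cassandra cannot accept null for it. That would match the "Invalid null value for partition key part id" error in your trace.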
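One way to rule this out is to check each tuple's key before it reaches the output operator. Below is a minimal sketch of such a check. It assumes a POJO whose property names mirror the FieldInfo expressions in your code (id, city, firstName, lastName). The class names TestUser and NullIdGuard and the helper hasValidKey are hypothetical, and nothing here uses the Apex or Cassandra APIs.

```java
import java.util.UUID;

// Hypothetical helper; not part of Apex, Malhar, or the DataStax driver.
public class NullIdGuard {

    // Assumed shape of the POJO produced by the JSON parser. The property
    // names follow the FieldInfo expressions from the quoted code.
    public static class TestUser {
        private UUID id;
        private String city;
        private String firstName;
        private String lastName;

        public UUID getId() { return id; }
        public void setId(UUID id) { this.id = id; }
        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }
        public String getFirstName() { return firstName; }
        public void setFirstName(String firstName) { this.firstName = firstName; }
        public String getLastName() { return lastName; }
        public void setLastName(String lastName) { this.lastName = lastName; }
    }

    // Returns true only when the tuple carries a non-null partition key.
    public static boolean hasValidKey(TestUser user) {
        return user != null && user.getId() != null;
    }

    public static void main(String[] args) {
        TestUser missingId = new TestUser();
        missingId.setCity("Paris");
        System.out.println(hasValidKey(missingId)); // prints false

        TestUser withId = new TestUser();
        withId.setId(UUID.randomUUID());
        System.out.println(hasValidKey(withId));    // prints true
    }
}
```

If a check like this never reports a null id, the null is probably being bound somewhere other than your incoming data, which would also fit your observation that the failure occurs with no tuples flowing.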
-Priyanka

On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater <[email protected]> wrote:

> Folks,
>
> I am trying to write sample stuff to Cassandra. The operator keeps dying
> and being restarted. The failure trace is below. This failure happens even
> if no data is going through the pipeline.
>
> Here is how I create the Cassandra operator:
>
>     List<FieldInfo> fieldInfos = Lists.newArrayList();
>     fieldInfos.add(new FieldInfo("id", "id", null));
>     fieldInfos.add(new FieldInfo("city", "city", null));
>     fieldInfos.add(new FieldInfo("fname", "firstName", null));
>     fieldInfos.add(new FieldInfo("lname", "lastName", null));
>
>     KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn",
>         new KafkaSinglePortInputOperator());
>     in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>     JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>     CassandraTransactionalStore transactionalStore =
>         new CassandraTransactionalStore();
>     CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>     out.setStore(transactionalStore);
>     out.setFieldInfos(fieldInfos);
>     dag.addOperator("CassandraDataWriter", out);
>     dag.addStream("parse", in.outputPort, parser.in);
>     dag.addStream("data", parser.out, out.input);
>
> The json parser seems to work well and deserializes Kafka events into
> POJOs that I then want to write to Cassandra.
>
> The Cassandra schema is as follows:
>
>     CREATE TABLE testapp.testuser (
>         id uuid PRIMARY KEY,
>         city text,
>         fname text,
>         lname text
>     ) WITH bloom_filter_fp_chance = 0.01
>         AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>         AND comment = ''
>         AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>         AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>         AND dclocal_read_repair_chance = 0.1
>         AND default_time_to_live = 0
>         AND gc_grace_seconds = 864000
>         AND max_index_interval = 2048
>         AND memtable_flush_period_in_ms = 0
>         AND min_index_interval = 128
>         AND read_repair_chance = 0.0
>         AND speculative_retry = '99.0PERCENTILE';
>
> Again, even without sending data, the exception happens. What am I
> missing? Any hint would be appreciated.
>
> 2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent: using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196 as the basepath for checkpointing.
> 2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
> com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part id
>     at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
>     at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>     at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>     at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
