Is it possible that your input has a "null" value for the id field? Since id
is your primary key, it cannot accept null values.
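
One way to rule this out is to check the key on each POJO before it reaches the Cassandra operator. Below is a minimal sketch, assuming the POJO exposes id as a java.util.UUID (matching the uuid column in your schema). The names NullIdCheck, TestUser, hasPartitionKey and dropRowsWithoutKey are hypothetical and are not part of Apex Malhar or the DataStax driver; you would need to adapt the check to wherever your tuples are produced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class NullIdCheck {

    // Hypothetical POJO mirroring the testapp.testuser columns.
    public static class TestUser {
        private final UUID id;
        private final String city;
        private final String firstName;
        private final String lastName;

        public TestUser(UUID id, String city, String firstName, String lastName) {
            this.id = id;
            this.city = city;
            this.firstName = firstName;
            this.lastName = lastName;
        }

        public UUID getId() { return id; }
        public String getCity() { return city; }
        public String getFirstName() { return firstName; }
        public String getLastName() { return lastName; }
    }

    // True only when the tuple exists and carries a non-null partition key.
    public static boolean hasPartitionKey(TestUser user) {
        return user != null && user.getId() != null;
    }

    // Returns a new list containing only the tuples that are safe to insert.
    public static List<TestUser> dropRowsWithoutKey(List<TestUser> users) {
        List<TestUser> kept = new ArrayList<>();
        for (TestUser user : users) {
            if (hasPartitionKey(user)) {
                kept.add(user);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<TestUser> users = new ArrayList<>();
        users.add(new TestUser(UUID.randomUUID(), "Pune", "Asha", "Rao"));
        users.add(new TestUser(null, "Pune", "No", "Key")); // would be rejected by Cassandra
        List<TestUser> kept = dropRowsWithoutKey(users);
        System.out.println("kept " + kept.size() + " of " + users.size());
    }
}
```

If every tuple passes this check and the exception still appears, then the null is probably not coming from your input data.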

-Priyanka

On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater <[email protected]>
wrote:

> Folks,
>
> I am trying to write sample data to Cassandra. The operator keeps dying
> and being restarted. The failure trace is below. This failure happens even
> if no data is going through the pipeline.
>
> Here is how I create the Cassandra operator:
>
>         List<FieldInfo> fieldInfos = Lists.newArrayList();
>         fieldInfos.add(new FieldInfo("id", "id", null));
>         fieldInfos.add(new FieldInfo("city", "city", null));
>         fieldInfos.add(new FieldInfo("fname", "firstName", null));
>         fieldInfos.add(new FieldInfo("lname", "lastName", null));
>
>         KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn",
>             new KafkaSinglePortInputOperator());
>         in.setInitialOffset(
>             AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>         JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>         CassandraTransactionalStore transactionalStore =
>             new CassandraTransactionalStore();
>         CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>         out.setStore(transactionalStore);
>         out.setFieldInfos(fieldInfos);
>         dag.addOperator("CassandraDataWriter", out);
>         dag.addStream("parse", in.outputPort, parser.in);
>         dag.addStream("data", parser.out, out.input);
>
> The json parser seems to work well and deserializes Kafka events into
> POJOs that I then want to write to Cassandra.
>
> The Cassandra schema is as follows:
>
> CREATE TABLE testapp.testuser (
>     id uuid PRIMARY KEY,
>     city text,
>     fname text,
>     lname text
> ) WITH bloom_filter_fp_chance = 0.01
>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>     AND comment = ''
>     AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>     AND dclocal_read_repair_chance = 0.1
>     AND default_time_to_live = 0
>     AND gc_grace_seconds = 864000
>     AND max_index_interval = 2048
>     AND memtable_flush_period_in_ms = 0
>     AND min_index_interval = 128
>     AND read_repair_chance = 0.0
>     AND speculative_retry = '99.0PERCENTILE';
>
> Again, even without sending data, the exception happens. What am I
> missing? Any hint would be appreciated.
>
> 2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent: using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196 as the basepath for checkpointing.
> 2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
> com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part id
> at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
> at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
> at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
>
>
>
