I'm persisting and retrieving a randomly generated byte array, between
1K and 8K in size, on a single node (no cluster, although I also tried a
cluster and the behavior did not change).  About 50% of the time
everything works fine (both persisting and retrieving), but the other
50% of the time I get the following error, and only when retrieving the
object (specifically when executing this line in the retrieve method
below: RpbGetResp getResp = RpbGetResp.parseFrom(getRespBytes)):

com.google.protobuf.InvalidProtocolBufferException: While parsing a
protocol message, the input ended unexpectedly in the middle of a
field.  This could mean either than the input has been truncated or
that an embedded message misreported its own length.

I'm pretty sure I'm doing something stupid, but for the life of me I
can't figure it out.  Below is my code (rough, but it works about half
the time):


    public byte[] retrieve(String uuid) throws IOException {
        RpbGetReq.Builder getReqBuilder = RpbGetReq.newBuilder();
        getReqBuilder.setBucket(ByteString.copyFromUtf8("profiles"));
        getReqBuilder.setKey(ByteString.copyFromUtf8(uuid));
        assert getReqBuilder.isInitialized();
        RpbGetReq getReq = getReqBuilder.build();
        out.writeRawBytes(parseFromInt32(getReq.getSerializedSize()+1));
        out.writeRawByte(9);
        getReq.writeTo(out);
        out.flush();

        int getRespSize = parseToInt32(in.readRawBytes(4))-1;
        byte getRespMsgCode = in.readRawByte();
        if (getRespMsgCode != 10) {
            throw new IllegalStateException("Error retrieving profile");
        }
        byte[] getRespBytes = in.readRawBytes(getRespSize);
        assert getRespBytes.length == getRespSize : "Invalid number of bytes read!";
        RpbGetResp getResp = RpbGetResp.parseFrom(getRespBytes);
        if (getResp.getContentCount() == 0) {
            return null;
        }

        return getResp.getContent(0).getValue().toByteArray();
    }

    public void persist(String uuid, byte[] payload) throws IOException {
        RpbPutReq.Builder putReqBuilder = RpbPutReq.newBuilder();
        putReqBuilder.setBucket(ByteString.copyFromUtf8("profiles"));
        putReqBuilder.setKey(ByteString.copyFromUtf8(uuid));
        RpbContent.Builder contentBuilder = RpbContent.newBuilder();
        contentBuilder.setContentType(ByteString.copyFromUtf8(Constants.CTYPE_OCTET_STREAM));
        contentBuilder.setValue(ByteString.copyFrom(payload));
        putReqBuilder.setContent(contentBuilder);
        assert putReqBuilder.isInitialized();
        RpbPutReq putReq = putReqBuilder.build();
        int size = putReq.getSerializedSize()+1;
        out.writeRawBytes(parseFromInt32(size));
        out.writeRawByte(11);
        putReq.writeTo(out);
        out.flush();

        int putRespSize = parseToInt32(in.readRawBytes(4))-1;
        byte putRespMsgCode = in.readRawByte();
        if (putRespMsgCode != 12) {
            throw new IllegalStateException("Error storing profile");
        }
    }
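
The parseFromInt32 and parseToInt32 helpers called above are not shown
in this post.  A minimal sketch of what they are assumed to do, given
that the Riak protocol buffers interface frames each message as a 4-byte
big-endian length followed by a 1-byte message code (the class name
LengthPrefix is hypothetical, and the real helpers may differ):

```java
// Hypothetical stand-ins for the parseFromInt32/parseToInt32 helpers,
// which are not shown in the post. Assumes a 4-byte big-endian length.
final class LengthPrefix {
    private LengthPrefix() {}

    /** Encodes value as 4 bytes, most significant byte first (network order). */
    static byte[] parseFromInt32(int value) {
        return new byte[] {
            (byte) (value >>> 24),
            (byte) (value >>> 16),
            (byte) (value >>> 8),
            (byte) value
        };
    }

    /** Decodes 4 big-endian bytes; each byte is masked because Java bytes are signed. */
    static int parseToInt32(byte[] bytes) {
        if (bytes.length != 4) {
            throw new IllegalArgumentException("expected 4 bytes, got " + bytes.length);
        }
        return ((bytes[0] & 0xFF) << 24)
             | ((bytes[1] & 0xFF) << 16)
             | ((bytes[2] & 0xFF) << 8)
             |  (bytes[3] & 0xFF);
    }
}
```

The & 0xFF masks matter because Java bytes are signed: without them, a
length containing a byte of 0x80 or above would decode to the wrong
value, so that may be worth ruling out for payloads in this size range.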

The in and out streams were initialized like the following (in case
you were wondering):

        InetSocketAddress address = new InetSocketAddress(host, port);

        socket = new Socket();
        socket.setSoTimeout(1000);
        socket.connect(address, 2000);

        out = CodedOutputStream.newInstance(socket.getOutputStream());
        in = CodedInputStream.newInstance(socket.getInputStream());

I tried different wrappers around the stream (buffered, unbuffered,
etc.), but nothing changed the behavior.
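
For comparison, one more encapsulation that could be tried is plain
java.io data streams for the framing, leaving protobuf to parse only the
message body.  This is a rough sketch, not what the code above does: the
FrameIo class and its method names are hypothetical, and it assumes the
same framing (4-byte big-endian length, 1-byte message code, then the
body):

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper: frames messages as [int32 length][1-byte code][body],
// where length counts the code byte plus the body.
final class FrameIo {
    private FrameIo() {}

    static void writeFrame(OutputStream rawOut, int msgCode, byte[] body) throws IOException {
        DataOutputStream out = new DataOutputStream(rawOut);
        out.writeInt(body.length + 1);   // big-endian, includes the code byte
        out.writeByte(msgCode);
        out.write(body);
        out.flush();
    }

    static byte[] readFrame(InputStream rawIn, int expectedCode) throws IOException {
        // DataInputStream does not buffer, so wrapping per call loses no bytes.
        DataInputStream in = new DataInputStream(rawIn);
        int length = in.readInt();       // big-endian, includes the code byte
        if (length < 1) {
            throw new IOException("Invalid frame length: " + length);
        }
        int msgCode = in.readUnsignedByte();
        byte[] body = new byte[length - 1];
        in.readFully(body);              // blocks until the whole body has arrived
        if (msgCode != expectedCode) {
            throw new IllegalStateException("Unexpected message code: " + msgCode);
        }
        return body;
    }
}
```

With this, retrieve would call something like FrameIo.readFrame(
socket.getInputStream(), 10) and hand the returned bytes to
RpbGetResp.parseFrom, and persist would drain its reply the same way
with code 12.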

--Matthew
