Okay, I am an idiot.  The problem was in how I converted the message
size back and forth between a 4-byte array and an int (it calculated
the wrong value for large numbers).  I hadn't done this kind of bit
manipulation since college - that's my excuse and I'm sticking to it.
For reference:

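    // Decodes a 4-byte big-endian (network order) array into an int.
    public static int parseToInt32(byte[] bytes) {
        // Java bytes are signed: mask with 0xFF so values >= 0x80 are
        // not sign-extended when promoted to int.
        return (bytes[0] << 24)
                + ((bytes[1] & 0xFF) << 16)
                + ((bytes[2] & 0xFF) << 8)
                + (bytes[3] & 0xFF);
    }

    // Encodes an int as a 4-byte big-endian (network order) array.
    public static byte[] parseFromInt32(int value) {
        byte[] bytes = new byte[4];
        bytes[3] = (byte)value;
        bytes[2] = (byte)(value >>> 8);
        bytes[1] = (byte)(value >>> 16);
        bytes[0] = (byte)(value >>> 24);
        return bytes;
    }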
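
For anyone hitting the same thing: the broken version isn't shown in
this thread, so the following is only a sketch of one common way this
conversion goes wrong, not necessarily the exact bug here. Java bytes
are signed, so any byte >= 0x80 is sign-extended when it is promoted to
int unless it is masked with 0xFF. The class and method names below are
made up for illustration; ByteBuffer (big-endian by default) is shown
as an alternative that avoids the hand-written shifts.

```java
import java.nio.ByteBuffer;

public class Int32Demo {
    // Hypothetical buggy variant: no 0xFF masking, so a byte >= 0x80
    // becomes a negative int and corrupts the sum.
    static int naiveToInt32(byte[] b) {
        return (b[0] << 24) + (b[1] << 16) + (b[2] << 8) + b[3];
    }

    // Masked variant, same logic as parseToInt32 above.
    static int maskedToInt32(byte[] b) {
        return (b[0] << 24)
                + ((b[1] & 0xFF) << 16)
                + ((b[2] & 0xFF) << 8)
                + (b[3] & 0xFF);
    }

    // Alternative using java.nio.ByteBuffer, which reads big-endian by default.
    static int bufferToInt32(byte[] b) {
        return ByteBuffer.wrap(b).getInt();
    }

    // Alternative encoder: writes the int as 4 big-endian bytes.
    static byte[] fromInt32(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    public static void main(String[] args) {
        int size = 4224; // 0x00001080: the low byte is 0x80
        byte[] bytes = fromInt32(size);
        System.out.println(naiveToInt32(bytes));  // prints 3968
        System.out.println(maskedToInt32(bytes)); // prints 4224
        System.out.println(bufferToInt32(bytes)); // prints 4224
    }
}
```

With the unmasked version, a size whose low byte is below 0x80 still
decodes correctly, which would be consistent with a failure that only
shows up for some message sizes.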


--Matthew



On Thu, Apr 22, 2010 at 16:19, Matthew Pflueger
<[email protected]> wrote:
> I'm persisting and retrieving a randomly generated array of bytes
> between 1K and 8K in size on one node (no cluster - although I ran a
> cluster and the behavior did not change).  About 50% of the time
> everything works fine (both persisting and retrieving) but the other
> 50% I get the following error only on retrieving the object
> (specifically when executing the following line in the retrieve method
> below -  RpbGetResp getResp = RpbGetResp.parseFrom(getRespBytes)):
>
> com.google.protobuf.InvalidProtocolBufferException: While parsing a
> protocol message, the input ended unexpectedly in the middle of a
> field.  This could mean either than the input has been truncated or
> that an embedded message misreported its own length.
>
> I'm pretty sure I'm doing something stupid but for the life of me I
> can't figure it out.  Below is my code (rough but works):
>
>
>    public byte[] retrieve(String uuid) throws IOException {
>        RpbGetReq.Builder getReqBuilder = RpbGetReq.newBuilder();
>        getReqBuilder.setBucket(ByteString.copyFromUtf8("profiles"));
>        getReqBuilder.setKey(ByteString.copyFromUtf8(uuid));
>        assert getReqBuilder.isInitialized();
>        RpbGetReq getReq = getReqBuilder.build();
>        out.writeRawBytes(parseFromInt32(getReq.getSerializedSize()+1));
>        out.writeRawByte(9);
>        getReq.writeTo(out);
>        out.flush();
>
>        int getRespSize = parseToInt32(in.readRawBytes(4))-1;
>        byte getRespMsgCode = in.readRawByte();
>        if (getRespMsgCode != 10) {
>            throw new IllegalStateException("Error retrieving profile");
>        }
>        byte[] getRespBytes = in.readRawBytes(getRespSize);
>        assert getRespBytes.length == getRespSize : "Invalid number of bytes read!";
>        RpbGetResp getResp = RpbGetResp.parseFrom(getRespBytes);
>        if (getResp.getContentCount() == 0) {
>            return null;
>        }
>
>        return getResp.getContent(0).getValue().toByteArray();
>    }
>
>    public void persist(String uuid, byte[] payload) throws IOException {
>        RpbPutReq.Builder putReqBuilder = RpbPutReq.newBuilder();
>        putReqBuilder.setBucket(ByteString.copyFromUtf8("profiles"));
>        putReqBuilder.setKey(ByteString.copyFromUtf8(uuid));
>        RpbContent.Builder contentBuilder = RpbContent.newBuilder();
>        contentBuilder.setContentType(ByteString.copyFromUtf8(Constants.CTYPE_OCTET_STREAM));
>        contentBuilder.setValue(ByteString.copyFrom(payload));
>        putReqBuilder.setContent(contentBuilder);
>        assert putReqBuilder.isInitialized();
>        RpbPutReq putReq = putReqBuilder.build();
>        int size = putReq.getSerializedSize()+1;
>        out.writeRawBytes(parseFromInt32(size));
>        out.writeRawByte(11);
>        putReq.writeTo(out);
>        out.flush();
>
>        int putRespSize = parseToInt32(in.readRawBytes(4))-1;
>        byte putRespMsgCode = in.readRawByte();
>        if (putRespMsgCode != 12) {
>            throw new IllegalStateException("Error storing profile");
>        }
>    }
>
> The in and out streams were initialized like the following (in case
> you were wondering):
>
>        InetSocketAddress address = new InetSocketAddress(host, port);
>
>        socket = new Socket();
>        socket.setSoTimeout(1000);
>        socket.connect(address, 2000);
>
>        out = CodedOutputStream.newInstance(socket.getOutputStream());
>        in = CodedInputStream.newInstance(socket.getInputStream());
>
> I tried using different encapsulations of the stream (buffered,
> straight, etc) nothing changed the behavior...
>
> --Matthew
>

_______________________________________________
riak-users mailing list
[email protected]
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com