I'm dealing with a large stream of messages.  Each message can be of type A, 
B, or C.  Each message is a few hundred bytes, and the stream may contain 
millions of messages.

I know that the traditional advice for reading a bunch of concatenated 
messages 
<https://developers.google.com/protocol-buffers/docs/techniques#streaming> is 
to prefix each message with its size.  In my case I need both a type (to 
tell if I have A, B, or C) and a size.  It seemed reasonable to encode the 
type and size as varints, which gives each record the same wire format as a 
field of a container message.  That container can be a message with a 
single oneof.  I've defined one that looks like this:

message Container {
  oneof contents {
    A a = 1;
    B b = 2;
    C c = 3;
  }
}
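
For concreteness, here is a minimal sketch of the framing this produces: 
one tag varint (field number and wire type 2), one length varint, then the 
payload.  It uses only the standard library; make_tag, encode_varint, and 
frame_header are names I made up for illustration, not protobuf APIs.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Wire type 2 is "length-delimited", which is what embedded messages use.
constexpr uint32_t kLengthDelimited = 2;

// Hypothetical helper: combine a field number and a wire type into a tag.
constexpr uint32_t make_tag(uint32_t field_number, uint32_t wire_type) {
  return (field_number << 3) | wire_type;
}

// Hypothetical helper: append `value` to `out` as a base-128 varint,
// least-significant group first, high bit set on all but the last byte.
inline void encode_varint(uint64_t value, std::vector<uint8_t> *out) {
  while (value >= 0x80) {
    out->push_back(static_cast<uint8_t>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out->push_back(static_cast<uint8_t>(value));
}

// Hypothetical helper: the bytes that precede a payload of `payload_size`
// bytes stored in field `field_number` of the container.
inline std::vector<uint8_t> frame_header(uint32_t field_number,
                                         uint64_t payload_size) {
  assert(field_number >= 1);  // field numbers start at 1
  std::vector<uint8_t> header;
  encode_varint(make_tag(field_number, kLengthDelimited), &header);
  encode_varint(payload_size, &header);
  return header;
}
```

With the Container above, fields 1 through 3 all yield one-byte tags, so a 
record header is one tag byte plus one or two length bytes for messages of 
a few hundred bytes.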

Since the Parse methods all keep reading until the input is exhausted (or 
a limit is reached), my main loop has to parse the tag and size, put a 
bound on the stream, and then dispatch to the inner message's parser.  
Here's a rough outline of what I do:

extern void process_item(const Container &container);

bool process_stream(google::protobuf::io::ZeroCopyInputStream *input) {
  Container container;
  while (true) {
    google::protobuf::io::CodedInputStream decoder(input);
    auto tagpair = decoder.ReadTagWithCutoffNoLastTag(127);
    if (!tagpair.second)
      return false;
    uint32_t tag = tagpair.first;
    if (tag == 0)
      return true;
    int size;
    if (!decoder.ReadVarintSizeAsInt(&size))
      return false;

    decoder.PushLimit(size);
    switch (tag) {
      case (Container::kAFieldNumber << 3 | 2):
        if (!container.mutable_a()->ParseFromCodedStream(&decoder))
          return false;
        break;
      case (Container::kBFieldNumber << 3 | 2):
        if (!container.mutable_b()->ParseFromCodedStream(&decoder))
          return false;
        break;
      case (Container::kCFieldNumber << 3 | 2):
        if (!container.mutable_c()->ParseFromCodedStream(&decoder))
          return false;
        break;
      default:
        return false;
    }
    if (!decoder.ConsumedEntireMessage() || decoder.BytesUntilLimit() != 0) {
      return false;
    }
    
    process_item(container);
  }
}


This is essentially a stripped-down version of the loop in 
Container::MergePartialFromCodedStream, without support for unknown fields 
and other exceptional situations.

It seems like there would be an easier way to do this.  I haven't come up 
with anything I'm fond of, though.  Some ideas I have require copying the 
input stream around (undesirable; zero-copy is useful for performance 
here), or fail if a size's varint crosses the boundary from 
ZeroCopyInputStream::Next, or have similar problems.
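
To illustrate the boundary problem: a hand-rolled reader working directly 
on the buffers returned by Next has to carry varint state from one buffer 
to the next.  A minimal sketch of that, standard library only 
(VarintAccumulator is a hypothetical class of mine, not something protobuf 
provides):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: decodes one varint whose bytes may arrive split
// across several buffers.
class VarintAccumulator {
 public:
  enum class Status { kNeedMore, kDone, kError };

  // Consumes bytes from [data, data + size) until the varint ends or the
  // buffer runs out.  *used receives the number of bytes consumed.
  Status Feed(const uint8_t *data, size_t size, size_t *used) {
    assert(used != nullptr);
    size_t i = 0;
    while (i < size) {
      if (shift_ >= 64) {  // more than 10 bytes: not a valid 64-bit varint
        *used = i;
        return Status::kError;
      }
      const uint8_t byte = data[i++];
      value_ |= static_cast<uint64_t>(byte & 0x7F) << shift_;
      shift_ += 7;
      if ((byte & 0x80) == 0) {  // high bit clear marks the last byte
        *used = i;
        return Status::kDone;
      }
    }
    *used = i;
    return Status::kNeedMore;  // state is kept for the next buffer
  }

  uint64_t value() const { return value_; }

  void Reset() {
    value_ = 0;
    shift_ = 0;
  }

 private:
  uint64_t value_ = 0;
  int shift_ = 0;
};
```

The caller feeds each buffer in turn and, on kDone, hands any unconsumed 
bytes back to the stream (for example with BackUp).  That works, but it is 
exactly the kind of bookkeeping I'd rather leave to CodedInputStream.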

Anybody have suggestions for improvements?

(I'm also simplifying the above code snippet: the stream is actually 
self-describing, prefixed by a FileDescriptorSet, for legacy reasons.  That 
means that all this is going through the reflection interface.  Also, I 
have a visitor that gets dispatched with the inner object based on 
OneofContentsCase, rather than a single object that takes a Container.  
Finally, we're currently using Protobuf 2.5.0, so some features - like 
those provided by oneof - are implemented by my code.  That's also why you 
see me using one Container through the whole loop: since I don't have 
arenas in 2.5.0, it's more efficient for me to reuse one object and let it 
manage the cached members.  Indeed, doing this with oneof but without 
arenas in 3.5.1 would cause a lot of unnecessary memory churn, since oneof 
members aren't reused the way optional and repeated fields are, contrary to 
the note in the C++ docs 
<https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.message_lite#MessageLite.Clear.details>.)

Cheerio,
Piquan
