[
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15861984#comment-15861984
]
Sandesh commented on APEXCORE-635:
----------------------------------
[~brightchen]
What is the overhead of garbage collection? You can use the benchmark
application in Malhar for the measurements.
> Proposal: Manage memory to avoid memory copy and garbage collection
> -------------------------------------------------------------------
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
> Issue Type: Wish
> Reporter: bright chen
> Assignee: bright chen
>
> Manage memory to avoid memory copy and garbage collection
> The aim of this proposal is to reuse memory in order to avoid garbage
> collection and unnecessary memory copies, and thereby increase performance. In
> this proposal the term serde means serialization and deserialization; it is
> the same as codec.
> Currently, Apex uses DefaultStatefulStreamCodec for serde by default. It
> extends Kryo and optimizes it by replacing class names with class ids. An
> application developer can optimize serialization further by implementing the
> StreamCodec interface.
> First, let's look at the default codec, DefaultStatefulStreamCodec. As I
> understand it, it basically optimizes serde by replacing class names with class
> ids. This state information is sent only before the first tuple, so it acts
> like configuration for the serde. I therefore suggest separating this feature
> from serde. The benefit is that customized serdes can also use it. Kryo also
> has some limitations, which I describe later.
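A minimal sketch of separating the class-id state from the serde, as suggested above. The names `ClassIdRegistry`, `idFor` and `drainPending` are hypothetical (not Apex APIs), and this is not how DefaultStatefulStreamCodec is actually implemented. It only assumes two things: ids are assigned on first use, and pending registrations are sent ahead of the first tuple that needs them. Under those assumptions any codec, default or custom, could share the registry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: the class-name-to-id mapping kept outside any particular serde,
 * so that both the default codec and a custom codec could share it.
 */
public class ClassIdRegistry {
  private final Map<String, Integer> ids = new HashMap<>();
  // Registrations assigned but not yet sent downstream.
  private final List<String> pending = new ArrayList<>();

  /** Returns the id for a class, assigning the next id and queuing a registration on first use. */
  public int idFor(Class<?> clazz) {
    Integer id = ids.get(clazz.getName());
    if (id == null) {
      id = ids.size();
      ids.put(clazz.getName(), id);
      pending.add(clazz.getName());
    }
    return id;
  }

  /** Returns and clears the registrations that must be sent before the next tuple. */
  public List<String> drainPending() {
    List<String> out = new ArrayList<>(pending);
    pending.clear();
    return out;
  }
}
```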
> Second, let's look at customized serdes from the point of view of an
> application developer implementing StreamCodec. I take a simple tuple,
> List<String>, as an example.
> The first solution is to use Kryo. This is basically the same as Apex's
> default codec.
> The second solution is to implement a StreamCodec for String and one for List,
> with ListSerde delegating each String to StringSerde. The benefit of this
> solution is that StringSerde and ListSerde can be reused. The problem is that
> it needs a lot of temporary memory and memory copying. A sample implementation
> follows.
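> import java.nio.ByteBuffer;
> import java.nio.charset.StandardCharsets;
> import java.util.List;
> import com.datatorrent.api.StreamCodec;
> import com.datatorrent.netlet.util.Slice;
>
> class StringSerde {
>   Slice toByteArray(String o) {
>     byte[] b = o.getBytes(StandardCharsets.UTF_8); // new bytes
>     byte[] b1 = new byte[b.length + 4];            // new bytes
>     ByteBuffer.wrap(b1).putInt(b.length);          // length of the string in the first 4 bytes
>     System.arraycopy(b, 0, b1, 4, b.length);       // copy bytes
>     return new Slice(b1);
>   }
> }
>
> class ListSerde<T> {
>   StreamCodec<T> itemSerde; // the serde for serializing/deserializing items
>   Slice toByteArray(List<T> list) {
>     Slice[] itemSlices = new Slice[list.size()];
>     int size = 0;
>     int index = 0;
>     for (T item : list) {
>       Slice slice = itemSerde.toByteArray(item); // each item gets its own new array
>       size += slice.length;
>       itemSlices[index++] = slice;
>     }
>     byte[] b = new byte[size + 4];               // allocate the memory
>     ByteBuffer buffer = ByteBuffer.wrap(b);
>     buffer.putInt(list.size());                  // length of the list in the first 4 bytes
>     for (Slice s : itemSlices) {
>       buffer.put(s.buffer, s.offset, s.length);  // copy the data from itemSlices
>     }
>     return new Slice(b);
>   }
> }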
>
> From the code above, we can see that about twice the required memory is
> allocated and the data is copied twice (one copy may be specific to String, but
> the other copy is mandatory). Once the bytes have been written to the socket,
> none of the allocated memory can be reused; it all has to be garbage collected.
> The tuple above has only two levels. If a tuple has n levels, about n times the
> required memory is allocated and n-1 mandatory data copies are needed.
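A simplified model of the cost of the second solution. It assumes that each level allocates a fresh array and copies the level below it behind a 4-byte length prefix, with one child per level. `NestedCopyCost` is a hypothetical name.

```java
import java.nio.ByteBuffer;

/**
 * Simplified model of the second solution: every level returns a freshly allocated array
 * and the enclosing level copies it behind a 4-byte length prefix.
 */
public class NestedCopyCost {
  public long bytesAllocated = 0;
  public int copies = 0;

  /** Allocates a new array for the enclosing level and copies the inner level into it. */
  public byte[] wrap(byte[] inner) {
    byte[] outer = new byte[inner.length + 4];
    bytesAllocated += outer.length;
    ByteBuffer.wrap(outer).putInt(inner.length);        // length prefix
    System.arraycopy(inner, 0, outer, 4, inner.length); // one copy per extra level
    copies++;
    return outer;
  }

  /** Serializes a payload of payloadSize bytes nested `levels` deep (levels >= 1). */
  public byte[] serialize(int payloadSize, int levels) {
    byte[] data = new byte[payloadSize]; // the innermost level's own allocation
    bytesAllocated += data.length;
    for (int i = 1; i < levels; i++) {
      data = wrap(data);
    }
    return data;
  }
}
```

Take a 1000-byte payload nested three levels deep. The model allocates 3012 bytes to produce a 1008-byte result and performs two copies. That is roughly n times the required memory and n-1 copies.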
> The third solution is to allocate the memory up front and pass the memory and
> an offset to the item serde. This solution has some problems:
> - How does the caller pass the memory? The current interface passes only the
>   object and has no way to pass memory, so passing memory depends on the
>   implementation.
> - Another big problem is that it is hard to allocate the proper amount of
>   memory (for this particular case, it could allocate twice the total string
>   length). Memory allocated beyond what is required is wasted until the data is
>   sent to the socket (alternatively, allocate the exact size and copy the data
>   to avoid wasting memory). The code also has to handle the case where the
>   memory is not enough.
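A sketch of the third solution under stated assumptions. `OffsetSerde` is a hypothetical interface whose `write` method returns the number of bytes written, or -1 when the destination is too small. On failure, the caller doubles its buffer. The sketch shows where the cost moves:

- The size has to be guessed up front.
- Every miss reallocates and copies what was already written.
- The final exact-size copy trades one more copy for not wasting memory.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class OffsetSerdeSketch {
  /** Hypothetical item serde: writes into dest at offset, returns bytes written or -1 if dest is too small. */
  interface OffsetSerde<T> {
    int write(T item, byte[] dest, int offset);
  }

  static final OffsetSerde<String> STRING = (s, dest, offset) -> {
    byte[] b = s.getBytes(StandardCharsets.UTF_8); // still a temporary array for the encoding
    if (offset + 4 + b.length > dest.length) {
      return -1; // not enough room: the caller has to deal with it
    }
    ByteBuffer.wrap(dest, offset, 4).putInt(b.length);
    System.arraycopy(b, 0, dest, offset + 4, b.length);
    return 4 + b.length;
  };

  /** Serializes a list, guessing a buffer size and growing (with a copy) whenever the guess is too small. */
  public static byte[] writeList(List<String> list, int initialGuess) {
    byte[] buf = new byte[Math.max(initialGuess, 4)];
    ByteBuffer.wrap(buf).putInt(list.size()); // element count in the first 4 bytes
    int pos = 4;
    for (String item : list) {
      int n;
      while ((n = STRING.write(item, buf, pos)) < 0) {
        buf = Arrays.copyOf(buf, buf.length * 2); // reallocation + copy of everything written so far
      }
      pos += n;
    }
    return Arrays.copyOf(buf, pos); // exact-size copy, or keep buf and waste its unused tail
  }
}
```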
> The fourth solution is to treat the whole object as flat, allocate the memory
> and handle it directly, as shown below. This solves the problem of passing
> memory, but it keeps the other problems of the third solution and introduces
> some new ones:
> - The code can't be reused: we already have StringSerde, but ListSerde<String>
>   has to implement almost the same logic again.
> - The serializeItemToMemory() method has to be implemented differently for
>   each item type.
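> abstract class ListSerde<T> {
>   // type-specific: writes item into b at offset and returns the number of bytes written
>   abstract int serializeItemToMemory(T item, byte[] b, int offset);
>
>   Slice toByteArray(List<T> list) {
>     byte[] b = new byte[…]; // hard to estimate the proper size
>     int size = 0;
>     for (T item : list) {
>       int length = serializeItemToMemory(item, b, size);
>       size += length;
>     }
>     // either return new Slice(b, 0, size) and waste the unused tail of b,
>     // or allocate exact-size memory and copy the data to avoid the waste:
>     return new Slice(Arrays.copyOf(b, size));
>   }
> }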
> So, as the analysis of these solutions shows, it is not easy to implement a
> customized serde that is both efficient and reusable.
> Third, let's look at the Kryo serde. Kryo provides Output, so the serde of
> each field writes to the same Output. This approach solves the memory problem,
> but Output has some problems too:
> - Output, as a stream, can only write sequentially. This is a problem, for
>   example, when serializing a String in LV (length-value) format, because we
>   don't know the length before serialization.
> - Output has no cache, which means the serialized data must be copied out and
>   managed elsewhere.
> - The allocated memory can't be reused without extra management.
> - Another copy is required when adding partition information.
> - The memory allocated for different objects is not contiguous, which means
>   another copy is needed when merging multiple serialized tuples into one block
>   to send to the socket.
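To illustrate the first point with the standard library only, the sketch below uses `java.io.DataOutputStream` as a stand-in for a forward-only stream like Kryo's Output. It is not Kryo code, and `ForwardOnlyLv` is a hypothetical name. The list's byte length must precede its items, so the items are written to a temporary buffer first and then copied.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Forward-only stream stand-in: the length has to be known before the value is written. */
public class ForwardOnlyLv {
  static void writeString(DataOutputStream out, String s) throws IOException {
    byte[] b = s.getBytes(StandardCharsets.UTF_8);
    out.writeInt(b.length);
    out.write(b);
  }

  /** Writes a list as [byte length][items]; the byte length is only known after serializing the items. */
  public static byte[] writeList(List<String> list) throws IOException {
    ByteArrayOutputStream tmp = new ByteArrayOutputStream(); // temporary buffer
    DataOutputStream tmpOut = new DataOutputStream(tmp);
    for (String s : list) {
      writeString(tmpOut, s);
    }
    ByteArrayOutputStream result = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(result);
    out.writeInt(tmp.size()); // the length is known only now
    tmp.writeTo(out);         // extra copy of everything serialized so far
    out.flush();
    return result.toByteArray();
  }
}
```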
> My suggested solution is:
> Add SerializationBuffer, which extends Kryo's Output and writes data to a
> BlockStream.
> BlockStream manages a list of blocks. It can reserve space and fill in the
> value of the reserved space later, and it can reset its memory for reuse when
> the data is no longer needed. We could probably use unsafe mode to increase the
> performance of this part in the future.
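A minimal sketch of the BlockStream described above. All names (`reserveInt`, `fillInt`, `toSlice`, `blockCount`) are assumptions for illustration. So is the policy of moving an unfinished slice to the next block on overflow. Slices are returned as `ByteBuffer` views instead of Apex `Slice` objects, and the optional unsafe mode is not modeled.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the proposed BlockStream: a list of fixed-size blocks, space that
 * can be reserved now and filled later, zero-copy slices, and reset() for reuse.
 */
public class BlockStream {
  private final int blockSize;
  private final List<byte[]> blocks = new ArrayList<>();
  private int blockIndex = 0; // block currently being written
  private int position = 0;   // next write offset in that block
  private int sliceStart = 0; // where the slice in progress started in that block

  public BlockStream(int blockSize) {
    this.blockSize = blockSize;
    blocks.add(new byte[blockSize]);
  }

  /** Makes room for n bytes; on overflow the slice in progress moves to the next block. */
  private void ensure(int n) {
    if (position + n <= blockSize) {
      return;
    }
    int pending = position - sliceStart;
    if (pending + n > blockSize) {
      throw new IllegalStateException("slice does not fit in one block");
    }
    blockIndex++;
    if (blockIndex == blocks.size()) {
      blocks.add(new byte[blockSize]); // allocate only when no earlier block is free for reuse
    }
    System.arraycopy(blocks.get(blockIndex - 1), sliceStart, blocks.get(blockIndex), 0, pending);
    sliceStart = 0;
    position = pending;
  }

  public void write(byte[] b) {
    ensure(b.length);
    System.arraycopy(b, 0, blocks.get(blockIndex), position, b.length);
    position += b.length;
  }

  /** Reserves 4 bytes; returns their offset relative to the slice start, which survives a move. */
  public int reserveInt() {
    ensure(4);
    int relative = position - sliceStart;
    position += 4;
    return relative;
  }

  /** Fills a previously reserved int (big-endian) in the slice in progress. */
  public void fillInt(int relative, int value) {
    ByteBuffer.wrap(blocks.get(blockIndex), sliceStart + relative, 4).putInt(value);
  }

  /** Returns a zero-copy view of the bytes written since the last toSlice() and starts a new slice. */
  public ByteBuffer toSlice() {
    ByteBuffer view = ByteBuffer.wrap(blocks.get(blockIndex), sliceStart, position - sliceStart).slice();
    sliceStart = position;
    return view;
  }

  /** Makes all blocks available again; slices handed out earlier must no longer be in use. */
  public void reset() {
    blockIndex = 0;
    position = 0;
    sliceStart = 0;
  }

  public int blockCount() {
    return blocks.size();
  }
}
```

Reserved offsets are kept relative to the start of the slice. If a partially written slice has to move to a fresh block, `fillInt` therefore still finds its slot. That move is the only copy in this design, and it happens only at block boundaries.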
> Add a MemReuseCodec interface which extends StreamCodec, deprecates Slice
> toByteArray(T o), and adds the method void toByteArray(T o, SerializationBuffer
> output). Here, toByteArray does not return a slice, because the codec could be
> the top-level codec or the codec of a field. Call SerializationBuffer.toSlice()
> to get the slice of serialized data.
> In the Publisher, keep two lists/arrays of slices: one for serializing tuples
> and another for sending to the socket. When woken up for writing, switch the
> lists/arrays, merge the slices into a large slice and call the socket write.
> Reset the stream after the data has been written.
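A sketch of the publisher's two-list scheme, under two assumptions. First, slices are views into shared blocks; the `Slice` class here is a stand-in, not netlet's. Second, writes go to a `java.nio.channels.WritableByteChannel`. Consecutive slices that are contiguous in the same block are written as one region without copying. `DoubleBufferedPublisher` and `onWritable` are hypothetical names.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: tuples are serialized into one list while the other list is written
 * to the socket; adjacent slices of the same block are merged into one write without copying.
 */
public class DoubleBufferedPublisher {
  /** Minimal stand-in for a slice: a region of a shared block. */
  public static final class Slice {
    final byte[] buffer;
    final int offset;
    final int length;

    public Slice(byte[] buffer, int offset, int length) {
      this.buffer = buffer;
      this.offset = offset;
      this.length = length;
    }
  }

  private List<Slice> serializing = new ArrayList<>(); // filled by the operator thread
  private List<Slice> sending = new ArrayList<>();     // drained by the I/O thread

  public synchronized void publish(Slice serializedTuple) {
    serializing.add(serializedTuple);
  }

  /** Called when the socket is writable; returns the number of merged regions written. */
  public int onWritable(WritableByteChannel channel) throws IOException {
    synchronized (this) { // switch the lists
      List<Slice> tmp = sending;
      sending = serializing;
      serializing = tmp;
    }
    int regions = 0;
    int i = 0;
    while (i < sending.size()) {
      Slice first = sending.get(i);
      int end = first.offset + first.length;
      int j = i + 1;
      // Extend the region while the next slice continues in the same block.
      while (j < sending.size() && sending.get(j).buffer == first.buffer && sending.get(j).offset == end) {
        end += sending.get(j).length;
        j++;
      }
      ByteBuffer region = ByteBuffer.wrap(first.buffer, first.offset, end - first.offset); // no copy
      while (region.hasRemaining()) {
        channel.write(region); // a non-blocking publisher would keep the remainder for the next OP_WRITE
      }
      regions++;
      i = j;
    }
    sending.clear(); // the blocks behind these slices could now be reset and reused
    return regions;
  }
}
```

Swapping the lists under a lock keeps the serializing thread and the I/O thread from touching the same list. The merge step only extends a region's end offset, so no bytes are copied before the socket write.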
> So the previous ListSerde can be implemented as follows:
> class ListSerde<T> {
>   MemReuseCodec<T> itemSerde; // the serde for serializing/deserializing items
>   void toByteArray(List<T> list, SerializationBuffer buffer) {
>     buffer.reserveForLength();
>     for (T item : list) {
>       itemSerde.toByteArray(item, buffer);
>     }
>     buffer.fillLength();
>   }
> }
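A self-contained, runnable version of the ListSerde above. `SerializationBuffer` here is a hypothetical stand-in backed by one growable array, not a subclass of Kryo's Output over a BlockStream. The sketch assumes `fillLength()` writes the number of bytes written after the matching `reserveForLength()`. A stack of open slots lets nested codecs share the same buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class MemReuseSketch {
  /** Hypothetical stand-in for SerializationBuffer: one growable array, reused after reset(). */
  public static final class SerializationBuffer {
    private byte[] buf = new byte[64];
    private int pos = 0;
    private final Deque<Integer> openSlots = new ArrayDeque<>(); // reserved length slots, innermost on top

    private void ensure(int n) {
      if (pos + n > buf.length) {
        buf = Arrays.copyOf(buf, Math.max(buf.length * 2, pos + n)); // grows rarely, then reused
      }
    }

    public void writeInt(int v) {
      ensure(4);
      ByteBuffer.wrap(buf, pos, 4).putInt(v);
      pos += 4;
    }

    public void writeBytes(byte[] b) {
      ensure(b.length);
      System.arraycopy(b, 0, buf, pos, b.length);
      pos += b.length;
    }

    /** Reserves 4 bytes for a length that is not known yet. */
    public void reserveForLength() {
      openSlots.push(pos);
      writeInt(0);
    }

    /** Fills the innermost reserved slot with the number of bytes written after it. */
    public void fillLength() {
      int slot = openSlots.pop();
      ByteBuffer.wrap(buf, slot, 4).putInt(pos - slot - 4);
    }

    /** Copies the result out for this demo; the proposal would hand out a slice instead. */
    public byte[] toBytes() {
      return Arrays.copyOf(buf, pos);
    }

    public void reset() {
      pos = 0;
      openSlots.clear();
    }
  }

  /** Hypothetical shape of the proposed MemReuseCodec method (the StreamCodec part is omitted). */
  public interface MemReuseCodec<T> {
    void toByteArray(T o, SerializationBuffer output);
  }

  public static final class StringSerde implements MemReuseCodec<String> {
    @Override
    public void toByteArray(String s, SerializationBuffer output) {
      byte[] b = s.getBytes(StandardCharsets.UTF_8);
      output.writeInt(b.length);
      output.writeBytes(b);
    }
  }

  public static final class ListSerde<T> implements MemReuseCodec<List<T>> {
    private final MemReuseCodec<T> itemSerde;

    public ListSerde(MemReuseCodec<T> itemSerde) {
      this.itemSerde = itemSerde;
    }

    @Override
    public void toByteArray(List<T> list, SerializationBuffer output) {
      output.reserveForLength();
      for (T item : list) {
        itemSerde.toByteArray(item, output); // items write straight into the shared buffer
      }
      output.fillLength();
    }
  }
}
```

For `[["a", "bc"], []]` the outer length is 19 and the two inner lengths are 11 and 0. All three are written into a single 23-byte buffer without any per-level arrays.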
> The benefits of this mechanism:
> - The memory can be reused instead of being garbage collected after the data
>   is sent to the socket.
> - Unnecessary memory copies are avoided; basically all of the extra copies
>   required by Kryo can be avoided.
> - The data sent to the socket can easily be merged into one block without an
>   extra memory copy.
> - It integrates easily with Kryo serde, because SerializationBuffer extends
>   Output.
> The work needed to integrate this mechanism into Apex without modifying
> netlet:
> - Add a MemReuseCodec field to BufferServerPublisher, initialized in setup()
>   if the codec implements MemReuseCodec.
> - Reimplement DefaultStatefulStreamCodec using SerializationBuffer.
> - To integrate with the socket, it basically only needs to override
>   write(byte[] message, int offset, int size) and write(). Unfortunately,
>   write() is final, so the following workaround is needed. Add an interface
>   ListenerExt with a single method, writeExt(), and make BufferServerPublisher
>   implement ListenerExt. Add DefaultEventLoopExt, which extends
>   DefaultEventLoop and overrides handleSelectedKey. For the selection key
>   OP_WRITE, if its attachment implements ListenerExt, it calls
>   ListenerExt.writeExt(); otherwise it calls write().
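A sketch of the OP_WRITE dispatch in the workaround above. `Listener` and `ListenerExt` are stand-ins defined in the sketch. `handleSelectedKey` here has a simplified, hypothetical signature and is not netlet's DefaultEventLoop. The only real JDK calls are `SelectionKey.isValid()`, `isWritable()` and `attachment()`.

```java
import java.nio.channels.SelectionKey;

/** Hypothetical sketch of the proposed OP_WRITE dispatch; netlet's real classes are not used. */
public class WriteDispatchSketch {
  /** Stand-in for the existing listener, whose write() cannot be overridden. */
  public interface Listener {
    void write();
  }

  /** The proposed extension point with a single method. */
  public interface ListenerExt {
    void writeExt();
  }

  /** Routes a write-ready event to writeExt() when the attachment supports it, else to write(). */
  public static void onWritable(Object attachment) {
    if (attachment instanceof ListenerExt) {
      ((ListenerExt) attachment).writeExt();
    } else if (attachment instanceof Listener) {
      ((Listener) attachment).write();
    }
  }

  /** How an overridden handleSelectedKey might call it for a real selection key. */
  public static void handleSelectedKey(SelectionKey key) {
    if (key.isValid() && key.isWritable()) {
      onWritable(key.attachment());
    }
  }
}
```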
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)