[ https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888597#comment-15888597 ]
bright chen commented on APEXCORE-635:
--------------------------------------

Yes, I agree this should be revisited after the migration to Netty;
otherwise a lot of the work would be wasted. How about I prototype it
after the migration to Netty?

> Proposal: Manage memory to avoid memory copy and garbage collection
> -------------------------------------------------------------------
>
>                 Key: APEXCORE-635
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-635
>             Project: Apache Apex Core
>          Issue Type: Wish
>            Reporter: bright chen
>            Assignee: bright chen
>
> Manage memory to avoid memory copy and garbage collection
>
> The aim of this proposal is to reuse memory in order to avoid garbage
> collection and unnecessary memory copies, and so improve performance.
> In this proposal the term "serde" means serialization and
> deserialization; it is the same as "codec".
>
> Currently, Apex by default uses DefaultStatefulStreamCodec for serde,
> which extends Kryo and optimizes it by replacing class names with class
> ids. Application developers can optimize serialization by implementing
> the StreamCodec interface.
>
> First, let's look at the default codec, DefaultStatefulStreamCodec. As
> I understand it, it basically optimizes serde by replacing class names
> with class ids. The state information is only sent before the first
> tuple; it is a kind of configuration for the serde. So I suggest
> separating this feature from the serde. The benefit is that a
> customized serde can still use this feature. Kryo also has some
> limitations, which I'll describe later.
>
> Second, let's look at the customized serde from an application
> developer's point of view and see how StreamCodec would be implemented.
> I take a simple tuple, List<String>, as an example.
>
> The first solution is to use Kryo. This is basically the same as the
> Apex default codec.
>
> The second solution is to implement StreamCodec for String and for
> List, with ListSerde delegating each String to StringSerde. The benefit
> of this solution is that StringSerde and ListSerde can be reused.
> The problem is that it needs a lot of temporary memory and memory
> copies. The following is a sample implementation.
>
> import java.nio.ByteBuffer;
> import java.nio.charset.StandardCharsets;
> import java.util.List;
>
> class StringSerde {
>   Slice toByteArray(String o) {
>     byte[] b = o.getBytes(StandardCharsets.UTF_8); // new bytes
>     byte[] b1 = new byte[b.length + 4]; // new bytes
>     ByteBuffer.wrap(b1).putInt(b.length); // string length in the first 4 bytes
>     System.arraycopy(b, 0, b1, 4, b.length); // copy bytes
>     return new Slice(b1);
>   }
> }
>
> class ListSerde<T> {
>   StreamCodec<T> itemSerde; // the serde used to serialize/deserialize items
>
>   Slice toByteArray(List<T> list) {
>     Slice[] itemSlices = new Slice[list.size()];
>     int size = 0;
>     int index = 0;
>     for (T item : list) {
>       Slice slice = itemSerde.toByteArray(item); // one temporary array per item
>       size += slice.length;
>       itemSlices[index++] = slice;
>     }
>     byte[] b = new byte[size + 4]; // allocate the memory
>     ByteBuffer.wrap(b).putInt(list.size()); // list length in the first 4 bytes
>     int pos = 4;
>     for (Slice slice : itemSlices) { // copy the data from itemSlices
>       System.arraycopy(slice.buffer, slice.offset, b, pos, slice.length);
>       pos += slice.length;
>     }
>     return new Slice(b);
>   }
> }
>
> From the code above, we can see that about twice the required memory is
> allocated and the data is copied twice (one copy may be specific to
> String, but the other is unavoidable). Once the bytes are written to
> the socket, none of the allocated memory can be reused; it all has to
> be garbage collected.
>
> The tuple above has only two levels. If a tuple has n levels, n times
> the required memory would be allocated and n-1 data copies would be
> required.
>
> The third solution could be to allocate memory and then pass the memory
> and an offset to the item serde. There are some problems with this
> solution:
> * How do we pass the memory from the caller? Our existing interface
>   only passes the object and has no way to pass memory, so passing
>   memory would depend on the implementation.
> * Another big problem is that it is hard to allocate the right amount
>   of memory in advance (for this particular case, it could probably
>   allocate 2 times the total string length). Any memory allocated
>   beyond what is required is wasted until the data is sent to the
>   socket (or else the exact amount must be allocated and the data
>   copied into it to avoid wasting memory).
>   And the code also needs to handle the case where the memory is not
>   enough.
>
> The fourth solution could be to treat the whole object as flat,
> allocate memory, and handle it, for example as shown in the code that
> follows this list. This solution solves the problem of passing memory,
> but it has the other problems of the third solution and introduces some
> new ones:
> * The code can't be reused: we already have StringSerde, but
>   ListSerde<String> has to implement almost the same logic again.
> * The serializeItemToMemory() method has to be implemented differently
>   for each item type.
>
> class ListSerde<T> {
>   Slice toByteArray(List<T> list) {
>     byte[] b = new byte[…]; // hard to estimate the proper size
>     int size = 0;
>     for (T item : list) {
>       // serialize the item directly into b, starting at offset size
>       int length = serializeItemToMemory(item, b, size);
>       size += length;
>     }
>     // allocate new memory and copy the data if we don't want to waste memory
>     return new Slice(b, 0, size);
>   }
> }
>
> So, from the analysis of these solutions, it is not easy to implement a
> good and reusable customized serde.
>
> Third, let's look at the Kryo serde. Kryo provides Output, so each
> field serde writes to the same Output. This approach solves the memory
> problem, but Output has some problems too:
> * Output, as a stream, can only be written sequentially, which can be a
>   problem. For example, when serializing a String to LV (length-value)
>   format, we don't know the length before serialization.
> * Output doesn't have a cache, which means the serialized data must be
>   copied out and managed externally.
> * The allocated memory can't be reused without extra management.
> * Another copy is required when adding partition information.
> * The memory allocated for different objects is not contiguous, which
>   means another copy is needed when merging multiple serialized tuples
>   into one block to send to the socket.
>
> My suggested solution is:
> * Add SerializationBuffer, which extends Kryo's Output and writes data
>   to a BlockStream.
> * BlockStream manages a list of blocks. It can reserve space and later
>   fill a value into the reserved space, and it can reset the memory
>   when the data is no longer used. We can probably use unsafe mode to
>   improve the performance of this part in the future.
> * Add a MemReuseCodec interface which extends StreamCodec; deprecate
>   Slice toByteArray(T o) and add the method
>   void toByteArray(T o, SerializationBuffer output). Here toByteArray
>   does not return a Slice, because the codec could be either the
>   top-level codec or the codec of a field. Call
>   SerializationBuffer.toSlice() to get the slice of serialized data.
> * In Publisher, keep two lists/arrays of slices: one for serializing
>   tuples, the other for sending to the socket. When woken up for
>   writing, switch the lists/arrays, then merge the slices into one
>   large slice and call socket write. Reset the stream after the data is
>   written.
>
> So the previous ListSerde can be implemented as follows:
>
> class ListSerde<T> {
>   MemReuseCodec<T> itemSerde; // the serde used to serialize/deserialize items
>
>   void toByteArray(List<T> list, SerializationBuffer buffer) {
>     buffer.reserveForLength(); // leave room for the length
>     for (T item : list) {
>       itemSerde.toByteArray(item, buffer); // items write into the same buffer
>     }
>     buffer.fillLength(); // fill in the length now that it is known
>   }
> }
>
> The benefits of this mechanism:
> * Memory can be reused instead of being garbage collected after the
>   data is sent to the socket.
> * It avoids unnecessary memory copies; basically, it can avoid all the
>   extra copies required by Kryo.
> * The data sent to the socket can easily be merged into one block
>   without extra memory copies.
> * It integrates easily with Kryo serde because SerializationBuffer
>   extends Output.
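
The reserve-then-fill idea behind the proposed SerializationBuffer can be sketched in a few lines. This is only an illustration under stated assumptions, not the proposed implementation: ReusableBuffer, readInt() and size() are hypothetical names, the buffer is a single growable array rather than a list of blocks, it does not extend Kryo's Output, and the reserved slot is assumed to hold a 4-byte big-endian byte count.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for SerializationBuffer: one array, no blocks, no Kryo. */
class ReusableBuffer {
    private byte[] data = new byte[64];
    private int size = 0;
    // Offsets of length slots reserved but not yet filled (a stack, so codecs can nest).
    private final Deque<Integer> reserved = new ArrayDeque<>();

    private void ensure(int extra) {
        if (size + extra > data.length) {
            data = Arrays.copyOf(data, Math.max(data.length * 2, size + extra));
        }
    }

    void write(byte[] src) {
        ensure(src.length);
        System.arraycopy(src, 0, data, size, src.length);
        size += src.length;
    }

    /** Skip 4 bytes now; fillLength() writes the value once it is known. */
    void reserveForLength() {
        ensure(4);
        reserved.push(size);
        size += 4;
    }

    /** Store the number of bytes written since the matching reserveForLength(). */
    void fillLength() {
        int slot = reserved.pop();
        int length = size - slot - 4;
        data[slot] = (byte) (length >>> 24);
        data[slot + 1] = (byte) (length >>> 16);
        data[slot + 2] = (byte) (length >>> 8);
        data[slot + 3] = (byte) length;
    }

    int readInt(int offset) {
        return ((data[offset] & 0xFF) << 24) | ((data[offset + 1] & 0xFF) << 16)
                | ((data[offset + 2] & 0xFF) << 8) | (data[offset + 3] & 0xFF);
    }

    int size() {
        return size;
    }

    /** Forget the contents but keep the backing array for the next tuple. */
    void reset() {
        size = 0;
        reserved.clear();
    }
}

class StringSerde {
    void toByteArray(String s, ReusableBuffer out) {
        out.reserveForLength();
        // getBytes still allocates a temporary array; this copy is specific to String.
        out.write(s.getBytes(StandardCharsets.UTF_8));
        out.fillLength();
    }
}

class StringListSerde {
    private final StringSerde itemSerde = new StringSerde();

    void toByteArray(List<String> list, ReusableBuffer out) {
        out.reserveForLength();
        for (String item : list) {
            itemSerde.toByteArray(item, out); // items append to the same buffer
        }
        out.fillLength();
    }
}
```

With this shape, the list codec never allocates an array of its own: each nested codec appends to the shared buffer, and calling reset() after the socket write lets the same backing array serve the next tuple.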
>
> The work needed to integrate this mechanism into Apex without modifying
> netlet:
> * Add a MemReuseCodec field to BufferServerPublisher, initialized in
>   setup() if the codec implements MemReuseCodec.
> * Change DefaultStatefulStreamCodec to be implemented using
>   SerializationBuffer.
> * To integrate with the socket, basically only
>   write(byte[] message, int offset, int size) and write() need to be
>   overridden. Unfortunately, write() is final, so the following
>   workaround is needed: add an interface ListenerExt with a single
>   method writeExt(); change BufferServerPublisher to implement
>   ListenerExt; add DefaultEventLoopExt, which extends DefaultEventLoop
>   and overrides handleSelectedKey so that, for a selection key with
>   OP_WRITE, it calls ListenerExt.writeExt() if the key's attachment
>   implements ListenerExt, and write() otherwise.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)