[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-28 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888597#comment-15888597
 ] 

bright chen commented on APEXCORE-635:
--

Yes, I agree it should be revisited after the migration to Netty; otherwise a 
lot of work would just be wasted.
How about I prototype it after the migration to Netty?

> Proposal: Manage memory to avoid memory copy and garbage collection
> ---
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
>  Issue Type: Wish
>Reporter: bright chen
>Assignee: bright chen
>
> Manage memory to avoid memory copy and garbage collection
> The aim of this proposal is to reuse memory in order to avoid garbage 
> collection and unnecessary memory copies, and thereby increase performance. In 
> this proposal the term serde means serialization and deserialization; it is 
> the same as codec.
> Currently, Apex by default uses DefaultStatefulStreamCodec for serde, which 
> extends Kryo and optimizes it by replacing class names with class ids. An application 
> developer can also optimize serialization by implementing the StreamCodec interface. 
> First, let’s look at the default codec, DefaultStatefulStreamCodec. As I 
> understand it, it basically optimizes serde by replacing class names with class ids. 
> The state information is only sent before the first tuple, so it is a kind of 
> configuration for serde. I therefore suggest separating this feature from serde. 
> The benefit is that a customized serde can still use this feature. Kryo also has 
> some limitations, which I’ll describe later.
> Second, let’s look at customized serde from an application developer’s point of 
> view, and at how to implement StreamCodec. I take a simple tuple, a List, as an 
> example.
> The first solution is to use Kryo. This is basically the same as the Apex default codec.
> The second solution is to implement a StreamCodec for String and for List, with 
> ListSerde delegating each String to StringSerde. The benefit of this solution is that 
> StringSerde and ListSerde can be reused. The problem is that it needs a lot of 
> temporary memory and memory copying. Following is a sample implementation.
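> import java.nio.ByteBuffer;
> import java.nio.charset.StandardCharsets;
> import java.util.List;
> import com.datatorrent.api.StreamCodec;
> import com.datatorrent.netlet.util.Slice;
>
> class StringSerde {
>   Slice toByteArray(String o) {
>     byte[] b = o.getBytes(StandardCharsets.UTF_8);  // new bytes
>     byte[] b1 = new byte[b.length + 4];              // new bytes
>     ByteBuffer.wrap(b1).putInt(b.length);            // set the length of the string at the first 4 bytes
>     System.arraycopy(b, 0, b1, 4, b.length);         // copy bytes
>     return new Slice(b1);
>   }
> }
>
> class ListSerde<T> {
>   StreamCodec<T> itemSerde;  // the serde for serializing/deserializing items
>   Slice toByteArray(List<T> list) {
>     Slice[] itemSlices = new Slice[list.size()];
>     int size = 0;
>     int index = 0;
>     for (T item : list) {
>       Slice slice = itemSerde.toByteArray(item);     // temporary per-item memory
>       size += slice.length;
>       itemSlices[index++] = slice;
>     }
>     byte[] b = new byte[size + 4];                   // allocate the memory
>     ByteBuffer buf = ByteBuffer.wrap(b);
>     buf.putInt(list.size());                         // set the length of the list at the first 4 bytes
>     for (Slice s : itemSlices) {
>       buf.put(s.buffer, s.offset, s.length);         // copy the data from itemSlices
>     }
>     return new Slice(b);
>   }
> }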
>   
> From the above code, we can see that around 2 times the required memory is 
> allocated and the data is copied twice (one copy may be specific to String, but 
> the other copy is mandatory). And after the bytes are written to the socket, none 
> of the allocated memory can be reused; all of it has to be garbage collected.
> The above tuple only has two levels; if a tuple has n levels, n times the 
> required memory would be allocated and n-1 data copies would be required.
> The third solution could be to allocate memory and then pass the memory and an 
> offset to the item serde. There are some problems with this solution:
> How is the memory passed from the caller? The current interface only passes the 
> object, with no way to pass memory, so how the memory is passed would depend on 
> the implementation.
> Another big problem with this solution is that it is hard to allocate the proper 
> amount of memory (for this particular case, it could probably allocate 2 times the 
> total string length). Memory allocated beyond what is required would be wasted 
> until the data is sent to the socket (or the code must allocate the exact size and 
> copy the data to avoid wasting memory). The code also needs to handle the case 
> where the memory is not enough. 
> The fourth solution could be to treat the whole object as flat, allocate memory, 
> and handle it directly, for example as follows. This solution solves the problem 
> of passing memory, but it still has the other problems of the third solution and 
> introduces some new problems:
> The code can’t be reused: we already have StringSerde, but ListSerde 
> has to implement almost the same logic again. 
> The serializeItemToMemory() method has to be implemented differently for each 
> item type.
> class ListSerde {
>   Slice toByteArray(List list) {
> byte[] b = new byte[…];  // hard to estimate the proper size.
> int 

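A minimal sketch of the third solution in the quoted proposal (the caller allocates memory and each serde writes into it at the current offset) may help make the trade-off concrete. `OffsetWriter`, `StringOffsetWriter`, `ListOffsetWriter`, and `OffsetSerialization` are hypothetical names, not Apex APIs, and the 4-byte big-endian length prefix is an assumption carried over from the second solution. The sketch also shows the sizing problem the proposal raises: when the caller's guess is too small, the write overflows and the caller has to grow the memory and retry, and trimming to the exact size costs one more copy.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical interface: serialize o into buf starting at buf.position();
// throws BufferOverflowException if buf has too little remaining space.
interface OffsetWriter<T> {
  void write(T o, ByteBuffer buf);
}

class StringOffsetWriter implements OffsetWriter<String> {
  public void write(String o, ByteBuffer buf) {
    byte[] b = o.getBytes(StandardCharsets.UTF_8); // still one temporary array per string
    buf.putInt(b.length);                          // 4-byte length prefix
    buf.put(b);                                    // single copy into the caller's memory
  }
}

class ListOffsetWriter<T> implements OffsetWriter<List<T>> {
  private final OffsetWriter<T> itemWriter;        // the item serde is reused, not duplicated

  ListOffsetWriter(OffsetWriter<T> itemWriter) {
    this.itemWriter = itemWriter;
  }

  public void write(List<T> list, ByteBuffer buf) {
    buf.putInt(list.size());                       // element count
    for (T item : list) {
      itemWriter.write(item, buf);                 // items write in place, no per-item Slice
    }
  }
}

class OffsetSerialization {
  // Caller side: guess a capacity and double it until the whole tuple fits.
  static <T> byte[] serialize(OffsetWriter<T> writer, T o, int initialGuess) {
    int capacity = Math.max(initialGuess, 16);
    while (true) {
      ByteBuffer buf = ByteBuffer.allocate(capacity);
      try {
        writer.write(o, buf);
        byte[] out = new byte[buf.position()];     // trim to the exact size (one more copy)
        System.arraycopy(buf.array(), 0, out, 0, out.length);
        return out;
      } catch (BufferOverflowException e) {
        capacity *= 2;                             // estimate was too small: grow and retry
      }
    }
  }
}
```

Each failed attempt throws away a partially written buffer, which is exactly the "handle the case where the memory is not enough" cost the proposal mentions.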
[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-28 Thread Vlad Rozov (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15888527#comment-15888527
 ] 

Vlad Rozov commented on APEXCORE-635:
-

bq. Yes, agreed: a byte array is treated as one object. But as the memory is 
relatively big, it probably will not be allocated in the TLA (Thread Local Area), 
which means each allocation needs to acquire a lock, so allocation is not a 
very lightweight operation.
Is your concern GC or allocating a new byte array? From what I understood, it was 
GC.
bq. Output.getBuffer() probably does not work in our case. getBuffer() returns 
a reference to Output's internal buffer, which is used to hold the 
serialized bytes. As we need to share one instance of Output across multiple 
tuples, the buffer is reused when serializing the next tuple. This causes a 
problem whether or not we set the output's position to zero. Setting the 
position to zero means clearing the previously serialized data. If the position 
is not changed, the serialized data of the next tuple is concatenated to the 
previous one, but we don't know the boundary of each tuple, and the buffer keeps 
growing, which means Output has to keep allocating new, bigger memory and copying 
the old data (if the initial buffer size is not very big).
Output.getBuffer() works. Tuples are serialized one at a time, and after each 
serialization, bytes from the shared Output buffer are copied to the 
SerializedTuple, which allocates a new buffer each time. This avoids the extra copy 
that you refer to.
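The approach described above (serialize each tuple into one shared, reusable buffer, then copy exactly the serialized bytes into a fresh per-tuple array) can be sketched with plain JDK classes. `ScratchOutput` is a hypothetical stand-in that only mimics the `getBuffer()`, `position()`, and `setPosition()` accessors of Kryo's `Output`; it is not Kryo, and `serializeTuple` is a placeholder for whatever actually writes a tuple.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for a growable output buffer such as Kryo's Output.
class ScratchOutput {
  private byte[] buffer;
  private int position;

  ScratchOutput(int initialCapacity) {
    buffer = new byte[initialCapacity];
  }

  byte[] getBuffer() { return buffer; }   // the internal array itself, no copy
  int position() { return position; }
  void setPosition(int p) { position = p; }

  void writeBytes(byte[] src) {
    if (position + src.length > buffer.length) {
      // grow only when needed; the larger array is kept for later tuples
      buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, position + src.length));
    }
    System.arraycopy(src, 0, buffer, position, src.length);
    position += src.length;
  }
}

class OneCopyCodec {
  private final ScratchOutput output = new ScratchOutput(64); // shared across tuples

  // Placeholder tuple writer: the UTF-8 bytes of a String (getBytes itself
  // allocates; a real serializer would encode directly into the buffer).
  private void serializeTuple(String tuple) {
    output.writeBytes(tuple.getBytes(StandardCharsets.UTF_8));
  }

  byte[] toByteArray(String tuple) {
    output.setPosition(0);                 // reuse the scratch buffer for every tuple
    serializeTuple(tuple);
    // one buffer-level copy: scratch buffer -> new per-tuple array
    return Arrays.copyOf(output.getBuffer(), output.position());
  }
}
```

Because each returned array is a private copy, resetting the shared buffer for the next tuple cannot corrupt bytes that are still waiting to be sent; the cost is one allocation and one copy per tuple.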
bq. Yes, the original prototype reset the buffer when it should not have. But as I 
pointed out in this proposal, the buffer will be reset after the data is sent to 
the socket. That is why the write() method needs to be overridden.
The original prototype has a serious bug, so the benchmarking results are 
not reliable. Before implementing the proposed approach, I suggest 
implementing a prototype in stages and proving that the approach will provide 
a significant performance improvement. Without such proof, the complexity of 
the proposal is not justifiable.
bq. Yes, agreed. But I think it is worth changing StreamCodec: as suggested in 
the proposal, it is not easy to implement a customized, reusable 
serialization without memory copy using the current interface, and that interface 
isn't compatible with Kryo's interface.
The current implementation (DefaultStatefulStreamCodec) of StatefulStreamCodec 
does not seem natural to me. The pairs are cleared after the first tuple, yet for 
every subsequent tuple the pairs need to be checked, and an instance of 
DataStatePair needs to be created to wrap the state (which is null except for the 
first tuple) and the data.
I agree that the interface is not optimal and needs to be changed, but it is 
not possible to change it without breaking backward compatibility. That means 
a new interface needs to be introduced, and BufferServer will need to deal 
with 3 different codec interfaces. Before this is done, I'd like to see 
proof that there is a significant performance benefit that justifies the 
complexity.
bq. The netlet client needs to be aware of the memory management mechanism in 
order to reset the memory. But that code is maintained in the same class (the 
current BufferServerPublisher; an extended class in the proposal), so this should 
be OK.
The serialization format doesn't have to change; it can stay compatible with the 
previous one. But if we want to take advantage of the reserve() function of 
SerializationBuffer to avoid another extra copy, the format needs to be changed 
to avoid variable-length encoding.
Again, I'd like to see a staged prototype that proves that there is a 
performance benefit and that it is possible to avoid tight code coupling 
between Codec, Publisher and Tuple serialization format.

Overall, I am still (n) on the approach, and I think it needs to be 
revisited when the netlet->netty migration is done. Netty has its own memory 
allocator, and it will be necessary to see how it can be integrated with Kryo 
and other serializers.


[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-23 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881293#comment-15881293
 ] 

bright chen commented on APEXCORE-635:
--

{quote}
Yes, an array is an object in Java that may hold either primitive types or 
other objects. When an array is garbage collected, it is collected as a single 
object. If an array holds primitive types, the array's memory is reclaimed in a 
single step; there is no GC of individual bytes/ints/longs in the array.
{quote}
Yes, agreed: a byte array is treated as one object. But as the memory is 
relatively big, it probably will not be allocated in the TLA (Thread Local Area), 
which means each allocation needs to acquire a lock, so allocation is not a very 
lightweight operation. 

{quote}
I was referring to Output.getBuffer() and DefaultKryoStreamCodec that uses 
Output.getBuffer(). You are right that Output.toBytes() allocates a new array 
and copies bytes from Output internal buffer to the newly allocated buffer. 
This is a bug in DefaultStatefulStreamCodec, it should use Output.getBuffer().
{quote}
Output.getBuffer() probably does not work in our case. getBuffer() returns a 
reference to Output's internal buffer, which is used to hold the 
serialized bytes. As we need to share one instance of Output across multiple 
tuples, the buffer is reused when serializing the next tuple. This causes a 
problem whether or not we set the output's position to zero. Setting the 
position to zero means clearing the previously serialized data. If the position 
is not changed, the serialized data of the next tuple is concatenated to the 
previous one, but we don't know the boundary of each tuple, and the buffer keeps 
growing, which means Output has to keep allocating new, bigger memory and copying 
the old data (if the initial buffer size is not very big).
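The hazard behind this concern can be shown with a small, self-contained sketch: if the shared buffer itself (rather than a copy) is handed to a writer that sends asynchronously, resetting the position for the next tuple overwrites bytes that have not been sent yet. `AliasingHazard` and its `pending` list are hypothetical stand-ins for a fire-and-forget send queue; this is not netlet code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class AliasingHazard {
  // A view of bytes waiting to be sent: array reference + length (like a Slice).
  static final class PendingWrite {
    final byte[] buffer;
    final int length;

    PendingWrite(byte[] buffer, int length) {
      this.buffer = buffer;
      this.length = length;
    }

    String text() {
      return new String(buffer, 0, length, StandardCharsets.UTF_8);
    }
  }

  private final byte[] shared = new byte[64];           // one buffer reused for all tuples
  final List<PendingWrite> pending = new ArrayList<>(); // not yet written to the socket

  void publishWithoutCopy(String tuple) {
    byte[] b = tuple.getBytes(StandardCharsets.UTF_8);
    System.arraycopy(b, 0, shared, 0, b.length);        // position reset to 0 for each tuple
    pending.add(new PendingWrite(shared, b.length));    // queues a reference, not a copy
  }
}
```

After `publishWithoutCopy("AAAA")` followed by `publishWithoutCopy("BBBB")`, both queued writes read `BBBB`: the first tuple is lost before it reaches the socket. Either copying per tuple or deferring reuse until the send completes avoids this.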

{quote}
As I pointed to you, the prototype has a significant bug (reusing byte arrays 
that are not reusable due to fire and forget netlet behavior). What I would 
like to see is a series of small prototypes that prove that the approach 
provides significant performance gains.
{quote}
Yes, the original prototype reset the buffer when it should not have. But as I 
pointed out in this proposal, the buffer will be reset after the data is sent to 
the socket. That is why the write() method needs to be overridden. 
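A minimal sketch of resetting a buffer only after its data has been written to the socket, assuming a single publisher thread and a writer that reports each completed write exactly once. `DeferredResetBuffer`, `Region`, `append`, and `onWritten` are hypothetical names, not the netlet or BufferServerPublisher API; in the proposal this bookkeeping would live in an overridden write() method.

```java
// Hypothetical block that is reused only after every region carved from it was written.
class DeferredResetBuffer {
  // A region of a block handed to the socket writer (similar in spirit to a Slice).
  static final class Region {
    final byte[] block;
    final int offset;
    final int length;

    Region(byte[] block, int offset, int length) {
      this.block = block;
      this.offset = offset;
      this.length = length;
    }
  }

  private byte[] block;
  private int position;   // next free byte in the current block
  private int unwritten;  // bytes of the current block handed out but not yet written

  DeferredResetBuffer(int capacity) {
    block = new byte[capacity];
  }

  Region append(byte[] tuple) {
    if (position + tuple.length > block.length) {
      // No room: retire the block (it becomes garbage once its writes finish).
      block = new byte[Math.max(block.length, tuple.length)];
      position = 0;
      unwritten = 0;
    }
    System.arraycopy(tuple, 0, block, position, tuple.length);
    Region r = new Region(block, position, tuple.length);
    position += tuple.length;
    unwritten += tuple.length;
    return r;
  }

  // Called once per region by the (hypothetical) socket writer after it was sent.
  void onWritten(Region r) {
    if (r.block != block) {
      return;             // region from a retired block: nothing to reset
    }
    unwritten -= r.length;
    if (unwritten == 0) {
      position = 0;       // everything sent: safe to reuse the block from the start
    }
  }

  int position() {
    return position;
  }
}
```

The design choice is that reuse is driven by write completion rather than by the next serialization call, which is what makes the memory safe to recycle with a fire-and-forget writer.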

{quote}
I don't see how the solution can be generalized without modifying the current 
definition of StreamCodec and StatefulStreamCodec. 
{quote}
Yes, agreed. But I think it is worth changing StreamCodec: as suggested in the 
proposal, it is not easy to implement a customized, reusable serialization 
without memory copy using the current interface, and that interface isn't 
compatible with Kryo's interface.
The current implementation (DefaultStatefulStreamCodec) of StatefulStreamCodec 
does not seem natural to me. The pairs are cleared after the first tuple, yet for 
every subsequent tuple the pairs need to be checked, and an instance of 
DataStatePair needs to be created to wrap the state (which is null except for the 
first tuple) and the data.
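The alternative suggested in the proposal (treat the class-name-to-id mapping as separate, configuration-like state instead of wrapping every tuple in a DataStatePair) might look roughly like the sketch below. `ClassIdRegistry`, `idFor`, and `drainNewState` are hypothetical names, not Apex or Kryo APIs. The point is that new mappings are emitted only when a class is first seen, and the per-tuple path is a single map lookup with no wrapper allocation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ClassIdRegistry {
  private final Map<String, Integer> ids = new HashMap<>();
  private final List<String> newlyRegistered = new ArrayList<>(); // state not yet sent downstream

  // Per-tuple path: one lookup; assigns the next id only for a class never seen before.
  int idFor(Class<?> type) {
    Integer id = ids.get(type.getName());
    if (id == null) {
      id = ids.size();
      ids.put(type.getName(), id);
      newlyRegistered.add(type.getName());
    }
    return id;
  }

  // Returns and clears pending registrations; empty in the common steady state.
  List<String> drainNewState() {
    if (newlyRegistered.isEmpty()) {
      return Collections.emptyList();
    }
    List<String> state = new ArrayList<>(newlyRegistered);
    newlyRegistered.clear();
    return state;
  }
}
```

A publisher could send whatever `drainNewState()` returns ahead of the tuple that caused it, so any serde (default or customized) could share the same class-id mechanism.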

{quote}
It may be possible to improve DefaultStatefulStreamCodec (a concrete 
implementation of StatefulStreamCodec), but it seems to require a large amount 
of modifications to netlet and buffer server that other StreamCodec 
implementations will not benefit from. 
{quote}
We can change the behavior of netlet by overriding the put() and write() methods, 
so we don't need to change code in netlet itself.
A customized codec can benefit from it if the codec interface is changed as 
pointed out in this proposal.

{quote}
Additionally, the proposal seems to require the netlet client to be aware of the 
codec implementation, and any change to the tuple format will require changes to 
serialization and de-serialization code.
{quote}
The netlet client needs to be aware of the memory management mechanism in order 
to reset the memory. But that code is maintained in the same class (the current 
BufferServerPublisher; an extended class in the proposal), so this should be OK.
The serialization format doesn't have to change; it can stay compatible with the 
previous one. But if we want to take advantage of the reserve() function of 
SerializationBuffer to avoid another extra copy, the format needs to be changed 
to avoid variable-length encoding.
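Why a fixed-width length matters for a reserve()-style API can be sketched with a plain `ByteBuffer`: with a 4-byte length, the writer can reserve the slot, encode the payload directly after it, and backfill the length, so the payload is never staged in a temporary array. With a variable-length (varint) prefix, the prefix width depends on the payload length, which is unknown until the payload is written, so the payload would have to be moved afterwards. `ReservedLengthPrefix`, `reserveAndWrite`, and `varintSize` are hypothetical helpers; they do not reproduce the Malhar SerializationBuffer API.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

class ReservedLengthPrefix {
  // Writes [4-byte length][UTF-8 payload] straight into buf; no temporary byte[].
  static void reserveAndWrite(ByteBuffer buf, CharSequence text) {
    int lengthSlot = buf.position();
    buf.position(lengthSlot + 4);                    // reserve a fixed-width slot
    int payloadStart = buf.position();
    CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder();
    try {
      CoderResult r = encoder.encode(CharBuffer.wrap(text), buf, true);
      if (!r.isUnderflow()) {
        r.throwException();                          // buffer overflow or malformed input
      }
      r = encoder.flush(buf);
      if (!r.isUnderflow()) {
        r.throwException();
      }
    } catch (CharacterCodingException e) {
      throw new IllegalArgumentException(e);
    }
    int payloadLength = buf.position() - payloadStart;
    buf.putInt(lengthSlot, payloadLength);           // backfill: absolute put, position unchanged
  }

  // Bytes needed by an unsigned 7-bits-per-byte varint: depends on the value itself.
  static int varintSize(int value) {
    int size = 1;
    while ((value >>>= 7) != 0) {
      size++;
    }
    return size;
  }
}
```

For example, `varintSize(127)` is 1 but `varintSize(128)` is 2, so a writer that has not yet encoded the payload cannot know how many bytes to reserve for a varint prefix.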





[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-19 Thread Vlad Rozov (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15873940#comment-15873940
 ] 

Vlad Rozov commented on APEXCORE-635:
-

bq. As I understand it, an array is a kind of object in Java. Does Java implement 
it differently? And how can the JVM reuse allocated bytes without collecting them? 
Do you have any reference I can take a look at?
Yes, an array is an object in Java that may hold either primitive types or 
other objects. When an array is garbage collected, it is collected as a single 
object. If an array holds primitive types, the array's memory is reclaimed in a 
single step; there is no GC of individual bytes/ints/longs in the array.
bq. I copied the implementation of Kryo's Output.toBytes() here (kryo-2.24.0.jar). 
From this implementation, toBytes() does allocate new memory and copy.
I was referring to Output.getBuffer() and DefaultKryoStreamCodec that uses 
Output.getBuffer(). You are right that Output.toBytes() allocates a new array 
and copies bytes from Output internal buffer to the newly allocated buffer. 
This is a bug in DefaultStatefulStreamCodec, it should use Output.getBuffer().
bq. I can get numbers, but that requires at least a simple prototype. Based 
on my previous test serializing a string of 1000 ASCII characters, the performance 
gain can be 30%.
As I pointed to you, the prototype has a significant bug (reusing byte arrays 
that are not reusable due to fire and forget netlet behavior). What I would 
like to see is a series of small prototypes that prove that the approach 
provides significant performance gains.

I don't see how the solution can be generalized without modifying the current 
definition of StreamCodec and StatefulStreamCodec. It may be possible to 
improve DefaultStatefulStreamCodec (a concrete implementation of 
StatefulStreamCodec), but it seems to require a large amount of modifications 
to netlet and buffer server that other StreamCodec implementations will not 
benefit from. Additionally, the proposal seems to require the netlet client to be 
aware of the codec implementation, and any change to the tuple format will 
require changes to serialization and de-serialization code.




[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-13 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864242#comment-15864242
 ] 

bright chen commented on APEXCORE-635:
--

Hi Vlad,
I think the issues you meant I have not addressed are the following: 
- Please benchmark copying one large block of memory in a single step and in 
multiple steps. What is the difference (in %)? Do the same for a direct buffer.
- How will control tuples be injected into a continuous block of memory?
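The benchmark requested above (copying one large block in a single step versus in multiple steps, and the same for a direct buffer) could start from a crude harness like this. It is only a sketch: it uses `System.nanoTime()` with a simple warm-up rather than a proper harness such as JMH, the 1 MiB block and 1 KiB chunk sizes are arbitrary assumptions, and no results are implied here. The copy helpers can be checked for equivalence before any timing is trusted.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class CopyBenchmark {
  // One System.arraycopy for the whole block.
  static void copySingle(byte[] src, byte[] dst) {
    System.arraycopy(src, 0, dst, 0, src.length);
  }

  // The same bytes copied in fixed-size chunks (e.g. one chunk per tuple).
  static void copyChunked(byte[] src, byte[] dst, int chunk) {
    for (int off = 0; off < src.length; off += chunk) {
      System.arraycopy(src, off, dst, off, Math.min(chunk, src.length - off));
    }
  }

  // Chunked copy into a direct (off-heap) buffer.
  static void copyChunkedDirect(byte[] src, ByteBuffer direct, int chunk) {
    direct.clear();
    for (int off = 0; off < src.length; off += chunk) {
      direct.put(src, off, Math.min(chunk, src.length - off));
    }
  }

  static long timeNanos(Runnable r, int iterations) {
    for (int i = 0; i < iterations; i++) {
      r.run();                                   // crude warm-up for the JIT
    }
    long start = System.nanoTime();
    for (int i = 0; i < iterations; i++) {
      r.run();
    }
    return System.nanoTime() - start;
  }

  public static void main(String[] args) {
    byte[] src = new byte[1 << 20];              // 1 MiB, arbitrary
    Arrays.fill(src, (byte) 7);
    byte[] dst = new byte[src.length];
    ByteBuffer direct = ByteBuffer.allocateDirect(src.length);
    System.out.println("single ns:  " + timeNanos(() -> copySingle(src, dst), 200));
    System.out.println("chunked ns: " + timeNanos(() -> copyChunked(src, dst, 1024), 200));
    System.out.println("direct ns:  " + timeNanos(() -> copyChunkedDirect(src, direct, 1024), 200));
  }
}
```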

My comments are as follows:
- As my previous comments show, the problem with the current mechanism is the 
2 wasted memory copies and the garbage collection. The problem is not whether one 
big byte array is copied at once or divided into multiple copies.
- Control tuples are also written to the same SerializationBuffer.
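Writing control tuples into the same buffer as data tuples implies that each record carries its own type and length, so the reader can tell them apart and skip payloads it does not need. A minimal sketch, assuming a hypothetical `[1-byte type][4-byte length][payload]` framing; the type codes and the `RecordBuffer` name are illustrative assumptions, not the Apex wire format.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class RecordBuffer {
  static final byte DATA = 1;     // hypothetical type codes
  static final byte CONTROL = 2;

  private final ByteBuffer buf;

  RecordBuffer(int capacity) {
    buf = ByteBuffer.allocate(capacity);
  }

  // Data and control records are appended to the same contiguous buffer, in order.
  void append(byte type, byte[] payload) {
    buf.put(type).putInt(payload.length).put(payload);
  }

  // Reader side: walk the buffer record by record and collect the type codes.
  List<Byte> readTypes() {
    ByteBuffer r = buf.duplicate();   // independent position/limit over the same bytes
    r.flip();
    List<Byte> types = new ArrayList<>();
    while (r.hasRemaining()) {
      byte type = r.get();
      int length = r.getInt();
      r.position(r.position() + length); // skip the payload
      types.add(type);
    }
    return types;
  }
}
```

Ordering is preserved because control records are interleaved with data records in the same memory, which matters when a control tuple marks a window boundary.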


[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-13 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864216#comment-15864216
 ] 

bright chen commented on APEXCORE-635:
--

Hi Vlad,
- As I understand it, an array is a kind of object in Java. Does Java implement 
it differently? And how can the JVM reuse allocated bytes without collecting them? 
Do you have any reference I can take a look at?
- I copied the implementation of Kryo's Output.toBytes() here (kryo-2.24.0.jar). 
From this implementation, toBytes() does allocate new memory and copy:
public byte[] toBytes () {
    byte[] newBuffer = new byte[position];
    System.arraycopy(buffer, 0, newBuffer, 0, position);
    return newBuffer;
}
- I can get numbers, but that requires at least a simple prototype. Based on 
my previous test serializing a string of 1000 ASCII characters, the performance 
gain can be 30%.
- Yes, it's pretty complex to implement the whole proposal. But fortunately, most 
of the memory management work is already implemented in Malhar. What needs to be 
done is to add some features and integrate them into BufferServer.


[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-11 Thread Vlad Rozov (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862609#comment-15862609
 ] 

Vlad Rozov commented on APEXCORE-635:
-

- JVM does not garbage collect bytes, it garbage collects objects.
- Output.toBytes() does not copy anything
- Without actual numbers, the discussion is abstract and is not convincing. 
- Complexity of the proposal is quite high
- Please see my prior comment, there are still issues not addressed
(n)

> Proposal: Manage memory to avoid memory copy and garbage collection
> ---
>
> Key: APEXCORE-635
> URL: https://issues.apache.org/jira/browse/APEXCORE-635
> Project: Apache Apex Core
>  Issue Type: Wish
>Reporter: bright chen
>Assignee: bright chen
>
> Manage memory to avoid memory copy and garbage collection
> The aim of this proposal is to reuse the memory to avoid the garbage 
> collection and avoid unnecessary memory copy to increase the performance. In 
> this proposal the term serde means serialization and deserialization. It’s 
> same as codec.
> Currently, apex by default use DefaultStatefulStreamCodec for serde, which 
> extends Kryo and optimize it by replace class by class id. And application 
> developer can optimize serializer by implement interface StreamCodec. 
> First, let’s look into the default codec DefaultStatefulStreamCodec. It 
> basically optimize serde by replace class name by class id as my 
> understanding. And the state information only send before sending first 
> tuple, it’s kind like configuration for serde. So I suggest to separate this 
> feature from serde. The benefit is the customized serde can still use this 
> feature. And the kryo have some limitation which I’ll state later.
> Second, Let’s look at the customized serde. Let’s stand from application 
> developer point of view and look at how to implement StreamCodec. I take a 
> simple tuple List as example.
> The first solution is use kryo. This is basically same as apex default codec.
> The second solution is implement StreamCodec for String and List, and 
> ListSerde delegate String to StringSerde. The benefit of this solution is the 
> StringSerde ListSerde can be reused. The problem is there need a lot of 
> temporary memory and memory copy. Following is the sample implement.
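> import java.nio.ByteBuffer;
> import java.nio.charset.StandardCharsets;
> import java.util.List;
> import com.datatorrent.api.StreamCodec;
> import com.datatorrent.netlet.util.Slice;
>
> class StringSerde {
>   Slice toByteArray(String o) {
>     byte[] b = o.getBytes(StandardCharsets.UTF_8);  // new bytes
>     byte[] b1 = new byte[b.length + 4];             // new bytes
>     ByteBuffer.wrap(b1).putInt(b.length);           // length of the string in the first 4 bytes
>     System.arraycopy(b, 0, b1, 4, b.length);        // copy bytes
>     return new Slice(b1);
>   }
> }
>
> class ListSerde<T> {
>   StreamCodec<T> itemSerde;  // the serde used to serialize/deserialize each item
>
>   Slice toByteArray(List<T> list) {
>     Slice[] itemSlices = new Slice[list.size()];
>     int size = 0;
>     int index = 0;
>     for (T item : list) {
>       Slice slice = itemSerde.toByteArray(item);  // one new array per item
>       size += slice.length;
>       itemSlices[index++] = slice;
>     }
>     byte[] b = new byte[size + 4];                 // allocate the memory
>     ByteBuffer bb = ByteBuffer.wrap(b);
>     bb.putInt(list.size());                        // length of the list in the first 4 bytes
>     for (Slice s : itemSlices) {
>       bb.put(s.buffer, s.offset, s.length);        // copy the data from itemSlices
>     }
>     return new Slice(b);
>   }
> }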
>   
> From the above code, we can see that roughly twice the required memory is 
> allocated and the data is copied twice (one copy may be specific to String, 
> but the other copy is mandatory). And after the bytes are written to the 
> socket, none of the allocated memory can be reused; it all has to be 
> garbage collected.
> The above tuple has only two levels. If a tuple has n levels, n times the 
> required memory would be allocated and n-1 data copies would be required.
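The two-level case can be checked with a small stdlib-only sketch that mirrors the StringSerde/ListSerde pattern above. It uses plain byte[] instead of Slice and adds two hypothetical counters (`allocated`, `copied`) that are not part of Apex. It only makes the allocation and copy counts visible and does not model Kryo or the buffer server.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class DelegatingSerdeCost {
  static long allocated = 0;  // bytes allocated for arrays (hypothetical counter)
  static long copied = 0;     // bytes moved by explicit copies (hypothetical counter)

  // StringSerde-style: encode, then copy into a new length-prefixed array.
  static byte[] encodeString(String s) {
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    allocated += utf8.length;
    byte[] out = new byte[utf8.length + 4];
    allocated += out.length;
    ByteBuffer.wrap(out).putInt(utf8.length);
    System.arraycopy(utf8, 0, out, 4, utf8.length);
    copied += utf8.length;
    return out;
  }

  // ListSerde-style: serialize each item, then copy every item array into one array.
  static byte[] encodeList(List<String> list) {
    byte[][] parts = new byte[list.size()][];
    int size = 0;
    for (int i = 0; i < parts.length; i++) {
      parts[i] = encodeString(list.get(i));
      size += parts[i].length;
    }
    byte[] out = new byte[size + 4];
    allocated += out.length;
    ByteBuffer bb = ByteBuffer.wrap(out);
    bb.putInt(list.size());
    for (byte[] p : parts) {
      bb.put(p);
      copied += p.length;
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] wire = encodeList(Arrays.asList("ab", "cde"));
    System.out.println("wire=" + wire.length + " allocated=" + allocated + " copied=" + copied);
  }
}
```

For the two strings "ab" and "cde", the final array is 17 bytes, but 35 bytes are allocated along the way. Every string byte is copied once at each of the two levels.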
> The third solution is to allocate the memory up front and pass the memory 
> and an offset to the item serde. This solution has some problems:
> How is the memory passed from the caller? The existing interface only 
> passes the object, with no way to pass memory, so how the memory is passed 
> would depend on the implementation.
> Another big problem of this solution is that it is hard to allocate the 
> proper amount of memory (for this particular case, it could probably 
> allocate twice the total string length). Memory allocated beyond what is 
> required is wasted until the data is sent to the socket (or else exactly 
> the required memory must be allocated and the data copied, to avoid wasting 
> memory). The code also needs to handle the case where the memory is not 
> enough.
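Here is a minimal sketch of the third solution. It assumes a hypothetical caller-owned `Out` buffer and a hypothetical `ItemWriter` interface; neither is an Apex API. The item serde writes at the caller's current offset instead of returning its own array. The deliberately small initial capacity shows both problems listed above: the size has to be guessed, and an undersized buffer forces a reallocate-and-copy that can leave unused capacity behind.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class OffsetSerdeSketch {
  // Hypothetical caller-owned output: a byte[] plus the current write offset.
  static final class Out {
    byte[] buf;
    int pos;

    Out(int initialCapacity) {
      buf = new byte[initialCapacity];
    }

    // The "memory is not enough" case: grow by doubling (reallocate + copy).
    void ensure(int extra) {
      if (pos + extra > buf.length) {
        buf = Arrays.copyOf(buf, Math.max(buf.length * 2, pos + extra));
      }
    }

    void putInt(int v) {
      ensure(4);
      buf[pos++] = (byte) (v >>> 24);
      buf[pos++] = (byte) (v >>> 16);
      buf[pos++] = (byte) (v >>> 8);
      buf[pos++] = (byte) v;
    }

    void putBytes(byte[] b) {
      ensure(b.length);
      System.arraycopy(b, 0, buf, pos, b.length);
      pos += b.length;
    }
  }

  // Hypothetical item serde that writes into the caller's memory.
  interface ItemWriter<T> {
    void write(T item, Out out);
  }

  static final ItemWriter<String> STRING_WRITER = (s, out) -> {
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);  // still a temporary array
    out.putInt(utf8.length);
    out.putBytes(utf8);
  };

  static <T> void writeList(List<T> list, ItemWriter<T> itemWriter, Out out) {
    out.putInt(list.size());
    for (T item : list) {
      itemWriter.write(item, out);  // item serde writes at the current offset
    }
  }

  public static void main(String[] args) {
    Out out = new Out(8);  // deliberately too small: forces reallocations
    writeList(Arrays.asList("ab", "cde"), STRING_WRITER, out);
    System.out.println("bytes written=" + out.pos + ", capacity=" + out.buf.length);
  }
}
```

`String.getBytes` still allocates a temporary array per item here. Avoiding that too would need an encoder that writes straight into the target buffer.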
> The fourth solution is to treat the whole object as flat, allocate the 
> memory, and handle it directly, as in the following example. This solution 
> solves the problem of passing memory, but it still has the other problems 
> of the third solution and introduces some new ones:
> The code can’t be reused: we already have StringSerde, but ListSerde has 
> to implement almost the same logic again.
> The serializeItemToMemory() method has to be implemented separately for 
> each item type.
> class ListSerde {
>   Slice toByteArray(List list) {
>     byte[] b = new byte[…];  // hard to estimate the proper size
>     int size = 0;
>     for (T item : list) {
>       int length = 

[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-10 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862087#comment-15862087
 ] 

bright chen commented on APEXCORE-635:
--

I think the impact of garbage collection can be significant, depending on the 
memory situation.

[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-10 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15861966#comment-15861966
 ] 

bright chen commented on APEXCORE-635:
--

- If we use a direct buffer, it seems there is no way with ByteBuffer to avoid 
the memcopy. What I meant by avoiding memcopy and garbage collection is 
explained below by comparing the details of the current and the new procedure 
(assume each tuple is serialized to N bytes, and M tuples are written at one 
time):

Current procedure (using the Kryo serde as an example):
1) Kryo serde: write data to the buffer of Kryo's Output; assume the length of 
the bytes is N.
2) Call Output.toBytes() to get the bytes from Kryo: needs a memcopy of N bytes.
3) Put the message type and partition: needs a memcopy of N bytes; the total 
length becomes N+5, and N bytes need to be garbage collected.
4) Repeat 1 to 3 M times, generating M slices.
5) Copy the M slices (M*(N+5) bytes) to a ByteBuffer: the M*(N+5) bytes of the 
slices then need to be garbage collected.
6) Write the ByteBuffer to the socket.

New procedure (using the Kryo serde as an example):
1) Write the message type and partition to the SerializationBuffer.
2) Kryo serde: serialize into the SerializationBuffer; assume the length of the 
bytes is N.
3) Call SerializationBuffer.toSlice() to get the slice: this returns a wrapper 
instead of copying the data.
4) Repeat 1 to 3 M times, generating M slices.
5) In most cases, merge the M slices into one (when the slices are in the same 
block). If they are in different blocks, probably handle one block at a time.
6) Copy or wrap the data into a ByteBuffer. A copy is probably needed in order 
to use direct mode.
7) Write the ByteBuffer to the socket, then reset the SerializationBuffer.
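The new procedure can be sketched with a single-block buffer. `SerializationBufferSketch` and `View` below are hypothetical stand-ins written for illustration. They are not the SerializationBuffer class the comment refers to, and not Apex's Slice.

The sketch also simplifies several things:
- The serializer is replaced by a plain byte copy.
- There is no multi-block or overflow handling.
- The direct ByteBuffer is allocated once and reused.

The methods map onto the steps as follows: `beginTuple` is step 1, `write` is step 2, `toSlice` is step 3, `pending` is the merge in step 5, the single `put` into the direct buffer is step 6, and `reset` is step 7.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BlockBufferSketch {
  // Hypothetical stand-in for a Slice: a view on a shared array, no copy.
  static final class View {
    final byte[] buffer;
    final int offset;
    final int length;

    View(byte[] buffer, int offset, int length) {
      this.buffer = buffer;
      this.offset = offset;
      this.length = length;
    }
  }

  // Hypothetical single-block serialization buffer.
  static final class SerializationBufferSketch {
    final byte[] block;
    int position;    // next write position
    int tupleStart;  // where the current tuple began

    SerializationBufferSketch(int capacity) {
      block = new byte[capacity];
    }

    // Step 1: 1-byte message type + 4-byte partition, written in place.
    void beginTuple(byte messageType, int partition) {
      tupleStart = position;
      ByteBuffer.wrap(block, position, 5).put(messageType).putInt(partition);
      position += 5;
    }

    // Step 2: stand-in for a serializer writing straight into the block.
    void write(byte[] src) {
      System.arraycopy(src, 0, block, position, src.length);
      position += src.length;
    }

    // Step 3: wrap the tuple's bytes, no copy.
    View toSlice() {
      return new View(block, tupleStart, position - tupleStart);
    }

    // Step 5: tuples in one block are contiguous, so one view covers them all.
    View pending() {
      return new View(block, 0, position);
    }

    // Step 7: called only after the bytes have been written to the socket.
    void reset() {
      position = 0;
      tupleStart = 0;
    }
  }

  public static void main(String[] args) {
    SerializationBufferSketch sb = new SerializationBufferSketch(1024);
    ByteBuffer direct = ByteBuffer.allocateDirect(1024);  // allocated once, reused
    for (int i = 0; i < 3; i++) {
      sb.beginTuple((byte) 1, i);
      sb.write(("tuple-" + i).getBytes(StandardCharsets.UTF_8));
      View v = sb.toSlice();
      System.out.println("slice offset=" + v.offset + " length=" + v.length);
    }
    View all = sb.pending();
    direct.clear();
    direct.put(all.buffer, all.offset, all.length);  // step 6: the one heap-to-direct copy
    direct.flip();
    // a channel.write(direct) call would go here
    sb.reset();
  }
}
```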

So, comparing the current and new procedures: the current procedure copies 
around 3*N*M bytes and garbage collects around 2*N*M bytes, while the new 
procedure only copies N*M bytes and needs no garbage collection. The new 
approach therefore saves about 2*N*M bytes of memory copy and 2*N*M bytes of 
garbage collection.
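The totals can be tallied per step with the 5 header bytes (message type plus partition) counted explicitly. Here C is bytes copied and G is bytes left for the garbage collector. The new-procedure figures assume that both the block and the direct buffer are reused.

```latex
\begin{aligned}
C_{\text{current}} &= M\big(\underbrace{N}_{\text{step 2}} + \underbrace{N}_{\text{step 3}} + \underbrace{(N+5)}_{\text{step 5}}\big) = M(3N+5) \approx 3NM \\
G_{\text{current}} &= M\big(\underbrace{N}_{\text{step 3}} + \underbrace{(N+5)}_{\text{step 5}}\big) = M(2N+5) \approx 2NM \\
C_{\text{new}} &= M(N+5) \approx NM, \qquad G_{\text{new}} = 0 \\
C_{\text{current}} - C_{\text{new}} &= 2NM
\end{aligned}
```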

- The memory of different blocks is not contiguous, since we want to allocate 
blocks dynamically, but the slices within the same block are contiguous. The 
size of a block is much larger than the size of a slice.

- With the current fire-and-forget approach there is no way to reuse the 
memory. That's why the write() method needs to be overridden as a workaround. 
The new implementation of write() resets the SerializationBuffer after writing 
the data to the socket. The new approach doesn't need an acknowledgement 
either.

- Yes, with a variable-length prefix it is not possible to reserve space for 
the length. But I doubt that we need a variable length at all. It saves at most 
3 bytes per message but makes the parser harder. It's not common for protocols 
to use variable-length prefixes; even IP uses a fixed-length length field. In 
the worst case, even if we don't use the reserve feature of 
SerializationBuffer, we can still save the 2*N*M memory copy and garbage 
collection.
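The reserve-and-backfill idea behind a fixed-length prefix can be sketched with plain java.nio.ByteBuffer:
1. Reserve 4 bytes.
2. Let the serializer write the payload in place.
3. Fill in the length with an absolute put.

This assumes a 4-byte big-endian length and uses `String.getBytes` as a stand-in serializer. It is an illustration of the technique, not the buffer server's actual wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReservedLengthPrefix {
  // Writes one length-value record without knowing the length up front.
  static void writeRecord(ByteBuffer out, String value) {
    int lengthPos = out.position();
    out.putInt(0);                                         // reserved space for the length
    int payloadStart = out.position();
    out.put(value.getBytes(StandardCharsets.UTF_8));       // stand-in for in-place serialization
    out.putInt(lengthPos, out.position() - payloadStart);  // backfill; the payload is not moved
  }

  static String readRecord(ByteBuffer in) {
    byte[] b = new byte[in.getInt()];
    in.get(b);
    return new String(b, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(64);
    writeRecord(buf, "hello");
    writeRecord(buf, "apex");
    buf.flip();
    System.out.println(readRecord(buf) + " " + readRecord(buf));
  }
}
```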

[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-09 Thread Vlad Rozov (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15860770#comment-15860770
 ] 

Vlad Rozov commented on APEXCORE-635:
-

- Memory copy is required as it moves data from JVM heap to direct (off-heap) 
buffer. It is done prior to writing the buffer to a network channel.
- Please benchmark copying one large block of memory in a single step and in 
multiple steps. What is the difference (in %)? Do the same for a direct buffer.
- How will control tuples be injected into a continuous block of memory?
- The current approach is fire-and-forget; the proposed one requires some kind 
of acknowledgment. I am not convinced that a design that requires an 
acknowledgment will be faster.
- There is a plan to move from netlet to netty, and netty provides a buffer pool 
allocator that may better fit the proposed design.
- The current format uses a variable-length prefix (1, 2, 3 or 4 bytes), so it 
is not possible to know the offset where the data needs to be written before 
the length of the serialized data is known.
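The last point can be illustrated under an assumption: a generic 7-bits-per-byte varint, since the exact encoding the buffer server uses is not shown in this thread. With such an encoding, the prefix width, and therefore the payload's start offset, depends on the payload length itself.

```java
public class VarIntPrefix {
  // Bytes needed by a 7-bits-per-byte varint for a non-negative length
  // (assumed encoding; 4 bytes cover lengths below 2^28).
  static int prefixSize(int length) {
    if (length < (1 << 7)) return 1;
    if (length < (1 << 14)) return 2;
    if (length < (1 << 21)) return 3;
    if (length < (1 << 28)) return 4;
    throw new IllegalArgumentException("length too large for a 4-byte prefix: " + length);
  }

  public static void main(String[] args) {
    // The payload's start offset is only known once its length is known.
    for (int len : new int[] {100, 200, 20_000, 3_000_000}) {
      System.out.println("length=" + len + " -> payload starts at offset " + prefixSize(len));
    }
  }
}
```

A fixed 4-byte prefix removes this dependency at a cost of at most 3 bytes per message, which is the trade-off discussed in the 2017-02-10 reply.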

[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-09 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15860433#comment-15860433
 ] 

bright chen commented on APEXCORE-635:
--

The reasons for overriding BufferServerPublisher.write():
- The current implementation of write() assumes that the buffers of the slices 
are not contiguous, and uses memcopy to merge the bytes before sending them to 
the socket. The new mechanism ensures that consecutive slices in the same block 
are contiguous in memory, so no memcopy is needed to merge the bytes.
- The current mechanism doesn't reuse memory, so write() can just send the data 
to the socket and leave it to be garbage collected. The new mechanism reuses 
memory, so write() needs to send the data and then reset the buffer (as there 
is no way outside of write() to know what data has been sent to the socket).
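The second point can be sketched with a hypothetical `Publisher` class, which is not the actual BufferServerPublisher. Serialized tuples sit back to back in one reusable block. write() wraps that contiguous region without a merge copy, and clears the block only after the bytes have gone to the channel. The blocking write loop is a simplification: a non-blocking socket would need to track partial writes before it could reset.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class WriteThenReset {
  // Hypothetical publisher holding serialized tuples in one reusable block.
  static final class Publisher {
    private final byte[] block = new byte[1024];
    private int position = 0;

    // Stand-in for a serializer writing straight into the block.
    void append(byte[] serializedTuple) {
      System.arraycopy(serializedTuple, 0, block, position, serializedTuple.length);
      position += serializedTuple.length;
    }

    int pendingBytes() {
      return position;
    }

    void write(WritableByteChannel channel) {
      ByteBuffer view = ByteBuffer.wrap(block, 0, position);  // wraps the block, no merge copy
      try {
        while (view.hasRemaining()) {  // simplification: assumes a blocking channel
          channel.write(view);
        }
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      position = 0;  // reuse the block only after its bytes have been written
    }
  }

  public static void main(String[] args) {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    Publisher p = new Publisher();
    p.append(new byte[] {1, 2, 3});
    p.append(new byte[] {4, 5});
    p.write(Channels.newChannel(sink));
    System.out.println("sent=" + sink.size() + ", pending after write=" + p.pendingBytes());
  }
}
```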

The reasons why this mechanism performs better than Kryo are:
- The memory can be reused after the data is sent to the socket instead of being 
garbage collected, which avoids garbage collection.
- It avoids unnecessary memory copies; basically, all the extra copies required 
by Kryo can be avoided. Kryo needs to copy the data after serialization because 
its internal buffer is reused for the next serialization. And for the LV 
(length-value) serialization format, Kryo's Output provides no mechanism to 
write the serialized length without a memcopy (most of the time, the 
serialized length is only known after serialization).
- The data sent to the socket can easily be merged into one block without an 
extra memory copy.

[jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection

2017-02-09 Thread Vlad Rozov (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15859910#comment-15859910
 ] 

Vlad Rozov commented on APEXCORE-635:
-

The end goal of overriding BufferServerPublisher.write() is not clear. Why is 
the override necessary? It is also not clear why Kryo (a library designed for 
efficient serialization and deserialization) will not match the proposed 
mechanism. If the goal is to serialize Collections (a List in the example) 
using Kryo, Kryo has CollectionSerializer, which will (should?) do something 
similar to what is proposed. IMO, it is much better not to use Kryo at all 
when Tuples are strongly typed, and to implement StreamCodec without Kryo. The 
default StreamCodec is provided to handle cases where Tuples are not strongly 
typed, in which case reflection is required. The same applies to unchecked 
Collections. Thumbs down until all those issues are resolved.
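Here is a sketch of a hand-written codec for a strongly typed tuple, with no Kryo, no reflection and no class ids. `PageView` is a hypothetical tuple type. The methods use plain byte[] so the example runs without Apex. In Apex they would map onto the toByteArray/fromByteArray methods of a `StreamCodec<PageView>`, which work with Slice rather than byte[].

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TypedCodecSketch {
  // Hypothetical strongly typed tuple.
  static final class PageView {
    final String url;
    final long timestamp;

    PageView(String url, long timestamp) {
      this.url = url;
      this.timestamp = timestamp;
    }
  }

  // Field-by-field encoding: [url length][url bytes][timestamp].
  static byte[] toBytes(PageView t) {
    byte[] url = t.url.getBytes(StandardCharsets.UTF_8);
    ByteBuffer bb = ByteBuffer.allocate(4 + url.length + 8);
    bb.putInt(url.length).put(url).putLong(t.timestamp);
    return bb.array();
  }

  static PageView fromBytes(byte[] bytes, int offset, int length) {
    ByteBuffer bb = ByteBuffer.wrap(bytes, offset, length);
    byte[] url = new byte[bb.getInt()];
    bb.get(url);
    return new PageView(new String(url, StandardCharsets.UTF_8), bb.getLong());
  }

  public static void main(String[] args) {
    byte[] wire = toBytes(new PageView("/index.html", 42L));
    PageView back = fromBytes(wire, 0, wire.length);
    System.out.println(back.url + " @ " + back.timestamp + " (" + wire.length + " bytes)");
  }
}
```

Because the type is fixed, the layout never has to carry type information, which is the case in which the default reflection-based codec is unnecessary.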
