[
https://issues.apache.org/jira/browse/APEXMALHAR-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15351979#comment-15351979
]
bright chen commented on APEXMALHAR-2126:
-----------------------------------------
Let me take endWindow() of
org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl<K, V> as an
example.
public void endWindow()
{
  for (K key: cache.getChangedKeys()) {
    store.put(this.bucket,
        SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
        SliceUtils.concatenate(identifier, serdeValue.serialize(cache.get(key))));
  }
  …
}

public static Slice concatenate(byte[] a, Slice b)
{
  int size = a.length + b.length;
  byte[] bytes = new byte[size];
  System.arraycopy(a, 0, bytes, 0, a.length);
  System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
  return new Slice(bytes);
}
So each call to concatenate() allocates a new byte array, twice per changed key.
With the new approach, these allocations can be avoided:
private transient BufferSlice slice = new BufferSlice(SIZE);

public void endWindow()
{
  slice.clear();
  for (K key: cache.getChangedKeys()) {
    store.put(this.bucket,
        slice.append(identifier).append(serdeKey.serialize(key)).endSlice(),
        slice.append(identifier).append(serdeValue.serialize(cache.get(key))).endSlice());
  }
  …
}
The class can be implemented as follows:
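public class BufferSlice
{
  protected byte[] buffer;
  protected int offset;
  protected int length;

  public BufferSlice(int size)
  {
    buffer = new byte[size];
  }

  // Rewind so the buffer can be reused; slices returned earlier must no longer be in use.
  public void clear()
  {
    offset = 0;
    length = 0;
  }

  // Returns this so that calls can be chained as in endWindow() above.
  public BufferSlice append(byte[] data)
  {
    //TODO: handle over sized
    // Copy from data into the shared buffer, after the bytes already appended.
    System.arraycopy(data, 0, buffer, offset + length, data.length);
    length += data.length;
    return this;
  }

  // serialize() returns a Slice, so accept one as well.
  public BufferSlice append(Slice data)
  {
    //TODO: handle over sized
    System.arraycopy(data.buffer, data.offset, buffer, offset + length, data.length);
    length += data.length;
    return this;
  }

  public Slice endSlice()
  {
    Slice slice = new Slice(buffer, offset, length);
    offset += length;
    length = 0;
    return slice;
  }
}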
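For the "//TODO: handle over sized" case, one option is to grow the buffer when an append would not fit. The following is only a sketch under assumptions, not the Malhar implementation: the names BufferSliceSketch and GrowingBufferSlice, the doubling strategy, and the nested Slice class (a stand-in for com.datatorrent.netlet.util.Slice so that the example runs on its own) are all hypothetical.

```java
import java.util.Arrays;

public class BufferSliceSketch
{
  // Hypothetical stand-in for com.datatorrent.netlet.util.Slice,
  // reduced to the three fields used in this thread.
  public static final class Slice
  {
    public final byte[] buffer;
    public final int offset;
    public final int length;

    public Slice(byte[] buffer, int offset, int length)
    {
      this.buffer = buffer;
      this.offset = offset;
      this.length = length;
    }

    public byte[] toByteArray()
    {
      return Arrays.copyOfRange(buffer, offset, offset + length);
    }
  }

  // Hypothetical variant of BufferSlice that reallocates instead of overflowing.
  public static final class GrowingBufferSlice
  {
    private byte[] buffer;
    private int offset;
    private int length;

    public GrowingBufferSlice(int size)
    {
      buffer = new byte[Math.max(1, size)];
    }

    public void clear()
    {
      offset = 0;
      length = 0;
    }

    public GrowingBufferSlice append(byte[] data, int srcOffset, int srcLength)
    {
      int required = offset + length + srcLength;
      if (required > buffer.length) {
        // Assumed strategy: grow to at least double the current size.
        // Slices handed out earlier keep referencing the old array,
        // so their contents are not disturbed by the reallocation.
        buffer = Arrays.copyOf(buffer, Math.max(required, buffer.length * 2));
      }
      System.arraycopy(data, srcOffset, buffer, offset + length, srcLength);
      length += srcLength;
      return this;
    }

    public GrowingBufferSlice append(byte[] data)
    {
      return append(data, 0, data.length);
    }

    public Slice endSlice()
    {
      Slice slice = new Slice(buffer, offset, length);
      offset += length;
      length = 0;
      return slice;
    }
  }

  public static void main(String[] args)
  {
    GrowingBufferSlice shared = new GrowingBufferSlice(4);
    Slice key = shared.append(new byte[]{1}).append(new byte[]{10, 11}).endSlice();
    // Building the value needs 7 bytes in total, so the 4-byte buffer is reallocated.
    Slice value = shared.append(new byte[]{1}).append(new byte[]{20, 21, 22}).endSlice();
    System.out.println(Arrays.toString(key.toByteArray()));   // [1, 10, 11]
    System.out.println(Arrays.toString(value.toByteArray())); // [1, 20, 21, 22]
  }
}
```

One caveat with any shared buffer of this kind: clear() rewinds to the start, so slices returned before clear() stay valid only if the store has already consumed or copied them by then. Whether that holds for store.put() would need to be checked.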
> Suggest: Share Slice Buffer
> ---------------------------
>
> Key: APEXMALHAR-2126
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2126
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: bright chen
>
> I think the intention of Slice (com.datatorrent.netlet.util.Slice) was to
> share the buffer and avoid unnecessary memory allocation/deallocation. But
> the intention is not self-explanatory, and there is no method for sharing the
> memory. The utility class org.apache.apex.malhar.lib.utils.serde.SliceUtils
> also allocates new memory and copies the data.
> I suggest implementing another class (say, BufferSlice), which
> - initializes its buffer with a relatively large size
> - supports append(byte[] data, int offset, int length)
> - dynamically reallocates the buffer or throws an exception when the buffer
> is full (depending on the management strategy)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)