This is an automated email from the ASF dual-hosted git repository. chaokunyang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-fury.git
The following commit(s) were added to refs/heads/main by this push: new ca996d4e feat(java): extract public Fury methods to BaseFury (#1467) ca996d4e is described below commit ca996d4ed7cfedf8370ca993bc429972fbacf518 Author: Nikita Ivchenko <munoo...@gmail.com> AuthorDate: Sat Apr 6 10:22:18 2024 +0300 feat(java): extract public Fury methods to BaseFury (#1467) Extract public Fury method to the `BaseFury` interface and implement them in `ThreadPoolFury` and `ThreadLocalFury`, so end users can avoid using `execute()` and other utility methods. Off topic: further steps might be changing `BaseFury` interface name to `Fury` while existing class renaming to something like `FuryImpl`. `FuryBuilder` should return the new `Fury` interface, so we will hide system methods like `xwriteRef` and others. WDYT? --- .../org/apache/fury/AbstractThreadSafeFury.java | 11 ++ .../src/main/java/org/apache/fury/BaseFury.java | 42 ++++++++ .../src/main/java/org/apache/fury/Fury.java | 20 +++- .../main/java/org/apache/fury/ThreadLocalFury.java | 97 ++++++++++++++++++ .../java/org/apache/fury/pool/ThreadPoolFury.java | 114 +++++++++++++++++++++ 5 files changed, 282 insertions(+), 2 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java b/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java index 9f60a191..4085e7d9 100644 --- a/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java @@ -21,6 +21,7 @@ package org.apache.fury; import java.util.function.Consumer; import org.apache.fury.serializer.Serializer; +import org.apache.fury.serializer.SerializerFactory; public abstract class AbstractThreadSafeFury implements ThreadSafeFury { @Override @@ -48,5 +49,15 @@ public abstract class AbstractThreadSafeFury implements ThreadSafeFury { processCallback(fury -> fury.registerSerializer(type, serializerClass)); } + @Override + public void registerSerializer(Class<?> type, Serializer<?> serializer) { + processCallback(fury -> fury.registerSerializer(type, serializer)); + } + + @Override + public void setSerializerFactory(SerializerFactory serializerFactory) { + processCallback(fury -> fury.setSerializerFactory(serializerFactory)); + } + protected abstract void processCallback(Consumer<Fury> callback); } diff --git a/java/fury-core/src/main/java/org/apache/fury/BaseFury.java b/java/fury-core/src/main/java/org/apache/fury/BaseFury.java index 2fc943cf..e268dfb5 100644 --- a/java/fury-core/src/main/java/org/apache/fury/BaseFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/BaseFury.java @@ -19,8 +19,12 @@ package org.apache.fury; +import java.io.InputStream; +import java.io.OutputStream; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.serializer.BufferCallback; import org.apache.fury.serializer.Serializer; +import org.apache.fury.serializer.SerializerFactory; import org.apache.fury.serializer.Serializers; /** All Fury’s basic interface, including Fury’s basic methods. */ @@ -64,9 +68,16 @@ public interface BaseFury { */ <T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass); + void registerSerializer(Class<?> type, Serializer<?> serializer); + + void setSerializerFactory(SerializerFactory serializerFactory); + /** Return serialized <code>obj</code> as a byte array. */ byte[] serialize(Object obj); + /** Return serialized <code>obj</code> as a byte array. */ + byte[] serialize(Object obj, BufferCallback callback); + /** * Serialize <code>obj</code> to a off-heap buffer specified by <code>address</code> and <code> * size</code>. @@ -76,9 +87,18 @@ public interface BaseFury { /** Serialize data into buffer. */ MemoryBuffer serialize(MemoryBuffer buffer, Object obj); + /** Serialize <code>obj</code> to a <code>buffer</code>. */ + MemoryBuffer serialize(MemoryBuffer buffer, Object obj, BufferCallback callback); + + void serialize(OutputStream outputStream, Object obj); + + void serialize(OutputStream outputStream, Object obj, BufferCallback callback); + /** Deserialize <code>obj</code> from a byte array. */ Object deserialize(byte[] bytes); + Object deserialize(byte[] bytes, Iterable<MemoryBuffer> outOfBandBuffers); + /** * Deserialize <code>obj</code> from a off-heap buffer specified by <code>address</code> and * <code>size</code>. @@ -88,6 +108,12 @@ public interface BaseFury { /** Deserialize <code>obj</code> from a <code>buffer</code>. */ Object deserialize(MemoryBuffer buffer); + Object deserialize(MemoryBuffer buffer, Iterable<MemoryBuffer> outOfBandBuffers); + + Object deserialize(InputStream inputStream); + + Object deserialize(InputStream inputStream, Iterable<MemoryBuffer> outOfBandBuffers); + /** * Serialize java object without class info, deserialization should use {@link * #deserializeJavaObject}. @@ -100,6 +126,8 @@ public interface BaseFury { */ void serializeJavaObject(MemoryBuffer buffer, Object obj); + void serializeJavaObject(OutputStream outputStream, Object obj); + /** * Deserialize java object from binary without class info, serialization should use {@link * #serializeJavaObject}. @@ -111,4 +139,18 @@ public interface BaseFury { * #serializeJavaObject}. */ <T> T deserializeJavaObject(MemoryBuffer buffer, Class<T> cls); + + <T> T deserializeJavaObject(InputStream inputStream, Class<T> cls); + + byte[] serializeJavaObjectAndClass(Object obj); + + void serializeJavaObjectAndClass(MemoryBuffer buffer, Object obj); + + void serializeJavaObjectAndClass(OutputStream outputStream, Object obj); + + Object deserializeJavaObjectAndClass(byte[] data); + + Object deserializeJavaObjectAndClass(MemoryBuffer buffer); + + Object deserializeJavaObjectAndClass(InputStream inputStream); } diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java index 407ae725..202c2f3a 100644 --- a/java/fury-core/src/main/java/org/apache/fury/Fury.java +++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java @@ -173,10 +173,12 @@ public final class Fury implements BaseFury { classResolver.registerSerializer(type, serializerClass); } + @Override public void registerSerializer(Class<?> type, Serializer<?> serializer) { classResolver.registerSerializer(type, serializer); } + @Override public void setSerializerFactory(SerializerFactory serializerFactory) { classResolver.setSerializerFactory(serializerFactory); } @@ -202,7 +204,7 @@ public final class Fury implements BaseFury { return bytes; } - /** Return serialized <code>obj</code> as a byte array. */ + @Override public byte[] serialize(Object obj, BufferCallback callback) { MemoryBuffer buf = getBuffer(); buf.writerIndex(0); @@ -217,7 +219,7 @@ public final class Fury implements BaseFury { return serialize(buffer, obj, null); } - /** Serialize <code>obj</code> to a <code>buffer</code>. */ + @Override public MemoryBuffer serialize(MemoryBuffer buffer, Object obj, BufferCallback callback) { this.bufferCallback = callback; int maskIndex = buffer.writerIndex(); @@ -261,10 +263,12 @@ public final class Fury implements BaseFury { } } + @Override public void serialize(OutputStream outputStream, Object obj) { serialize(outputStream, obj, null); } + @Override public void serialize(OutputStream outputStream, Object obj, BufferCallback callback) { MemoryBuffer buf = getBuffer(); buf.writerIndex(0); @@ -685,6 +689,7 @@ public final class Fury implements BaseFury { return deserialize(MemoryUtils.wrap(bytes), null); } + @Override public Object deserialize(byte[] bytes, Iterable<MemoryBuffer> outOfBandBuffers) { return deserialize(MemoryUtils.wrap(bytes), outOfBandBuffers); } @@ -712,6 +717,7 @@ public final class Fury implements BaseFury { * It is an error for <code>outOfBandBuffers</code> to be null if the serialized stream was * produced with a non-null `bufferCallback`. */ + @Override public Object deserialize(MemoryBuffer buffer, Iterable<MemoryBuffer> outOfBandBuffers) { try { jitContext.lock(); @@ -760,10 +766,12 @@ public final class Fury implements BaseFury { } } + @Override public Object deserialize(InputStream inputStream) { return deserialize(inputStream, null); } + @Override public Object deserialize(InputStream inputStream, Iterable<MemoryBuffer> outOfBandBuffers) { try { MemoryBuffer buf = getBuffer(); @@ -1043,6 +1051,7 @@ public final class Fury implements BaseFury { * Serialize java object without class info, deserialization should use {@link * #deserializeJavaObject}. */ + @Override public void serializeJavaObject(OutputStream outputStream, Object obj) { serializeToStream(outputStream, buf -> serializeJavaObject(buf, obj)); } @@ -1082,6 +1091,7 @@ public final class Fury implements BaseFury { * Deserialize java object from binary by passing class info, serialization should use {@link * #serializeJavaObject}. */ + @Override @SuppressWarnings("unchecked") public <T> T deserializeJavaObject(InputStream inputStream, Class<T> cls) { return (T) deserializeFromStream(inputStream, buf -> this.deserializeJavaObject(buf, cls)); @@ -1091,6 +1101,7 @@ public final class Fury implements BaseFury { * Deserialize java object from binary by passing class info, serialization should use {@link * #deserializeJavaObjectAndClass}. */ + @Override public byte[] serializeJavaObjectAndClass(Object obj) { MemoryBuffer buf = getBuffer(); buf.writerIndex(0); @@ -1104,6 +1115,7 @@ public final class Fury implements BaseFury { * Serialize java object with class info, deserialization should use {@link * #deserializeJavaObjectAndClass}. */ + @Override public void serializeJavaObjectAndClass(MemoryBuffer buffer, Object obj) { try { jitContext.lock(); @@ -1121,6 +1133,7 @@ public final class Fury implements BaseFury { * Serialize java object with class info, deserialization should use {@link * #deserializeJavaObjectAndClass}. */ + @Override public void serializeJavaObjectAndClass(OutputStream outputStream, Object obj) { serializeToStream(outputStream, buf -> serializeJavaObjectAndClass(buf, obj)); } @@ -1129,6 +1142,7 @@ public final class Fury implements BaseFury { * Deserialize class info and java object from binary, serialization should use {@link * #serializeJavaObjectAndClass}. */ + @Override public Object deserializeJavaObjectAndClass(byte[] data) { return deserializeJavaObjectAndClass(MemoryBuffer.fromByteArray(data)); } @@ -1137,6 +1151,7 @@ public final class Fury implements BaseFury { * Deserialize class info and java object from binary, serialization should use {@link * #serializeJavaObjectAndClass}. */ + @Override public Object deserializeJavaObjectAndClass(MemoryBuffer buffer) { try { jitContext.lock(); @@ -1158,6 +1173,7 @@ public final class Fury implements BaseFury { * Deserialize class info and java object from binary, serialization should use {@link * #serializeJavaObjectAndClass}. */ + @Override public Object deserializeJavaObjectAndClass(InputStream inputStream) { return deserializeFromStream(inputStream, this::deserializeJavaObjectAndClass); } diff --git a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java index e9d7441a..af705fc9 100644 --- a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java @@ -19,6 +19,8 @@ package org.apache.fury; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.WeakHashMap; import java.util.function.Consumer; @@ -27,6 +29,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.memory.MemoryUtils; import org.apache.fury.resolver.ClassResolver; +import org.apache.fury.serializer.BufferCallback; import org.apache.fury.util.LoaderBinding; import org.apache.fury.util.LoaderBinding.StagingType; @@ -74,11 +77,13 @@ public class ThreadLocalFury extends AbstractThreadSafeFury { } } + @Override public <R> R execute(Function<Fury, R> action) { Fury fury = bindingThreadLocal.get().get(); return action.apply(fury); } + @Override public byte[] serialize(Object obj) { MemoryBuffer buffer = bufferLocal.get(); buffer.writerIndex(0); @@ -86,31 +91,79 @@ public class ThreadLocalFury extends AbstractThreadSafeFury { return buffer.getBytes(0, buffer.writerIndex()); } + @Override + public byte[] serialize(Object obj, BufferCallback callback) { + MemoryBuffer buffer = bufferLocal.get(); + buffer.writerIndex(0); + bindingThreadLocal.get().get().serialize(buffer, obj, callback); + return buffer.getBytes(0, buffer.writerIndex()); + } + @Override public MemoryBuffer serialize(Object obj, long address, int size) { return bindingThreadLocal.get().get().serialize(obj, address, size); } + @Override public MemoryBuffer serialize(MemoryBuffer buffer, Object obj) { return bindingThreadLocal.get().get().serialize(buffer, obj); } + @Override + public MemoryBuffer serialize(MemoryBuffer buffer, Object obj, BufferCallback callback) { + return bindingThreadLocal.get().get().serialize(buffer, obj, callback); + } + + @Override + public void serialize(OutputStream outputStream, Object obj) { + bindingThreadLocal.get().get().serialize(outputStream, obj); + } + + @Override + public void serialize(OutputStream outputStream, Object obj, BufferCallback callback) { + bindingThreadLocal.get().get().serialize(outputStream, obj, callback); + } + + @Override public Object deserialize(byte[] bytes) { return bindingThreadLocal.get().get().deserialize(bytes); } + @Override + public Object deserialize(byte[] bytes, Iterable<MemoryBuffer> outOfBandBuffers) { + return bindingThreadLocal.get().get().deserialize(bytes, outOfBandBuffers); + } + + @Override public Object deserialize(long address, int size) { return bindingThreadLocal.get().get().deserialize(address, size); } + @Override public Object deserialize(MemoryBuffer buffer) { return bindingThreadLocal.get().get().deserialize(buffer); } + @Override public Object deserialize(ByteBuffer byteBuffer) { return bindingThreadLocal.get().get().deserialize(MemoryUtils.wrap(byteBuffer)); } + @Override + public Object deserialize(MemoryBuffer buffer, Iterable<MemoryBuffer> outOfBandBuffers) { + return bindingThreadLocal.get().get().deserialize(buffer, outOfBandBuffers); + } + + @Override + public Object deserialize(InputStream inputStream) { + return bindingThreadLocal.get().get().deserialize(inputStream); + } + + @Override + public Object deserialize(InputStream inputStream, Iterable<MemoryBuffer> outOfBandBuffers) { + return bindingThreadLocal.get().get().deserialize(inputStream, outOfBandBuffers); + } + @Override public byte[] serializeJavaObject(Object obj) { return bindingThreadLocal.get().get().serializeJavaObject(obj); @@ -121,6 +174,11 @@ public class ThreadLocalFury extends AbstractThreadSafeFury { bindingThreadLocal.get().get().serializeJavaObject(buffer, obj); } + @Override + public void serializeJavaObject(OutputStream outputStream, Object obj) { + bindingThreadLocal.get().get().serializeJavaObject(outputStream, obj); + } + @Override public <T> T deserializeJavaObject(byte[] data, Class<T> cls) { return bindingThreadLocal.get().get().deserializeJavaObject(data, cls); @@ -131,18 +189,57 @@ public class ThreadLocalFury extends AbstractThreadSafeFury { return bindingThreadLocal.get().get().deserializeJavaObject(buffer, cls); } + @Override + public <T> T deserializeJavaObject(InputStream inputStream, Class<T> cls) { + return bindingThreadLocal.get().get().deserializeJavaObject(inputStream, cls); + } + + @Override + public byte[] serializeJavaObjectAndClass(Object obj) { + return bindingThreadLocal.get().get().serializeJavaObjectAndClass(obj); + } + + @Override + public void serializeJavaObjectAndClass(MemoryBuffer buffer, Object obj) { + bindingThreadLocal.get().get().serializeJavaObjectAndClass(buffer, obj); + } + + @Override + public void serializeJavaObjectAndClass(OutputStream outputStream, Object obj) { + bindingThreadLocal.get().get().serializeJavaObjectAndClass(outputStream, obj); + } + + @Override + public Object deserializeJavaObjectAndClass(byte[] data) { + return bindingThreadLocal.get().get().deserializeJavaObjectAndClass(data); + } + + @Override + public Object deserializeJavaObjectAndClass(MemoryBuffer buffer) { + return bindingThreadLocal.get().get().deserializeJavaObjectAndClass(buffer); + } + + @Override + public Object deserializeJavaObjectAndClass(InputStream inputStream) { + return bindingThreadLocal.get().get().deserializeJavaObjectAndClass(inputStream); + } + + @Override public void setClassLoader(ClassLoader classLoader) { setClassLoader(classLoader, StagingType.SOFT_STAGING); } + @Override public void setClassLoader(ClassLoader classLoader, StagingType stagingType) { bindingThreadLocal.get().setClassLoader(classLoader, stagingType); } + @Override public ClassLoader getClassLoader() { return bindingThreadLocal.get().getClassLoader(); } + @Override public void clearClassLoader(ClassLoader loader) { bindingThreadLocal.get().clearClassLoader(loader); } diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java index 97549203..86bb1e3a 100644 --- a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java @@ -19,6 +19,8 @@ package org.apache.fury.pool; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -28,6 +30,7 @@ import org.apache.fury.AbstractThreadSafeFury; import org.apache.fury.Fury; import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.memory.MemoryUtils; +import org.apache.fury.serializer.BufferCallback; import org.apache.fury.util.LoaderBinding; @ThreadSafe @@ -56,6 +59,7 @@ public class ThreadPoolFury extends AbstractThreadSafeFury { } } + @Override public <R> R execute(Function<Fury, R> action) { Fury fury = null; try { @@ -66,35 +70,89 @@ public class ThreadPoolFury extends AbstractThreadSafeFury { } } + @Override public byte[] serialize(Object obj) { return execute(fury -> fury.serialize(obj)); } + @Override + public byte[] serialize(Object obj, BufferCallback callback) { + return execute(fury -> fury.serialize(obj, callback)); + } + @Override public MemoryBuffer serialize(Object obj, long address, int size) { return execute(fury -> fury.serialize(obj, address, size)); } + @Override public MemoryBuffer serialize(MemoryBuffer buffer, Object obj) { return execute(fury -> fury.serialize(buffer, obj)); } + @Override + public MemoryBuffer serialize(MemoryBuffer buffer, Object obj, BufferCallback callback) { + return execute(fury -> fury.serialize(buffer, obj, callback)); + } + + @Override + public void serialize(OutputStream outputStream, Object obj) { + execute( + fury -> { + fury.serialize(outputStream, obj); + return null; + }); + } + + @Override + public void serialize(OutputStream outputStream, Object obj, BufferCallback callback) { + execute( + fury -> { + fury.serialize(outputStream, obj, callback); + return null; + }); + } + + @Override public Object deserialize(byte[] bytes) { return execute(fury -> fury.deserialize(bytes)); } + @Override + public Object deserialize(byte[] bytes, Iterable<MemoryBuffer> outOfBandBuffers) { + return execute(fury -> fury.deserialize(bytes, outOfBandBuffers)); + } + + @Override public Object deserialize(long address, int size) { return execute(fury -> fury.deserialize(address, size)); } + @Override public Object deserialize(MemoryBuffer buffer) { return execute(fury -> fury.deserialize(buffer)); } + @Override public Object deserialize(ByteBuffer byteBuffer) { return execute(fury -> fury.deserialize(MemoryUtils.wrap(byteBuffer))); } + @Override + public Object deserialize(MemoryBuffer buffer, Iterable<MemoryBuffer> outOfBandBuffers) { + return execute(fury -> fury.deserialize(buffer, outOfBandBuffers)); + } + + @Override + public Object deserialize(InputStream inputStream) { + return execute(fury -> fury.deserialize(inputStream)); + } + + @Override + public Object deserialize(InputStream inputStream, Iterable<MemoryBuffer> outOfBandBuffers) { + return execute(fury -> fury.deserialize(inputStream, outOfBandBuffers)); + } + @Override public byte[] serializeJavaObject(Object obj) { return execute(fury -> fury.serializeJavaObject(obj)); @@ -109,6 +167,15 @@ public class ThreadPoolFury extends AbstractThreadSafeFury { }); } + @Override + public void serializeJavaObject(OutputStream outputStream, Object obj) { + execute( + fury -> { + fury.serializeJavaObject(outputStream, obj); + return null; + }); + } + @Override public <T> T deserializeJavaObject(byte[] data, Class<T> cls) { return execute(fury -> fury.deserializeJavaObject(data, cls)); @@ -119,18 +186,65 @@ public class ThreadPoolFury extends AbstractThreadSafeFury { return execute(fury -> fury.deserializeJavaObject(buffer, cls)); } + @Override + public <T> T deserializeJavaObject(InputStream inputStream, Class<T> cls) { + return execute(fury -> fury.deserializeJavaObject(inputStream, cls)); + } + + @Override + public byte[] serializeJavaObjectAndClass(Object obj) { + return execute(fury -> fury.serializeJavaObjectAndClass(obj)); + } + + @Override + public void serializeJavaObjectAndClass(MemoryBuffer buffer, Object obj) { + execute( + fury -> { + fury.serializeJavaObjectAndClass(buffer, obj); + return null; + }); + } + + @Override + public void serializeJavaObjectAndClass(OutputStream outputStream, Object obj) { + execute( + fury -> { + fury.serializeJavaObjectAndClass(outputStream, obj); + return null; + }); + } + + @Override + public Object deserializeJavaObjectAndClass(byte[] data) { + return execute(fury -> fury.deserializeJavaObjectAndClass(data)); + } + + @Override + public Object deserializeJavaObjectAndClass(MemoryBuffer buffer) { + return execute(fury -> fury.deserializeJavaObjectAndClass(buffer)); + } + + @Override + public Object deserializeJavaObjectAndClass(InputStream inputStream) { + return execute(fury -> fury.deserializeJavaObjectAndClass(inputStream)); + } + + @Override public void setClassLoader(ClassLoader classLoader) { setClassLoader(classLoader, LoaderBinding.StagingType.SOFT_STAGING); } + @Override public void setClassLoader(ClassLoader classLoader, LoaderBinding.StagingType stagingType) { furyPooledObjectFactory.setClassLoader(classLoader, stagingType); } + @Override public ClassLoader getClassLoader() { return furyPooledObjectFactory.getClassLoader(); } + @Override public void clearClassLoader(ClassLoader loader) { furyPooledObjectFactory.clearClassLoader(loader); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org