This is an automated email from the ASF dual-hosted git repository. chaokunyang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-fury.git
The following commit(s) were added to refs/heads/main by this push: new bdada2d4 [Java] Support registration in thread safe fury (#1280) bdada2d4 is described below commit bdada2d4964dbe233ea1f3ae5ed3d362c357f899 Author: Shawn Yang <shawn.ck.y...@gmail.com> AuthorDate: Fri Dec 29 18:35:01 2023 +0800 [Java] Support registration in thread safe fury (#1280) Closes #1279 --- .../org/apache/fury/AbstractThreadSafeFury.java | 52 ++++++++++++++++++++++ .../src/main/java/org/apache/fury/Fury.java | 8 ++++ .../main/java/org/apache/fury/ThreadLocalFury.java | 19 +++++++- .../main/java/org/apache/fury/ThreadSafeFury.java | 36 +++++++++++++++ .../apache/fury/pool/ClassLoaderFuryPooled.java | 11 +++++ .../apache/fury/pool/FuryPooledObjectFactory.java | 2 +- .../java/org/apache/fury/pool/ThreadPoolFury.java | 16 ++++++- .../java/org/apache/fury/util/LoaderBinding.java | 34 ++++++++++++++ .../java/org/apache/fury/ThreadSafeFuryTest.java | 37 +++++++++++++++ 9 files changed, 211 insertions(+), 4 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java b/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java new file mode 100644 index 00000000..9f60a191 --- /dev/null +++ b/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fury; + +import java.util.function.Consumer; +import org.apache.fury.serializer.Serializer; + +public abstract class AbstractThreadSafeFury implements ThreadSafeFury { + @Override + public void register(Class<?> clz) { + processCallback(fury -> fury.register(clz)); + } + + @Override + public void register(Class<?> cls, boolean createSerializer) { + processCallback(fury -> fury.register(cls, createSerializer)); + } + + @Override + public void register(Class<?> cls, Short id) { + processCallback(fury -> fury.register(cls, id)); + } + + @Override + public void register(Class<?> cls, Short id, boolean createSerializer) { + processCallback(fury -> fury.register(cls, id, createSerializer)); + } + + @Override + public <T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass) { + processCallback(fury -> fury.registerSerializer(type, serializerClass)); + } + + protected abstract void processCallback(Consumer<Fury> callback); +} diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java index 5086bb6f..0ee93a9e 100644 --- a/java/fury-core/src/main/java/org/apache/fury/Fury.java +++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java @@ -54,6 +54,7 @@ import org.apache.fury.serializer.OpaqueObjects; import org.apache.fury.serializer.PrimitiveSerializers.LongSerializer; import org.apache.fury.serializer.Serializer; import org.apache.fury.serializer.SerializerFactory; +import org.apache.fury.serializer.Serializers; import org.apache.fury.serializer.StringSerializer; import org.apache.fury.type.Generics; import org.apache.fury.type.Type; @@ -176,6 +177,13 @@ public final class Fury { classResolver.register(cls, typeTag); } + /** + * Register a Serializer. + * + * @param type class needed to be serialized/deserialized + * @param serializerClass serializer class can be created with {@link Serializers#newSerializer} + * @param <T> type of class + */ public <T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass) { classResolver.registerSerializer(type, serializerClass); } diff --git a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java index f9ace460..e9d7441a 100644 --- a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java @@ -20,6 +20,8 @@ package org.apache.fury; import java.nio.ByteBuffer; +import java.util.WeakHashMap; +import java.util.function.Consumer; import java.util.function.Function; import javax.annotation.concurrent.ThreadSafe; import org.apache.fury.memory.MemoryBuffer; @@ -34,18 +36,24 @@ import org.apache.fury.util.LoaderBinding.StagingType; * will be created and destroyed frequently, which is slow. */ @ThreadSafe -public class ThreadLocalFury implements ThreadSafeFury { +public class ThreadLocalFury extends AbstractThreadSafeFury { private final ThreadLocal<MemoryBuffer> bufferLocal = ThreadLocal.withInitial(() -> MemoryUtils.buffer(32)); private final ThreadLocal<LoaderBinding> bindingThreadLocal; + private Consumer<Fury> factoryCallback; + private final WeakHashMap<LoaderBinding, Object> allFury; public ThreadLocalFury(Function<ClassLoader, Fury> furyFactory) { + factoryCallback = f -> {}; + allFury = new WeakHashMap<>(); bindingThreadLocal = ThreadLocal.withInitial( () -> { LoaderBinding binding = new LoaderBinding(furyFactory); + binding.setBindingCallback(factoryCallback); binding.setClassLoader(Thread.currentThread().getContextClassLoader()); + allFury.put(binding, null); return binding; }); // 1. init and warm for current thread. @@ -57,6 +65,15 @@ public class ThreadLocalFury implements ThreadSafeFury { fury.getConfig().getConfigHash(), fury.getClassResolver()); } + @Override + protected void processCallback(Consumer<Fury> callback) { + factoryCallback = factoryCallback.andThen(callback); + for (LoaderBinding binding : allFury.keySet()) { + binding.visitAllFury(callback); + binding.setBindingCallback(factoryCallback); + } + } + public <R> R execute(Function<Fury, R> action) { Fury fury = bindingThreadLocal.get().get(); return action.apply(fury); diff --git a/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java b/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java index 226f4df8..ad251a8f 100644 --- a/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java @@ -22,6 +22,8 @@ package org.apache.fury; import java.nio.ByteBuffer; import java.util.function.Function; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.serializer.Serializer; +import org.apache.fury.serializer.Serializers; import org.apache.fury.util.LoaderBinding; /** @@ -30,6 +32,40 @@ import org.apache.fury.util.LoaderBinding; */ public interface ThreadSafeFury { + /** register class. */ + void register(Class<?> cls); + + /** + * Register class. + * + * @param cls class to register + * @param createSerializer whether to create serializer, if true and codegen enabled, this will + * generate the serializer code too. + */ + void register(Class<?> cls, boolean createSerializer); + + /** register class with given id. */ + void register(Class<?> cls, Short id); + + /** + * Register class with specified id. + * + * @param cls class to register + * @param id id for provided class. + * @param createSerializer whether to create serializer, if true and codegen enabled, this will + * generate the serializer code too. + */ + void register(Class<?> cls, Short id, boolean createSerializer); + + /** + * Register a Serializer. + * + * @param type class needed to be serialized/deserialized + * @param serializerClass serializer class can be created with {@link Serializers#newSerializer} + * @param <T> type of class + */ + <T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass); + /** * Provide a context to execution operations on {@link Fury} directly and return the executed * result. diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java b/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java index 9483ac7c..e99c8edd 100644 --- a/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java +++ b/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java @@ -20,11 +20,13 @@ package org.apache.fury.pool; import java.util.Queue; +import java.util.WeakHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.function.Function; import org.apache.fury.Fury; import org.apache.fury.util.LoggerFactory; @@ -36,6 +38,7 @@ public class ClassLoaderFuryPooled { private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderFuryPooled.class); private final Function<ClassLoader, Fury> furyFactory; + private Consumer<Fury> factoryCallback = f -> {}; private final ClassLoader classLoader; @@ -45,6 +48,8 @@ public class ClassLoaderFuryPooled { */ private final Queue<Fury> idleCacheQueue; + final WeakHashMap<Fury, Object> allFury = new WeakHashMap<>(); + /** active cache size's number change by : 1. getLoaderBind() 2. returnObject(LoaderBinding). */ private final AtomicInteger activeCacheNumber = new AtomicInteger(0); @@ -112,6 +117,12 @@ public class ClassLoaderFuryPooled { private void addFury() { Fury fury = furyFactory.apply(classLoader); + factoryCallback.accept(fury); idleCacheQueue.add(fury); + allFury.put(fury, null); + } + + void setFactoryCallback(Consumer<Fury> factoryCallback) { + this.factoryCallback = factoryCallback; } } diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java b/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java index 22bcf939..3357001f 100644 --- a/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java +++ b/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java @@ -45,7 +45,7 @@ public class FuryPooledObjectFactory { * @see Cache * @see com.google.common.cache.CacheBuilder */ - private final Cache<ClassLoader, ClassLoaderFuryPooled> classLoaderFuryPooledCache; + final Cache<ClassLoader, ClassLoaderFuryPooled> classLoaderFuryPooledCache; /** ThreadLocal: ClassLoader. */ private final ThreadLocal<ClassLoader> classLoaderLocal = diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java index 4c49c2d8..8fc4a677 100644 --- a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java +++ b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java @@ -21,18 +21,20 @@ package org.apache.fury.pool; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; import javax.annotation.concurrent.ThreadSafe; +import org.apache.fury.AbstractThreadSafeFury; import org.apache.fury.Fury; -import org.apache.fury.ThreadSafeFury; import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.memory.MemoryUtils; import org.apache.fury.util.LoaderBinding; @ThreadSafe -public class ThreadPoolFury implements ThreadSafeFury { +public class ThreadPoolFury extends AbstractThreadSafeFury { private final FuryPooledObjectFactory furyPooledObjectFactory; + private Consumer<Fury> factoryCallback = f -> {}; public ThreadPoolFury( Function<ClassLoader, Fury> furyFactory, @@ -44,6 +46,16 @@ public class ThreadPoolFury implements ThreadSafeFury { new FuryPooledObjectFactory(furyFactory, minPoolSize, maxPoolSize, expireTime, timeUnit); } + @Override + protected void processCallback(Consumer<Fury> callback) { + factoryCallback = factoryCallback.andThen(callback); + for (ClassLoaderFuryPooled furyPooled : + furyPooledObjectFactory.classLoaderFuryPooledCache.asMap().values()) { + furyPooled.allFury.keySet().forEach(callback); + furyPooled.setFactoryCallback(factoryCallback); + } + } + public <R> R execute(Function<Fury, R> action) { Fury fury = null; try { diff --git a/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java b/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java index 7961c38b..141ddaa7 100644 --- a/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java +++ b/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java @@ -20,7 +20,10 @@ package org.apache.fury.util; import java.lang.ref.SoftReference; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; import java.util.WeakHashMap; import java.util.function.Consumer; import java.util.function.Function; @@ -52,6 +55,33 @@ public final class LoaderBinding { return fury; } + public void visitAllFury(Consumer<Fury> consumer) { + if (furySoftMap.isEmpty()) { + for (Fury f : furyMap.values()) { + consumer.accept(f); + } + } else if (furyMap.isEmpty()) { + for (SoftReference<Fury> ref : furySoftMap.values()) { + Fury f = ref.get(); + if (f != null) { + consumer.accept(f); + } + } + } else { + Set<Fury> furySet = new HashSet<>(furyMap.size()); + Collections.addAll(furyMap.values()); + for (SoftReference<Fury> ref : furySoftMap.values()) { + Fury f = ref.get(); + if (f != null) { + furySet.add(f); + } + } + for (Fury f : furySet) { + consumer.accept(f); + } + } + } + public ClassLoader getClassLoader() { return loader; } @@ -143,6 +173,10 @@ public final class LoaderBinding { bindingCallback = bindingCallback.andThen(fury -> fury.register(clz, (short) id)); } + public void setBindingCallback(Consumer<Fury> bindingCallback) { + this.bindingCallback = bindingCallback; + } + public enum StagingType { /** * Don't cache fury. A new {@link Fury} will be created if classloader is switched to a new one. diff --git a/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java b/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java index bd28ba94..464ea6f5 100644 --- a/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java +++ b/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java @@ -35,6 +35,7 @@ import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.resolver.MetaContext; import org.apache.fury.serializer.Serializer; import org.apache.fury.test.bean.BeanA; +import org.apache.fury.test.bean.BeanB; import org.apache.fury.test.bean.Struct; import org.apache.fury.util.LoaderBinding.StagingType; import org.testng.Assert; @@ -72,6 +73,42 @@ public class ThreadSafeFuryTest extends FuryTestBase { assertFalse(hasException); } + @Test + public void testRegistration() throws Exception { + BeanB bean = BeanB.createBeanB(2); + ExecutorService executor = Executors.newSingleThreadExecutor(); + AtomicReference<Throwable> ex = new AtomicReference<>(); + { + ThreadSafeFury fury = + Fury.builder().requireClassRegistration(true).buildThreadSafeFuryPool(2, 4); + fury.register(BeanB.class); + Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean); + executor.execute( + () -> { + try { + Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean); + } catch (Throwable t) { + ex.set(t); + } + }); + Assert.assertNull(ex.get()); + } + { + ThreadSafeFury fury = Fury.builder().requireClassRegistration(true).buildThreadLocalFury(); + fury.register(BeanB.class); + Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean); + executor.execute( + () -> { + try { + Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean); + } catch (Throwable t) { + ex.set(t); + } + }); + Assert.assertNull(ex.get()); + } + } + @Test public void testSerialize() throws Exception { BeanA beanA = BeanA.createBeanA(2); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org