This is an automated email from the ASF dual-hosted git repository.

chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-fury.git


The following commit(s) were added to refs/heads/main by this push:
     new bdada2d4 [Java] Support registration in thread safe fury (#1280)
bdada2d4 is described below

commit bdada2d4964dbe233ea1f3ae5ed3d362c357f899
Author: Shawn Yang <shawn.ck.y...@gmail.com>
AuthorDate: Fri Dec 29 18:35:01 2023 +0800

    [Java] Support registration in thread safe fury (#1280)
    
    Closes #1279
---
 .../org/apache/fury/AbstractThreadSafeFury.java    | 52 ++++++++++++++++++++++
 .../src/main/java/org/apache/fury/Fury.java        |  8 ++++
 .../main/java/org/apache/fury/ThreadLocalFury.java | 19 +++++++-
 .../main/java/org/apache/fury/ThreadSafeFury.java  | 36 +++++++++++++++
 .../apache/fury/pool/ClassLoaderFuryPooled.java    | 11 +++++
 .../apache/fury/pool/FuryPooledObjectFactory.java  |  2 +-
 .../java/org/apache/fury/pool/ThreadPoolFury.java  | 16 ++++++-
 .../java/org/apache/fury/util/LoaderBinding.java   | 34 ++++++++++++++
 .../java/org/apache/fury/ThreadSafeFuryTest.java   | 37 +++++++++++++++
 9 files changed, 211 insertions(+), 4 deletions(-)

diff --git a/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java b/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java
new file mode 100644
index 00000000..9f60a191
--- /dev/null
+++ b/java/fury-core/src/main/java/org/apache/fury/AbstractThreadSafeFury.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fury;
+
+import java.util.function.Consumer;
+import org.apache.fury.serializer.Serializer;
+
+/**
+ * Skeleton {@link ThreadSafeFury} that wraps every registration request in a
+ * {@code Consumer<Fury>} and passes it to {@link #processCallback(Consumer)}; subclasses decide
+ * which {@link Fury} instances the action is run against.
+ */
+public abstract class AbstractThreadSafeFury implements ThreadSafeFury {
+  /** Registers {@code clz} through {@link #processCallback(Consumer)}. */
+  @Override
+  public void register(Class<?> clz) {
+    Consumer<Fury> registration = target -> target.register(clz);
+    processCallback(registration);
+  }
+
+  /** Registers {@code cls}, forwarding {@code createSerializer} unchanged. */
+  @Override
+  public void register(Class<?> cls, boolean createSerializer) {
+    Consumer<Fury> registration = target -> target.register(cls, createSerializer);
+    processCallback(registration);
+  }
+
+  /** Registers {@code cls} under the given {@code id}. */
+  @Override
+  public void register(Class<?> cls, Short id) {
+    Consumer<Fury> registration = target -> target.register(cls, id);
+    processCallback(registration);
+  }
+
+  /** Registers {@code cls} under {@code id}, forwarding {@code createSerializer} unchanged. */
+  @Override
+  public void register(Class<?> cls, Short id, boolean createSerializer) {
+    Consumer<Fury> registration = target -> target.register(cls, id, createSerializer);
+    processCallback(registration);
+  }
+
+  /** Registers {@code serializerClass} as the serializer for {@code type}. */
+  @Override
+  public <T> void registerSerializer(Class<T> type, Class<? extends Serializer> serializerClass) {
+    Consumer<Fury> registration = target -> target.registerSerializer(type, serializerClass);
+    processCallback(registration);
+  }
+
+  /**
+   * Carries out one registration action on behalf of this wrapper.
+   *
+   * @param callback action that performs a registration on a single {@link Fury}
+   */
+  protected abstract void processCallback(Consumer<Fury> callback);
+}
diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java
index 5086bb6f..0ee93a9e 100644
--- a/java/fury-core/src/main/java/org/apache/fury/Fury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java
@@ -54,6 +54,7 @@ import org.apache.fury.serializer.OpaqueObjects;
 import org.apache.fury.serializer.PrimitiveSerializers.LongSerializer;
 import org.apache.fury.serializer.Serializer;
 import org.apache.fury.serializer.SerializerFactory;
+import org.apache.fury.serializer.Serializers;
 import org.apache.fury.serializer.StringSerializer;
 import org.apache.fury.type.Generics;
 import org.apache.fury.type.Type;
@@ -176,6 +177,13 @@ public final class Fury {
     classResolver.register(cls, typeTag);
   }
 
+  /**
+   * Registers {@code serializerClass} as the serializer for {@code type} by delegating to the
+   * class resolver.
+   *
+   * @param type class needed to be serialized/deserialized
+   * @param serializerClass serializer class can be created with {@link Serializers#newSerializer}
+   * @param <T> type of class
+   */
   public <T> void registerSerializer(Class<T> type, Class<? extends 
Serializer> serializerClass) {
     classResolver.registerSerializer(type, serializerClass);
   }
diff --git a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java
index f9ace460..e9d7441a 100644
--- a/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/ThreadLocalFury.java
@@ -20,6 +20,8 @@
 package org.apache.fury;
 
 import java.nio.ByteBuffer;
+import java.util.WeakHashMap;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.fury.memory.MemoryBuffer;
@@ -34,18 +36,24 @@ import org.apache.fury.util.LoaderBinding.StagingType;
  * will be created and destroyed frequently, which is slow.
  */
 @ThreadSafe
-public class ThreadLocalFury implements ThreadSafeFury {
+public class ThreadLocalFury extends AbstractThreadSafeFury {
   private final ThreadLocal<MemoryBuffer> bufferLocal =
       ThreadLocal.withInitial(() -> MemoryUtils.buffer(32));
 
   private final ThreadLocal<LoaderBinding> bindingThreadLocal;
+  private Consumer<Fury> factoryCallback;
+  private final WeakHashMap<LoaderBinding, Object> allFury;
 
   public ThreadLocalFury(Function<ClassLoader, Fury> furyFactory) {
+    factoryCallback = f -> {};
+    allFury = new WeakHashMap<>();
     bindingThreadLocal =
         ThreadLocal.withInitial(
             () -> {
               LoaderBinding binding = new LoaderBinding(furyFactory);
+              binding.setBindingCallback(factoryCallback);
               
binding.setClassLoader(Thread.currentThread().getContextClassLoader());
+              allFury.put(binding, null);
               return binding;
             });
     // 1. init and warm for current thread.
@@ -57,6 +65,15 @@ public class ThreadLocalFury implements ThreadSafeFury {
         fury.getConfig().getConfigHash(), fury.getClassResolver());
   }
 
+  /**
+   * Chains {@code callback} onto the stored factory callback, runs {@code callback} on the
+   * {@link Fury} instances of every binding recorded in {@code allFury}, and installs the chained
+   * callback on each of those bindings.
+   *
+   * <p>NOTE(review): {@code allFury} is a plain {@link WeakHashMap} that is also written from the
+   * thread-local initializer; confirm whether access from several threads needs guarding.
+   *
+   * @param callback action to apply to each {@link Fury}
+   */
+  @Override
+  protected void processCallback(Consumer<Fury> callback) {
+    this.factoryCallback = this.factoryCallback.andThen(callback);
+    allFury
+        .keySet()
+        .forEach(
+            loaderBinding -> {
+              loaderBinding.visitAllFury(callback);
+              loaderBinding.setBindingCallback(this.factoryCallback);
+            });
+  }
+
   public <R> R execute(Function<Fury, R> action) {
     Fury fury = bindingThreadLocal.get().get();
     return action.apply(fury);
diff --git a/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java b/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java
index 226f4df8..ad251a8f 100644
--- a/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/ThreadSafeFury.java
@@ -22,6 +22,8 @@ package org.apache.fury;
 import java.nio.ByteBuffer;
 import java.util.function.Function;
 import org.apache.fury.memory.MemoryBuffer;
+import org.apache.fury.serializer.Serializer;
+import org.apache.fury.serializer.Serializers;
 import org.apache.fury.util.LoaderBinding;
 
 /**
@@ -30,6 +32,40 @@ import org.apache.fury.util.LoaderBinding;
  */
 public interface ThreadSafeFury {
 
+  /**
+   * Register class.
+   *
+   * @param cls class to register
+   */
+  void register(Class<?> cls);
+
+  /**
+   * Register class.
+   *
+   * @param cls class to register
+   * @param createSerializer whether to create serializer, if true and codegen enabled, this will
+   *     generate the serializer code too.
+   */
+  void register(Class<?> cls, boolean createSerializer);
+
+  /**
+   * Register class with given id.
+   *
+   * @param cls class to register
+   * @param id id for provided class.
+   */
+  void register(Class<?> cls, Short id);
+
+  /**
+   * Register class with specified id.
+   *
+   * @param cls class to register
+   * @param id id for provided class.
+   * @param createSerializer whether to create serializer, if true and codegen enabled, this will
+   *     generate the serializer code too.
+   */
+  void register(Class<?> cls, Short id, boolean createSerializer);
+
+  /**
+   * Register a Serializer.
+   *
+   * @param type class needed to be serialized/deserialized
+   * @param serializerClass serializer class can be created with {@link Serializers#newSerializer}
+   * @param <T> type of class
+   */
+  <T> void registerSerializer(Class<T> type, Class<? extends Serializer> 
serializerClass);
+
   /**
    * Provide a context to execution operations on {@link Fury} directly and 
return the executed
    * result.
diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java b/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java
index 9483ac7c..e99c8edd 100644
--- a/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java
+++ b/java/fury-core/src/main/java/org/apache/fury/pool/ClassLoaderFuryPooled.java
@@ -20,11 +20,13 @@
 package org.apache.fury.pool;
 
 import java.util.Queue;
+import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.fury.Fury;
 import org.apache.fury.util.LoggerFactory;
@@ -36,6 +38,7 @@ public class ClassLoaderFuryPooled {
   private static final Logger LOG = 
LoggerFactory.getLogger(ClassLoaderFuryPooled.class);
 
   private final Function<ClassLoader, Fury> furyFactory;
+  private Consumer<Fury> factoryCallback = f -> {};
 
   private final ClassLoader classLoader;
 
@@ -45,6 +48,8 @@ public class ClassLoaderFuryPooled {
    */
   private final Queue<Fury> idleCacheQueue;
 
+  final WeakHashMap<Fury, Object> allFury = new WeakHashMap<>();
+
   /** active cache size's number change by : 1. getLoaderBind() 2. 
returnObject(LoaderBinding). */
   private final AtomicInteger activeCacheNumber = new AtomicInteger(0);
 
@@ -112,6 +117,12 @@ public class ClassLoaderFuryPooled {
 
+  /**
+   * Creates a new Fury for this pool's class loader, applies the current factory callback to it,
+   * then adds it to the idle queue and records it in {@code allFury}.
+   */
  private void addFury() {
    Fury fury = furyFactory.apply(classLoader);
+    // Run the stored callback on the new instance before it is placed in the idle queue.
+    factoryCallback.accept(fury);
     idleCacheQueue.add(fury);
+    // allFury is a weak-keyed record of the instances created by this method.
+    allFury.put(fury, null);
+  }
+
+  /**
+   * Replaces the callback that {@code addFury()} applies to each Fury it creates.
+   *
+   * @param factoryCallback callback to store; the previous value is overwritten, not chained
+   */
+  void setFactoryCallback(Consumer<Fury> factoryCallback) {
+    this.factoryCallback = factoryCallback;
   }
 }
diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java b/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java
index 22bcf939..3357001f 100644
--- a/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java
+++ b/java/fury-core/src/main/java/org/apache/fury/pool/FuryPooledObjectFactory.java
@@ -45,7 +45,7 @@ public class FuryPooledObjectFactory {
    * @see Cache
    * @see com.google.common.cache.CacheBuilder
    */
-  private final Cache<ClassLoader, ClassLoaderFuryPooled> 
classLoaderFuryPooledCache;
+  final Cache<ClassLoader, ClassLoaderFuryPooled> classLoaderFuryPooledCache;
 
   /** ThreadLocal: ClassLoader. */
   private final ThreadLocal<ClassLoader> classLoaderLocal =
diff --git a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
index 4c49c2d8..8fc4a677 100644
--- a/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
+++ b/java/fury-core/src/main/java/org/apache/fury/pool/ThreadPoolFury.java
@@ -21,18 +21,20 @@ package org.apache.fury.pool;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.fury.AbstractThreadSafeFury;
 import org.apache.fury.Fury;
-import org.apache.fury.ThreadSafeFury;
 import org.apache.fury.memory.MemoryBuffer;
 import org.apache.fury.memory.MemoryUtils;
 import org.apache.fury.util.LoaderBinding;
 
 @ThreadSafe
-public class ThreadPoolFury implements ThreadSafeFury {
+public class ThreadPoolFury extends AbstractThreadSafeFury {
 
   private final FuryPooledObjectFactory furyPooledObjectFactory;
+  private Consumer<Fury> factoryCallback = f -> {};
 
   public ThreadPoolFury(
       Function<ClassLoader, Fury> furyFactory,
@@ -44,6 +46,16 @@ public class ThreadPoolFury implements ThreadSafeFury {
         new FuryPooledObjectFactory(furyFactory, minPoolSize, maxPoolSize, 
expireTime, timeUnit);
   }
 
+  /**
+   * Chains {@code callback} onto the stored factory callback, runs {@code callback} on every
+   * {@link Fury} recorded in the {@code allFury} map of each cached pool, and installs the chained
+   * callback on each of those pools.
+   *
+   * @param callback action to apply to each {@link Fury}
+   */
+  @Override
+  protected void processCallback(Consumer<Fury> callback) {
+    this.factoryCallback = this.factoryCallback.andThen(callback);
+    furyPooledObjectFactory
+        .classLoaderFuryPooledCache
+        .asMap()
+        .values()
+        .forEach(
+            pool -> {
+              for (Fury pooledFury : pool.allFury.keySet()) {
+                callback.accept(pooledFury);
+              }
+              pool.setFactoryCallback(this.factoryCallback);
+            });
+  }
+
   public <R> R execute(Function<Fury, R> action) {
     Fury fury = null;
     try {
diff --git a/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java b/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java
index 7961c38b..141ddaa7 100644
--- a/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java
+++ b/java/fury-core/src/main/java/org/apache/fury/util/LoaderBinding.java
@@ -20,7 +20,10 @@
 package org.apache.fury.util;
 
 import java.lang.ref.SoftReference;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -52,6 +55,33 @@ public final class LoaderBinding {
     return fury;
   }
 
+  /**
+   * Applies {@code consumer} once to every {@link Fury} reachable from this binding.
+   *
+   * <p>Instances are collected from {@code furyMap} and from those soft references in
+   * {@code furySoftMap} that have not been cleared. They are gathered into a set first so that
+   * an instance is never handed to {@code consumer} twice.
+   *
+   * <p>Iteration order is unspecified.
+   *
+   * @param consumer action to run on each instance
+   */
+  public void visitAllFury(Consumer<Fury> consumer) {
+    // Seed the set through the copy constructor: Collections.addAll(c) called without any
+    // element arguments adds nothing, which would leave the furyMap instances unvisited.
+    Set<Fury> furySet = new HashSet<>(furyMap.values());
+    for (SoftReference<Fury> ref : furySoftMap.values()) {
+      Fury f = ref.get();
+      // get() returns null once the soft reference has been cleared.
+      if (f != null) {
+        furySet.add(f);
+      }
+    }
+    for (Fury f : furySet) {
+      consumer.accept(f);
+    }
+  }
+
   public ClassLoader getClassLoader() {
     return loader;
   }
@@ -143,6 +173,10 @@ public final class LoaderBinding {
     bindingCallback = bindingCallback.andThen(fury -> fury.register(clz, 
(short) id));
   }
 
+  /**
+   * Overwrites the stored binding callback with the given one; the previous value is not chained.
+   *
+   * @param bindingCallback callback to store
+   */
+  public void setBindingCallback(Consumer<Fury> bindingCallback) {
+    this.bindingCallback = bindingCallback;
+  }
+
   public enum StagingType {
     /**
      * Don't cache fury. A new {@link Fury} will be created if classloader is 
switched to a new one.
diff --git a/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java b/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java
index bd28ba94..464ea6f5 100644
--- a/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java
+++ b/java/fury-core/src/test/java/org/apache/fury/ThreadSafeFuryTest.java
@@ -35,6 +35,7 @@ import org.apache.fury.memory.MemoryBuffer;
 import org.apache.fury.resolver.MetaContext;
 import org.apache.fury.serializer.Serializer;
 import org.apache.fury.test.bean.BeanA;
+import org.apache.fury.test.bean.BeanB;
 import org.apache.fury.test.bean.Struct;
 import org.apache.fury.util.LoaderBinding.StagingType;
 import org.testng.Assert;
@@ -72,6 +73,42 @@ public class ThreadSafeFuryTest extends FuryTestBase {
     assertFalse(hasException);
   }
 
+  /**
+   * Verifies that a class registered through a {@link ThreadSafeFury} round-trips both on the
+   * registering thread and on a worker thread, for the pooled and the thread-local variants.
+   */
+  @Test
+  public void testRegistration() throws Exception {
+    BeanB bean = BeanB.createBeanB(2);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      AtomicReference<Throwable> ex = new AtomicReference<>();
+      {
+        ThreadSafeFury fury =
+            Fury.builder().requireClassRegistration(true).buildThreadSafeFuryPool(2, 4);
+        fury.register(BeanB.class);
+        Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean);
+        // Block on the future so the worker has finished before `ex` is inspected; with a bare
+        // execute() the assertion below could run first and pass vacuously.
+        executor
+            .submit(
+                () -> {
+                  try {
+                    Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean);
+                  } catch (Throwable t) {
+                    ex.set(t);
+                  }
+                })
+            .get();
+        Assert.assertNull(ex.get());
+      }
+      {
+        ThreadSafeFury fury = Fury.builder().requireClassRegistration(true).buildThreadLocalFury();
+        fury.register(BeanB.class);
+        Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean);
+        // Same as above: wait for the worker before checking for a captured failure.
+        executor
+            .submit(
+                () -> {
+                  try {
+                    Assert.assertEquals(fury.deserialize(fury.serialize(bean)), bean);
+                  } catch (Throwable t) {
+                    ex.set(t);
+                  }
+                })
+            .get();
+        Assert.assertNull(ex.get());
+      }
+    } finally {
+      // Release the worker thread even when an assertion fails.
+      executor.shutdown();
+    }
+  }
+
   @Test
   public void testSerialize() throws Exception {
     BeanA beanA = BeanA.createBeanA(2);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org
For additional commands, e-mail: commits-h...@fury.apache.org

Reply via email to