scwhittle commented on code in PR #34873:
URL: https://github.com/apache/beam/pull/34873#discussion_r2077109138


##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -840,4 +843,38 @@ public boolean equals(@Nullable Object other) {
   public int hashCode() {
     return Objects.hash(getClass(), typeDescriptor, datumFactory, schemaSupplier.get());
   }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {

Review Comment:
   Actually, I think this could still lead to duplicate readers/writers if we deserialize the same logical coder multiple times.
   
   I think that could be fixed with readResolve, which lets a class replace the deserialized object with another one (just learning about this myself). 
https://docs.oracle.com/javase/8/docs/platform/serialization/spec/input.html#a5903
   
   One idea that seems clean is to serialize the AvroCoder's cache key via writeReplace, and then have that key's readResolve look up or create the coder in the cache.  This file already has an example of that pattern in SerializableSchemaString.
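
   A minimal sketch of the writeReplace/readResolve idea, using only the JDK. The names here (CachedCoder, Key, the String schema key) are hypothetical stand-ins for illustration, not the actual AvroCoder code or its real cache key:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a cached coder; not Beam's AvroCoder. */
final class CachedCoder implements Serializable {
  private static final Map<String, CachedCoder> CACHE = new ConcurrentHashMap<>();

  // Assumption: a schema JSON string is enough to identify the coder.
  private final String schemaJson;

  private CachedCoder(String schemaJson) {
    this.schemaJson = schemaJson;
  }

  static CachedCoder of(String schemaJson) {
    return CACHE.computeIfAbsent(schemaJson, CachedCoder::new);
  }

  /** Serialize only the cache key instead of the coder itself. */
  private Object writeReplace() {
    return new Key(schemaJson);
  }

  /** Serialized form of the coder: just the key. */
  private static final class Key implements Serializable {
    private final String schemaJson;

    Key(String schemaJson) {
      this.schemaJson = schemaJson;
    }

    /** On deserialization, swap the key for the cached (or newly created) coder. */
    private Object readResolve() {
      return CachedCoder.of(schemaJson);
    }
  }
}

final class ReadResolveDemo {
  /** Java-serializes and deserializes a value in memory. */
  static Object roundTrip(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    CachedCoder coder = CachedCoder.of("{\"type\":\"string\"}");
    // Every deserialized copy resolves to the single cached instance.
    System.out.println(roundTrip(coder) == coder);
    System.out.println(roundTrip(coder) == roundTrip(coder));
  }
}
```

   With this shape, deserializing the same logical coder any number of times within one JVM yields one instance, so any per-coder reader/writer state would not be duplicated.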
   



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java:
##########
@@ -242,7 +252,18 @@ public static <T> AvroCoder<T> of(Class<T> type, Schema schema) {
    * @param <T> the element type
    */
   public static <T> AvroCoder<T> of(AvroDatumFactory<T> datumFactory, Schema schema) {
-    return new AvroCoder<>(datumFactory, schema);
+    Class<T> type = datumFactory.getType();
+    return fromCacheOrCreate(type, schema, () -> new AvroCoder<>(datumFactory, schema));
+  }
+
+  private static <T> AvroCoder<T> fromCacheOrCreate(
+      Class<T> type, Schema schema, Callable<AvroCoder<T>> avroCoderCreator) {

Review Comment:
   It might be safer to have this take a type enum as well (specific, reflect). Otherwise the AvroCoder.specific() factory could return something previously cached via AvroCoder.reflect(), and vice versa.
   
   That might not be expected usage, but we might as well guard against it since it would be pretty confusing.
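
   A rough sketch of the enum-in-the-key idea. Everything here (CoderCacheSketch, DatumKind, Key, the stand-in Coder class, a String in place of Schema) is hypothetical and only illustrates the shape of the cache key, not the real AvroCoder internals:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical sketch; names are illustrative, not Beam's actual API. */
final class CoderCacheSketch {
  /** Which Avro API the coder uses; made part of the cache key. */
  enum DatumKind { SPECIFIC, REFLECT }

  /** Stand-in for AvroCoder that only records how it was created. */
  static final class Coder {
    final DatumKind kind;

    Coder(DatumKind kind) {
      this.kind = kind;
    }
  }

  /** Cache key: type + schema + kind, so the two factories never collide. */
  static final class Key {
    private final Class<?> type;
    private final String schemaJson;
    private final DatumKind kind;

    Key(Class<?> type, String schemaJson, DatumKind kind) {
      this.type = type;
      this.schemaJson = schemaJson;
      this.kind = kind;
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof Key)) {
        return false;
      }
      Key that = (Key) other;
      return type.equals(that.type) && schemaJson.equals(that.schemaJson) && kind == that.kind;
    }

    @Override
    public int hashCode() {
      return Objects.hash(type, schemaJson, kind);
    }
  }

  private static final Map<Key, Coder> CACHE = new ConcurrentHashMap<>();

  static Coder fromCacheOrCreate(
      Class<?> type, String schemaJson, DatumKind kind, Supplier<Coder> creator) {
    return CACHE.computeIfAbsent(new Key(type, schemaJson, kind), k -> creator.get());
  }

  static Coder specific(Class<?> type, String schemaJson) {
    return fromCacheOrCreate(
        type, schemaJson, DatumKind.SPECIFIC, () -> new Coder(DatumKind.SPECIFIC));
  }

  static Coder reflect(Class<?> type, String schemaJson) {
    return fromCacheOrCreate(
        type, schemaJson, DatumKind.REFLECT, () -> new Coder(DatumKind.REFLECT));
  }

  public static void main(String[] args) {
    String schema = "{\"type\":\"string\"}";
    // Same factory, same arguments: one cached instance.
    System.out.println(specific(String.class, schema) == specific(String.class, schema));
    // Different factories never share an entry, even for the same type and schema.
    System.out.println(specific(String.class, schema) == reflect(String.class, schema));
  }
}
```

   Without the kind in the key, whichever factory ran first would decide what both factories return for a given type and schema.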



##########
sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java:
##########
@@ -416,6 +417,40 @@ public void testReflectRecordEncoding() throws Exception {
     CoderProperties.coderDecodeEncodeEqual(coderWithSchema, AVRO_SPECIFIC_RECORD);
   }
 
+  @Test
+  public void testCoderCached() {
+    Schema schema = AVRO_SPECIFIC_RECORD.getSchema();
+    TypeDescriptor<TestAvro> typeDescriptor = TypeDescriptor.of(TestAvro.class);
+    Class<TestAvro> clazz = TestAvro.class;
+    boolean useReflectApi = false;
+    AvroDatumFactory<TestAvro> datumFactory = new AvroDatumFactory.ReflectDatumFactory<>(clazz);
+
+    assertSame(AvroCoder.of(clazz), AvroCoder.of(clazz));
+    assertSame(AvroCoder.of(clazz, useReflectApi), AvroCoder.of(clazz, useReflectApi));
+    assertSame(AvroCoder.of(typeDescriptor), AvroCoder.of(typeDescriptor));
+    assertSame(
+        AvroCoder.of(typeDescriptor, useReflectApi), AvroCoder.of(typeDescriptor, useReflectApi));
+    assertSame(AvroCoder.of(clazz, schema), AvroCoder.of(clazz, schema));
+    assertSame(
+        AvroCoder.of(clazz, schema, useReflectApi), AvroCoder.of(clazz, schema, useReflectApi));
+    assertSame(AvroCoder.of(datumFactory, schema), AvroCoder.of(datumFactory, schema));
+
+    assertSame(AvroCoder.specific(clazz), AvroCoder.specific(clazz));

Review Comment:
   Also assert that AvroCoder.specific(clazz) returns a different instance than AvroCoder.reflect(clazz), e.g. with assertNotSame.



##########
sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroGenericCoder.java:
##########
@@ -17,17 +17,27 @@
  */
 package org.apache.beam.sdk.extensions.avro.coders;
 
+import java.util.concurrent.ExecutionException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 
 /** AvroCoder specialisation for GenericRecord, needed for cross-language transforms. */
 public class AvroGenericCoder extends AvroCoder<GenericRecord> {
+  private static final Cache<Schema, AvroGenericCoder> AVRO_GENERIC_CODER_CACHE =
+      CacheBuilder.newBuilder().build();

Review Comment:
   Use the weakValues option?  I'm not sure whether there are cases where schemas are created dynamically, in which case this cache could grow without bound.
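
   To illustrate what weak values buy here, a JDK-only sketch of a weak-valued cache. This is a simplified stand-in, not Guava's implementation: WeakValueCache and its methods are hypothetical names, and the explicit cleanUp() call is an assumption of this sketch. In the PR itself the change would presumably just be adding weakValues() to the CacheBuilder chain.

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Hypothetical JDK-only sketch of a cache that holds its values weakly. */
final class WeakValueCache<K, V> {
  private final ConcurrentMap<K, WeakReference<V>> map = new ConcurrentHashMap<>();

  /** Returns the cached value, creating it if absent or already collected. */
  V get(K key, Function<K, V> loader) {
    // Holds a strong reference so the value cannot be collected before we return it.
    AtomicReference<V> result = new AtomicReference<>();
    map.compute(
        key,
        (k, ref) -> {
          V existing = (ref == null) ? null : ref.get();
          if (existing != null) {
            result.set(existing);
            return ref;
          }
          V created = loader.apply(k);
          result.set(created);
          return new WeakReference<>(created);
        });
    return result.get();
  }

  /** Drops entries whose values have been garbage collected. */
  void cleanUp() {
    map.values().removeIf(ref -> ref.get() == null);
  }

  int size() {
    return map.size();
  }

  public static void main(String[] args) {
    WeakValueCache<String, Object> cache = new WeakValueCache<>();
    Object first = cache.get("schema-a", k -> new Object());
    Object second = cache.get("schema-a", k -> new Object());
    // While the caller still holds the value, lookups return the same instance.
    System.out.println(first == second);
  }
}
```

   The cache only keeps a coder alive while something else still references it, so coders for dynamically created schemas can be collected once the pipeline drops them, at the cost of re-creating a coder if it is requested again later.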



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

