lukemin89 opened a new issue, #23084:
URL: https://github.com/apache/beam/issues/23084

   
   I spent a significant amount of time where the coder is missing or type is 
being erased, and located a single point.
   
   
   
   The issue is a combination of `ProtoCoder` and `MapElements`, as
   - `ProtoCoder` does not have `getEncodedTypeDescriptor()`, so it returns 
just `TypeDescriptor<Message>` instead of actual type.
   - `MapElements` only takes  `TypeDescriptor` as a parameter, and try to 
re-infer the coder.
   
   
   Proposed Fix:
   - set input coder to output coder in `Wait.OnSignal` after `MapElements`
   
   
   How to reproduce.
   
   ```
   import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
   import org.apache.beam.sdk.testing.TestPipeline;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.Wait;
   import org.junit.Rule;
   import org.junit.Test;
   
   import com.google.protobuf.Empty;
   
   public class EmptyMessageTest {
     @Rule
     public final TestPipeline pipeline = TestPipeline.create();
   
     @Test
     public void test() {
       pipeline.getOptions().setStableUniqueNames(CheckEnabled.OFF);
   
       var emptyMessage = pipeline.apply(Create.of(Empty.getDefaultInstance()));
       var singleNull = pipeline.apply(Create.of((Void) null));
   
       emptyMessage.apply(Wait.on(singleNull));
       pipeline.run();
     }
   }
   ```
   
   ```
   Caused by: java.lang.NoSuchMethodException: 
com.google.protobuf.Message.getDefaultInstance()
        at java.base/java.lang.Class.getMethod(Class.java:2108)
        at 
org.apache.beam.sdk.extensions.protobuf.ProtoCoder.getParser(ProtoCoder.java:297)
        at 
org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:205)
        at 
org.apache.beam.sdk.extensions.protobuf.ProtoCoder.decode(ProtoCoder.java:108)
        at 
org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
        at 
org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
        at 
org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:95)
        at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:144)
        at 
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
        at 
org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to