[ 
https://issues.apache.org/jira/browse/BEAM-4409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía resolved BEAM-4409.
--------------------------------
    Fix Version/s: 2.20.0
       Resolution: Fixed

> NoSuchMethodException reading from JmsIO
> ----------------------------------------
>
>                 Key: BEAM-4409
>                 URL: https://issues.apache.org/jira/browse/BEAM-4409
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jms
>    Affects Versions: 2.4.0
>         Environment: Linux, Java 1.8, Beam 2.4, Direct Runner, ActiveMQ
>            Reporter: Edward Pricer
>            Priority: Major
>             Fix For: 2.20.0
>
>
> Running with the DirectRunner and reading from a queue with JmsIO as an
> unbounded source produces a NoSuchMethodException. This occurs when
> UnboundedReadEvaluatorFactory.UnboundedReadEvaluator attempts to clone the
> JmsCheckpointMark with the default (Avro) coder.
> The following trivial code on the reader side reproduces the error
> (the DirectRunner must be on the classpath). The messages on the queue for
> this test case were simple TextMessages. I found that the exception is
> triggered more readily when messages are published rapidly (~200/second).
> {code:java}
> Pipeline p =
>     Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
>
> // read from the queue
> ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
> PCollection<String> inputStrings =
>     p.apply(
>         "Read from queue",
>         JmsIO.<String>readMessage()
>             .withConnectionFactory(factory)
>             .withQueue("somequeue")
>             .withCoder(StringUtf8Coder.of())
>             .withMessageMapper(
>                 (JmsIO.MessageMapper<String>) message -> ((TextMessage) message).getText()));
>
> // decode
> PCollection<String> asStrings =
>     inputStrings.apply(
>         "Decode Message",
>         ParDo.of(
>             new DoFn<String, String>() {
>               @ProcessElement
>               public void processElement(ProcessContext context) {
>                 System.out.println(context.element());
>                 context.output(context.element());
>               }
>             }));
>
> p.run();
> {code}
> Stack trace:
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
>     at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
>     at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
>     at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>     at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219)
>     at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
>     at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>     at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170)
>     at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122)
>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105)
>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99)
>     at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194)
>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
>     at java.lang.Class.getConstructor0(Class.java:3082)
>     at java.lang.Class.getDeclaredConstructor(Class.java:2178)
>     at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
> {code}
>  
> And a more contrived example of how to produce the exception:
> {code:java}
> package org.apache.beam.sdk.io.jms;
>
> import org.apache.activemq.command.ActiveMQTextMessage;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.util.CoderUtils;
>
> final class CoderErrorExample {
>   public static void main(String[] args) throws Exception {
>     Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
>     JmsCheckpointMark checkpointMark = new JmsCheckpointMark();
>     checkpointMark.addMessage(new ActiveMQTextMessage());
>     // from org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedReadEvaluator#getReader
>     CoderUtils.clone(coder, checkpointMark);
>   }
> }
> {code}
>  
>  
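
Note on the root cause, as a minimal sketch rather than Beam or Avro code: the stack trace above ends in Class.getDeclaredConstructor looking for javax.jms.Message.<init>(), and javax.jms.Message is an interface, so no such constructor can exist. The standalone example below reproduces only that one reflective step with the plain JDK. The names InterfaceInstantiationSketch, Message, TextMessageImpl and newInstance are hypothetical stand-ins chosen for illustration; they are not part of JmsIO, Avro, or the JMS API.

```java
import java.lang.reflect.Constructor;

public class InterfaceInstantiationSketch {

    /** Hypothetical stand-in for javax.jms.Message: an interface, so it declares no constructors. */
    public interface Message {}

    /** Hypothetical concrete message type with an explicit public no-argument constructor. */
    public static class TextMessageImpl implements Message {
        public TextMessageImpl() {}
    }

    /**
     * Mimics what a reflection-based decoder does when it only knows the declared type:
     * look up the no-argument constructor and call it.
     */
    public static <T> T newInstance(Class<T> declaredType) throws ReflectiveOperationException {
        Constructor<T> ctor = declaredType.getDeclaredConstructor();
        ctor.setAccessible(true);
        return ctor.newInstance();
    }

    public static void main(String[] args) throws Exception {
        // A concrete class with a no-argument constructor can be instantiated reflectively.
        Message concrete = newInstance(TextMessageImpl.class);
        System.out.println("created: " + concrete.getClass().getSimpleName());

        // The interface itself has no constructor, so the lookup fails before any instance is built.
        try {
            newInstance(Message.class);
        } catch (NoSuchMethodException e) {
            System.out.println("failed: " + e);
        }
    }
}
```

The sketch suggests why the clone in the report fails only once the checkpoint mark actually holds messages: the declared element type is the interface, and decoding by reflection has no concrete class to construct.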



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
