Sounds like a bug. Can you please create a JIRA with instructions for
reproducing the issue ?

Thanks,
Cham

On Thu, May 24, 2018 at 9:09 AM Edward Pricer <[email protected]>
wrote:

> Hi - hoping for a helping hand
>
> When trivially reading from an ActiveMQ queue, I eventually get a
> java.lang.NoSuchMethodException: javax.jms.Message.<init>() exception.
>
> The queue is populated out-of-process rapidly with a text message.
>
> Exception sometimes appears immediately, sometimes not for some time.
> Faster queue writes appears to exacerbate the problem.
>
> Beam 2.4.0, Java, DirectRunner
>
> Apparently the internals of DirectRunner are trying to clone a
> JmsCheckpointMark by
> encoding and decoding with a generic AvroCoder, but that's failing
> because part of the payload doesn't have a default constructor (in
> fact, it's trying to instantiate an interface). Do I need to be using
> JmsIO differently, is
> this a limitation of the DirectRunner, or is this actually a bug?
>
> Here's the test-case code. I don't think the publisher side is
> relevant, but let me know if it is.
>
> Pipeline p =
> Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
>
> // read from the queue
> ConnectionFactory factory = new
> ActiveMQConnectionFactory("tcp://localhost:61616");
>
> PCollection<String> inputStrings = p.apply("Read from queue",
> JmsIO.<String>readMessage() .withConnectionFactory(factory)
> .withQueue("somequeue") .withCoder(StringUtf8Coder.of())
> .withMessageMapper((JmsIO.MessageMapper<String>) message ->
> ((TextMessage) message).getText()));
>
> // decode PCollection<String> asStrings = inputStrings.apply("Decode
> Message", ParDo.of(new DoFn<String, String>() { @ProcessElement public
> void processElement(ProcessContext context) {
> System.out.println(context.element());
> context.output(context.element()); } })); p.run();
>
>
> Full stack trace:
>
> Exception in thread "main" java.lang.RuntimeException:
> java.lang.NoSuchMethodException: javax.jms.Message.<init>()
> at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
> at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
> at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at
> org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219)
> at
> org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
> at
> org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318)
> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105)
> at
> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99)
> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
> at
> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194)
> at
> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124)
> at
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
> at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
> at java.lang.Class.getConstructor0(Class.java:3082)
> at java.lang.Class.getDeclaredConstructor(Class.java:2178)
> at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
>
> Thanks!
>

Reply via email to