Done. For future reference - https://issues.apache.org/jira/browse/BEAM-4409

On Thu, May 24, 2018 at 8:41 PM Chamikara Jayalath <[email protected]> wrote:
> Sounds like a bug. Can you please create a JIRA with instructions for reproducing the issue?
>
> Thanks,
> Cham
>
> On Thu, May 24, 2018 at 9:09 AM Edward Pricer <[email protected]> wrote:
>> Hi - hoping for a helping hand.
>>
>> When trivially reading from an ActiveMQ queue, I eventually get a
>> java.lang.NoSuchMethodException: javax.jms.Message.<init>() exception.
>> The queue is populated out-of-process rapidly with a text message.
>> The exception sometimes appears immediately, sometimes not for some time.
>> Faster queue writes appear to exacerbate the problem.
>>
>> Beam 2.4.0, Java, DirectRunner
>>
>> Apparently the internals of DirectRunner are trying to clone a
>> JmsCheckpointMark by encoding and decoding it with a generic AvroCoder,
>> but that's failing because part of the payload doesn't have a default
>> constructor (in fact, it's trying to instantiate an interface). Do I need
>> to be using JmsIO differently, is this a limitation of the DirectRunner,
>> or is this actually a bug?
>>
>> Here's the test-case code. I don't think the publisher side is
>> relevant, but let me know if it is.
>>
>> Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
>>
>> // read from the queue
>> ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
>>
>> PCollection<String> inputStrings = p.apply("Read from queue",
>>     JmsIO.<String>readMessage()
>>         .withConnectionFactory(factory)
>>         .withQueue("somequeue")
>>         .withCoder(StringUtf8Coder.of())
>>         .withMessageMapper((JmsIO.MessageMapper<String>) message ->
>>             ((TextMessage) message).getText()));
>>
>> // decode
>> PCollection<String> asStrings = inputStrings.apply("Decode Message",
>>     ParDo.of(new DoFn<String, String>() {
>>       @ProcessElement
>>       public void processElement(ProcessContext context) {
>>         System.out.println(context.element());
>>         context.output(context.element());
>>       }
>>     }));
>>
>> p.run();
>>
>> Full stack trace:
>>
>> Exception in thread "main" java.lang.RuntimeException: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
>>     at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
>>     at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
>>     at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
>>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>     at org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219)
>>     at org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137)
>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
>>     at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302)
>>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>     at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318)
>>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:170)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99)
>>     at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
>>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194)
>>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124)
>>     at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
>>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.NoSuchMethodException: javax.jms.Message.<init>()
>>     at java.lang.Class.getConstructor0(Class.java:3082)
>>     at java.lang.Class.getDeclaredConstructor(Class.java:2178)
>>     at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
>>
>> Thanks!
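
Note for readers of this thread: the "Caused by" frames above show a reflective Class.getDeclaredConstructor() lookup on javax.jms.Message, which is an interface and so has no constructors. The following is a minimal sketch in plain Java of why that lookup fails. It is not Beam or Avro code, and the Message and TextPayload types and the hasNoArgConstructor helper are hypothetical stand-ins invented for illustration.

```java
public class NoArgConstructorSketch {

    // Hypothetical stand-in for javax.jms.Message: an interface declares no constructors.
    interface Message {
        String getText();
    }

    // Hypothetical concrete payload with an implicit no-arg constructor.
    static class TextPayload implements Message {
        String text = "";

        @Override
        public String getText() {
            return text;
        }
    }

    // Performs the same kind of lookup seen at the bottom of the stack trace:
    // Class.getDeclaredConstructor() with no parameter types.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            // Thrown for interfaces and for classes that lack a no-arg constructor.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("interface has no-arg constructor: "
                + hasNoArgConstructor(Message.class));
        System.out.println("concrete class has no-arg constructor: "
                + hasNoArgConstructor(TextPayload.class));
    }
}
```

Under these assumptions, any decoder that builds objects reflectively from a field's declared type will fail in the same way whenever that declared type is an interface, regardless of which concrete class was originally encoded.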
