[ https://issues.apache.org/jira/browse/BEAM-4409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010819#comment-17010819 ]
Ismaël Mejía commented on BEAM-4409: ------------------------------------ This is related to BEAM-7427 so let's follow the discussion there since there seems to be more audience on the other ticket. > NoSuchMethodException reading from JmsIO > ---------------------------------------- > > Key: BEAM-4409 > URL: https://issues.apache.org/jira/browse/BEAM-4409 > Project: Beam > Issue Type: Bug > Components: io-java-jms > Affects Versions: 2.4.0 > Environment: Linux, Java 1.8, Beam 2.4, Direct Runner, ActiveMQ > Reporter: Edward Pricer > Priority: Major > > Running with the DirectRunner, and reading from a queue with JmsIO as an > unbounded source will produce a NoSuchMethodException. This occurs as the > UnboundedReadEvaluatorFactory.UnboundedReadEvaluator attempts to clone the > JmsCheckpointMark with the default (Avro) coder. > The following trivial code on the reader side reproduces the error > (DirectRunner must be in path). The messages on the queue for this test case > were simple TextMessages. I found this exception is triggered more readily > when messages are published rapidly (~200/second) > {code:java} > Pipeline p = > Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create()); > // read from the queue > ConnectionFactory factory = new > ActiveMQConnectionFactory("tcp://localhost:61616"); > PCollection<String> inputStrings = p.apply("Read from queue", > JmsIO.<String>readMessage() .withConnectionFactory(factory) > .withQueue("somequeue") .withCoder(StringUtf8Coder.of()) > .withMessageMapper((JmsIO.MessageMapper<String>) message -> > ((TextMessage) message).getText())); > // decode > PCollection<String> asStrings = inputStrings.apply("Decode Message", > ParDo.of(new DoFn<String, String>() { @ProcessElement public > void processElement(ProcessContext context) { > System.out.println(context.element()); > context.output(context.element()); } })); > p.run(); > {code} > Stack trace: > {code:java} > Exception in thread "main" java.lang.RuntimeException: > java.lang.NoSuchMethodException: javax.jms.Message.<init>() at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) at > org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) at > org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.reflect.ReflectDatumReader.readCollection(ReflectDatumReader.java:219) > at > org.apache.avro.reflect.ReflectDatumReader.readArray(ReflectDatumReader.java:137) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) > at > org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:302) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) > at org.apache.beam.sdk.coders.AvroCoder.decode(AvroCoder.java:318) at > org.apache.beam.sdk.coders.Coder.decode(Coder.java:170) at > org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:122) > at > org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:105) > at > org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:99) > at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148) at > org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:194) > at > org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:124) > at > org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161) > at > org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) Caused by: > java.lang.NoSuchMethodException: javax.jms.Message.<init>() at > java.lang.Class.getConstructor0(Class.java:3082) at > java.lang.Class.getDeclaredConstructor(Class.java:2178) at > org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347) > {code} > > And a more contrived example of how to produce the exception: > {code:java} > package org.apache.beam.sdk.io.jms; > import org.apache.activemq.command.ActiveMQTextMessage; > import org.apache.beam.sdk.coders.Coder; import > org.apache.beam.sdk.util.CoderUtils; > final class CoderErrorExample { public static void main(String[] args) throws > Exception { > Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder(); > JmsCheckpointMark checkpointMark = new JmsCheckpointMark(); > checkpointMark.addMessage(new ActiveMQTextMessage()); > CoderUtils.clone(coder, checkpointMark); // from > org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedReadEvaluator#getReader > } > } > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)