Thanks, I filed https://issues.apache.org/jira/browse/BEAM-5496 with the details of your report.
Would you be interested in submitting a patch with a test that exercises the bug?

On Tue, Sep 25, 2018 at 1:21 AM flyisland <[email protected]> wrote:

> Hi
>
> There is a bug in the built-in MqttIO. Please check
> <https://github.com/apache/beam/blob/master/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java#L336>:
> this readObject() method forgets to invoke the "stream.defaultReadObject()"
> method.
>
>     // set an empty list to messages when deserialize
>     private void readObject(java.io.ObjectInputStream stream)
>         throws IOException, ClassNotFoundException {
>       messages = new ArrayList<>();
>     }
>   }
>
> So an exception is thrown when the runner tries to deserialize the
> checkpoint object:
>
> java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: 95
> unexpected extra bytes after decoding
> org.apache.beam.sdk.io.mqtt.MqttIO$MqttCheckpointMark@6764e219
>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:340)
>   ...
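
In case it helps with a patch, here is a minimal, self-contained sketch of the pattern being described: a custom readObject() that first calls stream.defaultReadObject() and only then resets the transient list. The class Mark below is a hypothetical stand-in, not the real MqttCheckpointMark; its field names and the roundTrip() helper are assumptions for illustration, and it uses plain Java serialization rather than Beam's coders. Skipping defaultReadObject() leaves the non-transient field data unread, which seems consistent with the "unexpected extra bytes" error in the report.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class CheckpointMarkSketch {

  /** Hypothetical stand-in for a checkpoint mark; not the actual Beam class. */
  static class Mark implements Serializable {
    private static final long serialVersionUID = 1L;

    // Non-transient state: written to the stream by default serialization.
    String clientId;

    // Transient state: never written, so it must be rebuilt after reading.
    transient List<byte[]> messages = new ArrayList<>();

    Mark(String clientId) {
      this.clientId = clientId;
    }

    private void readObject(ObjectInputStream stream)
        throws IOException, ClassNotFoundException {
      // Consume the non-transient fields first; this is the call the report says is missing.
      stream.defaultReadObject();
      // Field initializers do not run during deserialization, so reset the list here.
      messages = new ArrayList<>();
    }
  }

  /** Serializes and deserializes a mark in memory (illustrative helper). */
  static Mark roundTrip(Mark mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Mark) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Mark restored = roundTrip(new Mark("client-1"));
    System.out.println(restored.clientId + " " + restored.messages.size());
  }
}
```

A test for the real fix could follow the same shape: encode and decode a checkpoint mark, then check that the non-transient fields survive and that the messages list is empty rather than null.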
