[ https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=392593&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392593 ]
ASF GitHub Bot logged work on BEAM-7427:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/20 14:43
            Start Date: 25/Feb/20 14:43
    Worklog Time Spent: 10m
      Work Description: Arthur-RD commented on issue #10644: [BEAM-7427] Refactor JmsCheckpointMark to use SerializableCoder
   URL: https://github.com/apache/beam/pull/10644#issuecomment-590900778

> Just to be clear: I strongly think my original implementation is largely better and the only change needed is to remove the messages from the checkpoint mark (or make them transient).
> Let me explain. In JMS, the messages stay in the destination (topic or queue) until they are acked.
> So, when we finalize the checkpoint mark, we ack the pending messages (stored in the checkpoint mark). If, for any reason, the checkpoint mark moves to another executor and we don't have the pending messages, it's not a big deal, as the messages are still in the destination and will be consumed again.

Hello,

Thank you for this PR. However, could you please clarify this point? In particular:

- **When is the finalizeCheckpoint method called?**
- **Could making the messages list transient cause message loss?**

My understanding is the following: we have a consumer on node A. It has not yet acknowledged message 1 when it is stopped by the host, and finalizeCheckpoint is called. Because the consumer has not moved to another node, the checkpoint's messages list still contains message 1, so it gets acknowledged. If, however, the consumer has moved to node B for some reason when the stop is requested, message 1 is not transferred, because the checkpoint's messages list is transient. The consumer therefore no longer holds this message, **BUT the message is still in the message broker because it hasn't been acked**. Message 1 will be consumed again by the consumer and is not lost. Can you confirm this, please?
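The node A / node B scenario described above can be checked with a small, self-contained simulation. It is a sketch under stated assumptions, not Beam or JMS code: `ToyBroker` and `ToyCheckpointMark` are hypothetical stand-ins, the broker is assumed to keep every message until it is acknowledged and to redeliver it otherwise (the property the quoted explanation relies on), and JMS sessions, acknowledgement modes and redelivery timing are not modeled. Moving the checkpoint to node B is simulated by a Java serialization round trip, which is the mechanism `SerializableCoder` is built on; the `transient` list does not survive it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TransientCheckpointDemo {

  /** Hypothetical broker: a message stays here until it is acknowledged. */
  static class ToyBroker {
    private final Set<String> unacked = new LinkedHashSet<>();

    void publish(String id) { unacked.add(id); }

    void ack(String id) { unacked.remove(id); }

    /** Ids still held by the broker, i.e. eligible for redelivery. */
    Set<String> pendingIds() { return new LinkedHashSet<>(unacked); }
  }

  /** Hypothetical checkpoint mark whose list of un-acked message ids is transient. */
  static class ToyCheckpointMark implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient List<String> pendingIds = new ArrayList<>();

    void add(String id) { pendingIds.add(id); }

    /** Acks the ids this instance still holds; after deserialization it holds none. */
    void finalizeCheckpoint(ToyBroker broker) {
      if (pendingIds == null) {
        return; // transient field: Java deserialization leaves it null
      }
      for (String id : pendingIds) {
        broker.ack(id);
      }
      pendingIds.clear();
    }
  }

  /** Java serialization round trip, standing in for "the checkpoint moves to node B". */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  /** Runs the scenario and returns the ids the broker still holds (and would redeliver). */
  static Set<String> run(boolean checkpointMovesToNodeB) {
    ToyBroker broker = new ToyBroker();
    broker.publish("msg-1"); // read on node A, not yet acknowledged
    ToyCheckpointMark mark = new ToyCheckpointMark();
    mark.add("msg-1");
    if (checkpointMovesToNodeB) {
      mark = roundTrip(mark); // node B gets a copy without the transient list
    }
    mark.finalizeCheckpoint(broker);
    return broker.pendingIds();
  }

  public static void main(String[] args) {
    System.out.println("stays on node A: still in broker = " + run(false));
    System.out.println("moves to node B: still in broker = " + run(true));
  }
}
```

`run(false)` leaves nothing in the broker, while `run(true)` leaves `msg-1` there: in this model the cost of the transient list is a duplicate delivery, not a lost message.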
**Is the only impact of making the messages transient in the CheckpointMark that they may be consumed twice, or can they be lost?**

Thanks a lot for your answer,
Arthur

Issue Time Tracking
-------------------

    Worklog Id:     (was: 392593)
    Time Spent: 9h 50m  (was: 9h 40m)

> JmsCheckpointMark can not be correctly encoded
> ----------------------------------------------
>
>                 Key: BEAM-7427
>                 URL: https://issues.apache.org/jira/browse/BEAM-7427
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jms
>    Affects Versions: 2.12.0, 2.13.0, 2.14.0, 2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0
>         Environment: Message Broker: Solace
> JMS Client (over AMQP): org.apache.qpid:qpid-jms-client:0.42.0
>            Reporter: Mourad
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>             Fix For: 2.20.0
>
>          Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> I get the following exception when reading from an unbounded JMS source:
>
> {code:java}
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
>     at org.apache.avro.Schema.validateName(Schema.java:1151)
>     at org.apache.avro.Schema.access$200(Schema.java:81)
>     at org.apache.avro.Schema$Field.<init>(Schema.java:403)
>     at org.apache.avro.Schema$Field.<init>(Schema.java:396)
>     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
>     at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
>     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
>     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
>     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
>     at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
>     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> {code}
>
> The exception is thrown by Avro when introspecting {{JmsCheckpointMark}} to generate a schema.
> JmsIO configuration:
>
> {code:java}
> import org.apache.beam.sdk.coders.AvroCoder;
> import org.apache.beam.sdk.io.jms.JmsIO;
> import org.apache.beam.sdk.values.PCollection;
>
> // DFAMessage, DFAMessageMapper, pipeline, options and jmsConnectionFactory come from the reporter's code.
> PCollection<DFAMessage> messages = pipeline.apply("read messages from the events broker",
>     JmsIO.<DFAMessage>readMessage()
>         .withConnectionFactory(jmsConnectionFactory)
>         .withTopic(options.getTopic())
>         .withMessageMapper(new DFAMessageMapper())
>         .withCoder(AvroCoder.of(DFAMessage.class)));
> {code}
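The `SchemaParseException: Illegal character in: this$0` above is consistent with Avro's reflection reaching a compiler-generated field: javac stores the enclosing-instance reference of a non-static inner class in a synthetic field named like `this$0`, and the Avro specification only allows names matching `[A-Za-z_][A-Za-z0-9_]*`. The stack trace does not show which class in the graph Avro walks from `JmsCheckpointMark` contributed that field. The sketch below does not call Avro; it uses plain reflection plus a regex transcribed from the spec's name rule to list offending field names, on two hypothetical classes, `InnerMark` and `NestedMark`.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class SyntheticFieldDemo {

  // Avro specification name rule: start with [A-Za-z_], then only [A-Za-z0-9_].
  private static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

  // Non-final, so it is not a compile-time constant and must be read through the outer instance.
  private String source = "jms-topic";

  /** Hypothetical non-static inner class: javac adds a synthetic enclosing-instance field. */
  class InnerMark {
    long watermarkMillis;

    // Uses the enclosing instance; recent javac versions drop the synthetic field when unused.
    String describe() {
      return source + "@" + watermarkMillis;
    }
  }

  /** Hypothetical static nested class: no enclosing-instance field. */
  static class NestedMark {
    long watermarkMillis;
  }

  /** Names of the instance fields of {@code type} that are not legal Avro names. */
  static List<String> illegalAvroFieldNames(Class<?> type) {
    List<String> illegal = new ArrayList<>();
    for (Field f : type.getDeclaredFields()) {
      // Static and transient fields are skipped, mirroring Avro's ReflectData.
      if ((f.getModifiers() & (Modifier.STATIC | Modifier.TRANSIENT)) != 0) {
        continue;
      }
      if (!AVRO_NAME.matcher(f.getName()).matches()) {
        illegal.add(f.getName() + (f.isSynthetic() ? " (synthetic)" : ""));
      }
    }
    return illegal;
  }

  public static void main(String[] args) {
    System.out.println("InnerMark:  " + illegalAvroFieldNames(InnerMark.class));
    System.out.println("NestedMark: " + illegalAvroFieldNames(NestedMark.class));
  }
}
```

Compiled with javac, the inner class typically reports `this$0 (synthetic)` while the static nested class reports nothing. Making such a class `static` removes the field; the PR discussed in this thread takes a different route and switches the checkpoint mark to `SerializableCoder`.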