[ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=392593&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392593
 ]

ASF GitHub Bot logged work on BEAM-7427:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/20 14:43
            Start Date: 25/Feb/20 14:43
    Worklog Time Spent: 10m 
      Work Description: Arthur-RD commented on issue #10644: [BEAM-7427] 
Refactor JmsCheckpointMark to use SerializableCoder
URL: https://github.com/apache/beam/pull/10644#issuecomment-590900778
 
 
   > Just to be clear: I strongly think my original implementation is much 
better, and the only change needed is to remove the messages from the checkpoint 
mark (or make them transient).
   > Let me explain. In JMS, the messages stay in the destination (topic or 
queue) until they are acked.
   > So, when we finalize the checkpoint mark, we ack the pending messages 
(stored in the checkpoint mark). If, for any reason, the checkpoint mark moves 
to another executor and no longer has the pending messages, it's not a big 
deal: the messages are still in the destination and will be consumed again.
   
   Hello,
   
   Thank you for this PR.
   
   However, could you please clarify this point? In particular:
      - **When is the finalizeCheckpoint method called?**
      - **Could making the messages list transient cause message loss?**
   
   My understanding is the following:
   We have a Consumer on node A. It is stopped by the host before it has 
acknowledged message 1, and the finalizeCheckpoint method is called. As the 
Consumer hasn't switched to another node, the messages list still contains this 
message, so it is acknowledged.
   If, unfortunately, the Consumer has switched to node B for any reason when 
the stop is requested, message 1 is not transferred, because the checkpoint's 
messages list is transient. This consumer no longer has the message, **BUT 
this message is still in the message broker as it hasn't been acked**.
   Message 1 will be consumed again by the consumer and isn't lost.
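The scenario above can be simulated with a toy broker. This is a hypothetical in-memory model, not a JMS API: `Broker`, `receive`, `ack` and `recoverUnacked` are invented names. It assumes queue-like semantics, where a message that was delivered but never acknowledged is handed out again once the consuming session goes away; a non-durable topic subscription does not retain messages for a subscriber that has disconnected, so that case is not covered by this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RedeliveryDemo {

  // Hypothetical in-memory broker: a message leaves the broker only when it is acked.
  static class Broker {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Set<String> inFlight = new LinkedHashSet<>();

    void publish(String msg) {
      ready.addLast(msg);
    }

    // Hand out the next ready message and remember it as delivered but unacked.
    String receive() {
      String msg = ready.pollFirst();
      if (msg != null) {
        inFlight.add(msg);
      }
      return msg;
    }

    void ack(String msg) {
      inFlight.remove(msg);
    }

    // The consuming session went away without acking: requeue its messages in order.
    void recoverUnacked() {
      List<String> pending = new ArrayList<>(inFlight);
      inFlight.clear();
      for (int i = pending.size() - 1; i >= 0; i--) {
        ready.addFirst(pending.get(i));
      }
    }

    boolean isEmpty() {
      return ready.isEmpty() && inFlight.isEmpty();
    }
  }

  public static void main(String[] args) {
    Broker broker = new Broker();
    broker.publish("message 1");

    // Node A reads message 1 but is stopped before finalizeCheckpoint acks it.
    String onNodeA = broker.receive();
    broker.recoverUnacked();

    // Node B resumes from a checkpoint whose (transient) message list is empty,
    // yet the broker still holds message 1 and delivers it again.
    String onNodeB = broker.receive();
    broker.ack(onNodeB);

    System.out.println(onNodeA + " delivered twice: " + onNodeA.equals(onNodeB));
    System.out.println("broker drained: " + broker.isEmpty());
  }
}
```

Under these assumptions the outcome is a duplicate delivery rather than a lost message.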
   
   Can you confirm this, please?
   **Is the only impact of making messages transient in the CheckpointMark 
that they will be consumed twice, or can they be lost?**
   
   Thanks a lot for your answer,
   Arthur 
   
   
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 392593)
    Time Spent: 9h 50m  (was: 9h 40m)

> JmsCheckpointMark can not be correctly encoded
> ----------------------------------------------
>
>                 Key: BEAM-7427
>                 URL: https://issues.apache.org/jira/browse/BEAM-7427
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jms
>    Affects Versions: 2.12.0, 2.13.0, 2.14.0, 2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0
>         Environment: Message broker: Solace
> JMS client (over AMQP): "org.apache.qpid:qpid-jms-client:0.42.0"
>            Reporter: Mourad
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>             Fix For: 2.20.0
>
>          Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> I get the following exception when reading from an unbounded JMS source:
>   
> {code:java}
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
> at org.apache.avro.Schema.validateName(Schema.java:1151)
> at org.apache.avro.Schema.access$200(Schema.java:81)
> at org.apache.avro.Schema$Field.<init>(Schema.java:403)
> at org.apache.avro.Schema$Field.<init>(Schema.java:396)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
> at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
> at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> {code}
>  
> The exception is thrown by Avro when it introspects {{JmsCheckpointMark}} to 
> generate a schema.
> JmsIO config:
>  
> {code:java}
> PCollection<DFAMessage> messages = pipeline.apply(
>     "read messages from the events broker",
>     JmsIO.<DFAMessage>readMessage()
>         .withConnectionFactory(jmsConnectionFactory)
>         .withTopic(options.getTopic())
>         .withMessageMapper(new DFAMessageMapper())
>         .withCoder(AvroCoder.of(DFAMessage.class)));
> {code}
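The `this$0` in the stack trace is the name javac gives to the hidden field that ties a non-static inner class to its enclosing instance. Avro names may only contain letters, digits and underscores, so reflection-based schema generation rejects that field. The report does not say which class in the reflected `JmsCheckpointMark` object graph contributes the field; the sketch below, with hypothetical class names, only shows how such a field arises and that a static nested class does not have one.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class SyntheticFieldDemo {

  // Non-final on purpose, so the inner class really reads it through the outer instance.
  private String name = "outer";

  // Non-static inner class: javac links it to the enclosing instance via a synthetic field.
  class Inner {
    String describe() {
      return "inner of " + name; // uses the enclosing instance, so the link is kept
    }
  }

  // Static nested class: no enclosing-instance field is generated.
  static class Nested {
    int value = 1;
  }

  // Collect the names of declared fields that were generated by the compiler.
  static List<String> syntheticFieldNames(Class<?> type) {
    List<String> names = new ArrayList<>();
    for (Field field : type.getDeclaredFields()) {
      if (field.isSynthetic()) {
        names.add(field.getName());
      }
    }
    return names;
  }

  public static void main(String[] args) {
    System.out.println("Inner:  " + syntheticFieldNames(Inner.class));
    System.out.println("Nested: " + syntheticFieldNames(Nested.class));
  }
}
```

A field named like this is harmless for Java serialization but breaks Avro's reflect-based schema derivation, which is consistent with moving the checkpoint mark away from `AvroCoder`.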



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
