[ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=376077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376077
 ]

ASF GitHub Bot logged work on BEAM-7427:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jan/20 06:20
            Start Date: 23/Jan/20 06:20
    Worklog Time Spent: 10m 
      Work Description: jbonofre commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-577523339
 
 
   Just to be clear: I strongly think my original implementation is the 
better approach, and that the only change needed is to drop the messages from 
the checkpoint mark or mark them transient.
   Let me explain. In JMS, messages stay in the destination (topic or queue) 
until they are acknowledged.
   So, when we finalize the checkpoint mark, we ack the pending messages 
(stored in the checkpoint mark). If, for any reason, the checkpoint mark moves 
to another executor and we don't have the pending messages, it's not a big 
deal: the messages are still in the destination and will be consumed again.
   So, if you don't mind, I will move forward with my previous commit (where I 
use CLIENT_ACK and transient on messages in the checkpoint mark), adding the 
following changes:
   - a serialization-based coder for the checkpoint mark (instead of the Avro 
coder)
   - transient on the messages list
   Thoughts?
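
A minimal sketch of the idea above, assuming plain Java serialization stands in for the coder. `Ackable` and `Mark` are hypothetical stand-ins for `javax.jms.Message` and the checkpoint mark; this is not the actual Beam code. It only illustrates that a transient pending list does not travel with the mark, so a mark that has moved to another executor acks nothing and the broker is left to redeliver.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class CheckpointSketch {

  /** Hypothetical stand-in for javax.jms.Message; only acknowledge() matters here. */
  interface Ackable {
    void acknowledge();
  }

  /** Hypothetical checkpoint mark, not the real JmsCheckpointMark. */
  static class Mark implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serialized: plain bookkeeping that is safe to ship between executors.
    long oldestTimestampMillis = Long.MAX_VALUE;

    // Not serialized: pending messages belong to the local JMS session.
    private transient List<Ackable> pending = new ArrayList<>();

    void add(Ackable message, long timestampMillis) {
      pending.add(message);
      oldestTimestampMillis = Math.min(oldestTimestampMillis, timestampMillis);
    }

    /** Acks whatever is pending locally and returns how many messages were acked. */
    int finalizeCheckpoint() {
      int acked = 0;
      for (Ackable message : pending) {
        message.acknowledge();
        acked++;
      }
      pending.clear();
      return acked;
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      // Transient fields come back null, so start again with an empty list.
      pending = new ArrayList<>();
    }
  }

  /** Simulates the mark moving to another executor via serialization. */
  static Mark roundTrip(Mark mark) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(mark);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Mark) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    int[] acks = {0};
    Mark local = new Mark();
    local.add(() -> acks[0]++, 1000L);
    local.add(() -> acks[0]++, 500L);

    Mark moved = roundTrip(local);
    System.out.println("moved mark acked: " + moved.finalizeCheckpoint()); // 0
    System.out.println("local mark acked: " + local.finalizeCheckpoint()); // 2
  }
}
```

In the sketch, the moved copy keeps `oldestTimestampMillis` but has nothing to ack. With CLIENT_ACK, that case relies on the broker still holding the unacknowledged messages.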
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 376077)
    Time Spent: 7.5h  (was: 7h 20m)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---------------------------------------------------------------
>
>                 Key: BEAM-7427
>                 URL: https://issues.apache.org/jira/browse/BEAM-7427
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jms
>    Affects Versions: 2.12.0
>         Environment: Message Broker : solace
> JMS Client (Over AMQP) : org.apache.qpid:qpid-jms-client:0.42.0
>            Reporter: Mourad
>            Assignee: Mourad
>            Priority: Major
>          Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> I get the following exception when reading from an unbounded JMS source:
>   
> {code:java}
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
> at org.apache.avro.Schema.validateName(Schema.java:1151)
> at org.apache.avro.Schema.access$200(Schema.java:81)
> at org.apache.avro.Schema$Field.<init>(Schema.java:403)
> at org.apache.avro.Schema$Field.<init>(Schema.java:396)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
> at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
> at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> {code}
>  
> The exception is thrown by Avro when introspecting {{JmsCheckpointMark}} to 
> generate a schema.
> JmsIO config:
>  
> {code:java}
> PCollection<DFAMessage> messages = pipeline.apply(
>     "read messages from the events broker",
>     JmsIO.<DFAMessage>readMessage()
>         .withConnectionFactory(jmsConnectionFactory)
>         .withTopic(options.getTopic())
>         .withMessageMapper(new DFAMessageMapper())
>         .withCoder(AvroCoder.of(DFAMessage.class)));
> {code}
>  
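
For context on the `this$0` name in the quoted stack trace: javac uses that name for the synthetic field that holds a non-static inner class's reference to its enclosing instance, and `$` is not allowed in Avro names. The sketch below only demonstrates that general behaviour with hypothetical classes (`Inner`, `Nested`). Which class reachable from `JmsCheckpointMark` actually carries such a field is not stated in the report, so treat that part as an assumption.

```java
import java.lang.reflect.Field;
import java.util.regex.Pattern;

public class InnerFieldSketch {

  // Name pattern from the Avro specification: a letter or underscore first,
  // then letters, digits, or underscores.
  static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

  private int counter;

  /** Non-static inner class: it uses the enclosing instance, so javac keeps a reference to it. */
  class Inner {
    int next() {
      return ++counter;
    }
  }

  /** Static nested class: no reference to an enclosing instance. */
  static class Nested {
    int value;
  }

  /** Returns true if every declared field name would pass the Avro name pattern. */
  static boolean allFieldNamesValid(Class<?> type) {
    for (Field field : type.getDeclaredFields()) {
      if (!AVRO_NAME.matcher(field.getName()).matches()) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    for (Field field : Inner.class.getDeclaredFields()) {
      System.out.println("Inner field: " + field.getName() + " synthetic=" + field.isSynthetic());
    }
    System.out.println("Inner names valid for Avro:  " + allFieldNamesValid(Inner.class));
    System.out.println("Nested names valid for Avro: " + allFieldNamesValid(Nested.class));
  }
}
```

Reflection-based schema generation walks declared fields, so a synthetic field like this one is enough to trigger the name validation failure unless the field is excluded or the class is made static.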



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
