[ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375654&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375654
 ]

ASF GitHub Bot logged work on BEAM-7427:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Jan/20 15:34
            Start Date: 22/Jan/20 15:34
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369616872
 
 

 ##########
 File path: 
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
 ##########
 @@ -406,6 +408,26 @@ public void testCheckpointMarkSafety() throws Exception {
     runner.join();
   }
 
+  /** Test the checkpoint mark default coder, which is actually AvroCoder. */
+  @Test
+  public void testCheckpointMarkDefaultCoder() throws Exception {
+    JmsIO.JmsCheckpointMark jmsCheckpointMark = new JmsIO.JmsCheckpointMark();
+    jmsCheckpointMark.add(new ActiveMQMessage());
+    Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
+    CoderProperties.coderSerializable(coder);
+    CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark);
+  }
+
+  @Test
+  public void testCheckpointMarkSerializableCoder() throws Exception {
+    JmsIO.JmsCheckpointMark jmsCheckpointMark = new JmsIO.JmsCheckpointMark();
 
 Review comment:
   This second test is not needed if you make 
`UnboundedJmsSource#getCheckpointMarkCoder` return the `SerializableCoder`. 
Also that is needed to get rid also of `AvroCoder` as part of these fixes.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 375654)
    Time Spent: 4h 40m  (was: 4.5h)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---------------------------------------------------------------
>
>                 Key: BEAM-7427
>                 URL: https://issues.apache.org/jira/browse/BEAM-7427
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-jms
>    Affects Versions: 2.12.0
>         Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>            Reporter: Mourad
>            Assignee: Mourad
>            Priority: Major
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> I get the following exception when reading from unbounded JMS Source:
>   
> {code:java}
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
> at org.apache.avro.Schema.validateName(Schema.java:1151)
> at org.apache.avro.Schema.access$200(Schema.java:81)
> at org.apache.avro.Schema$Field.<init>(Schema.java:403)
> at org.apache.avro.Schema$Field.<init>(Schema.java:396)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
> at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
> at 
> avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
> at 
> avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
> at 
> avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
> at 
> avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> {code}
>  
> The exception is thrown by Avro when introspecting {{JmsCheckpointMark}} to 
> generate schema.
> JmsIO config :
>  
> {code:java}
> PCollection<DFAMessage> messages = pipeline.apply("read messages from the 
> events broker", JmsIO.<DFAMessage>readMessage() 
> .withConnectionFactory(jmsConnectionFactory) .withTopic(options.getTopic()) 
> .withMessageMapper(new DFAMessageMapper()) 
> .withCoder(AvroCoder.of(DFAMessage.class)));
> {code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to