[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=378284&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378284
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 28/Jan/20 14:52
Start Date: 28/Jan/20 14:52
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #8757: [BEAM-7427] Fix 
JmsCheckpointMark Avro Encoding
URL: https://github.com/apache/beam/pull/8757#issuecomment-579282755
 
 
   For info, #10644 was merged today. The fix will be part of Beam 2.20.0, since 
the vote for 2.19.0 has already started.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 378284)
Time Spent: 9h 20m  (was: 9h 10m)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Affects Versions: 2.12.0, 2.13.0, 2.14.0, 2.15.0, 2.16.0, 2.17.0, 2.18.0, 
> 2.19.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>Reporter: Mourad
>Assignee: Jean-Baptiste Onofré
>Priority: Major
> Fix For: 2.20.0
>
>  Time Spent: 9h 20m
>  Remaining Estimate: 0h
>
> I get the following exception when reading from unbounded JMS Source:
>   
> {code:java}
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
> at org.apache.avro.Schema.validateName(Schema.java:1151)
> at org.apache.avro.Schema.access$200(Schema.java:81)
> at org.apache.avro.Schema$Field.<init>(Schema.java:403)
> at org.apache.avro.Schema$Field.<init>(Schema.java:396)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
> at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
> at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> {code}
>  
> The exception is thrown by Avro when introspecting {{JmsCheckpointMark}} to 
> generate schema.
> JmsIO config :
>  
> {code:java}
> PCollection<DFAMessage> messages =
>     pipeline.apply(
>         "read messages from the events broker",
>         JmsIO.<DFAMessage>readMessage()
>             .withConnectionFactory(jmsConnectionFactory)
>             .withTopic(options.getTopic())
>             .withMessageMapper(new DFAMessageMapper())
>             .withCoder(AvroCoder.of(DFAMessage.class)));
> {code}
>  
>  
>  
>  
>  
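
Note on `this$0`: the name in the exception is the one javac gives to the hidden field that holds a non-static inner class's reference to its enclosing instance. The Avro specification only allows names that start with a letter or underscore and continue with letters, digits, or underscores, so a reflected field called `this$0` fails name validation. The sketch below reproduces that with plain reflection and no Avro dependency. `OuterSource`, `InnerMark`, `NestedMark`, and both helper methods are hypothetical names for illustration, not Beam's code, and the regular expression only approximates Avro's check.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical classes for illustration; not Beam's actual JmsIO code.
class OuterSource {
  private final String name = "outer";

  // Non-static inner class: javac adds a hidden reference to the enclosing instance.
  class InnerMark {
    String describe() {
      return name; // uses the enclosing instance, so the hidden reference is needed
    }
  }

  // Static nested class: no hidden reference to an enclosing instance.
  static class NestedMark {
    long timestamp;
  }

  /** Names of all declared fields, including compiler-generated (synthetic) ones. */
  static List<String> fieldNames(Class<?> type) {
    List<String> names = new ArrayList<>();
    for (Field f : type.getDeclaredFields()) {
      names.add(f.getName());
    }
    return names;
  }

  /** Approximation of the Avro name rule: [A-Za-z_] followed by [A-Za-z0-9_]*. */
  static boolean isValidAvroName(String candidate) {
    return candidate.matches("[A-Za-z_][A-Za-z0-9_]*");
  }

  public static void main(String[] args) {
    for (String n : fieldNames(InnerMark.class)) {
      System.out.println(n + " valid=" + isValidAvroName(n));
    }
    for (String n : fieldNames(NestedMark.class)) {
      System.out.println(n + " valid=" + isValidAvroName(n));
    }
  }
}
```

Declaring the class `static`, or moving it to its own file, removes the hidden field. That appears consistent with the later comments in this thread about keeping JmsCheckpointMark in a dedicated class without the State inner class, although the original class layout is not shown here.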



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=378281&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378281
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 28/Jan/20 14:50
Start Date: 28/Jan/20 14:50
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #10644: [BEAM-7427] 
Refactor JmsCheckpointMark to use SerializableCoder
URL: https://github.com/apache/beam/pull/10644
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 378281)
Time Spent: 9h  (was: 8h 50m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=378282&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378282
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 28/Jan/20 14:50
Start Date: 28/Jan/20 14:50
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #10644: [BEAM-7427] Refactor 
JmsCheckpointMark to use SerializableCoder
URL: https://github.com/apache/beam/pull/10644#issuecomment-579281715
 
 
   Merged manually, thanks again JB!
 



Issue Time Tracking
---

Worklog Id: (was: 378282)
Time Spent: 9h 10m  (was: 9h)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=378280&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378280
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 28/Jan/20 14:48
Start Date: 28/Jan/20 14:48
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-579280837
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 378280)
Time Spent: 8h 50m  (was: 8h 40m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=378258&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378258
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 28/Jan/20 14:15
Start Date: 28/Jan/20 14:15
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-579265247
 
 
   @iemejia thanks! I will switch to other IO improvements ;)
 



Issue Time Tracking
---

Worklog Id: (was: 378258)
Time Spent: 8h 40m  (was: 8.5h)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=378249&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378249
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 28/Jan/20 13:52
Start Date: 28/Jan/20 13:52
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-579255395
 
 
   Oh, you already got rid of State, hehe. My bad, OK, looking again.
 



Issue Time Tracking
---

Worklog Id: (was: 378249)
Time Spent: 8.5h  (was: 8h 20m)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>Reporter: Mourad
>Assignee: Mourad
>Priority: Major
>  Time Spent: 8.5h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=378246&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378246
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 28/Jan/20 13:50
Start Date: 28/Jan/20 13:50
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-579254727
 
 
   Sorry for the delay, the extra commit with the fixes looks good. I was 
thinking that, since the stored messages are not needed to restore the progress 
of the reads on `UnboundedJmsReader`, maybe the simplest fix is just to leave 
them transient as you proposed.
   About the State changes, maybe let's do those in a subsequent PR so we can 
get this fix out more quickly. WDYT? If you agree, just leave the class as it 
was before and then I will merge.
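
A minimal sketch of what marking the stored messages `transient` means under plain Java serialization, which is the mechanism the PR title's SerializableCoder relies on: the transient field is skipped when the object is written and comes back as `null` when it is read, while the timestamp survives. `CheckpointSketch`, its fields, and `roundTrip` are hypothetical names for illustration, not Beam's JmsCheckpointMark, and `Object` stands in for the JMS message type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a checkpoint mark; not Beam's JmsCheckpointMark.
class CheckpointSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // Written to the stream.
  long oldestTimestampMillis;

  // Skipped by Java serialization, so its elements need not be Serializable.
  transient List<Object> pending = new ArrayList<>();

  /** Writes the value to a byte array and reads it back with Java serialization. */
  static CheckpointSketch roundTrip(CheckpointSketch in) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(in);
      }
      try (ObjectInputStream input =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CheckpointSketch) input.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    CheckpointSketch mark = new CheckpointSketch();
    mark.oldestTimestampMillis = 1580219520000L;
    mark.pending.add(new Object()); // not Serializable, yet writing still succeeds
    CheckpointSketch copy = roundTrip(mark);
    System.out.println("timestamp kept: " + (copy.oldestTimestampMillis == mark.oldestTimestampMillis));
    System.out.println("pending is null after restore: " + (copy.pending == null));
  }
}
```

Because field initializers do not run during deserialization, a restored instance would have to re-create the list before use, for example in a `readObject` method.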
 



Issue Time Tracking
---

Worklog Id: (was: 378246)
Time Spent: 8h 20m  (was: 8h 10m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=378223&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-378223
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 28/Jan/20 13:25
Start Date: 28/Jan/20 13:25
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-579243908
 
 
   As discussed, I've kept `JmsCheckpointMark` in a dedicated class.
 



Issue Time Tracking
---

Worklog Id: (was: 378223)
Time Spent: 8h 10m  (was: 8h)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=376160&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376160
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 23/Jan/20 09:56
Start Date: 23/Jan/20 09:56
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-577608491
 
 
   FYI, I'm updating the PR to have JmsCheckpointMark in a separate class (but 
without the State inner class, which is useless IMHO).
 



Issue Time Tracking
---

Worklog Id: (was: 376160)
Time Spent: 8h  (was: 7h 50m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=376080&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376080
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 23/Jan/20 06:35
Start Date: 23/Jan/20 06:35
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369950985
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
     T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public JmsCheckpointMark() {}
+
+    public void add(Message message) throws Exception {
+      lock.writeLock().lock();
+      try {
+        Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+        if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
+          oldestMessageTimestamp = currentMessageTimestamp;
+        }
+        messages.add(message);
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    public Instant getOldestMessageTimestamp() {
+      lock.readLock().lock();
+      try {
+        return this.oldestMessageTimestamp;
+      } finally {
+        lock.readLock().unlock();
+      }
+    }
+
+    /**
+     * Acknowledge all outstanding message. Since we believe that messages will be delivered in
+     * timestamp order, and acknowledged messages will not be retried, the newest message in this
+     * batch is a good bound for future messages.
+     */
+    @Override
+    public void finalizeCheckpoint() {
+      lock.writeLock().lock();
+      try {
+        for (Message message : messages) {
+          try {
+            message.acknowledge();
+            Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+            if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
+              oldestMessageTimestamp = currentMessageTimestamp;
+            }
+          } catch (Exception e) {
+            LOG.error("Exception while finalizing message: {}", e);
 
 Review comment:
   No, it's not a problem as the ack won't be sent.
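
The loop under review can be sketched in isolation as follows. Each acknowledgement is wrapped in its own try/catch, so one failure is logged and skipped while the remaining messages are still acknowledged, and only successfully acknowledged messages advance the timestamp. `AckLoopSketch`, `AckableMessage`, `acknowledgeAll`, and `message` are hypothetical names; `AckableMessage` stands in for the JMS message type, and the redelivery remark in the comment is an assumption about broker behaviour, not something stated in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; AckableMessage stands in for the JMS message type.
class AckLoopSketch {
  interface AckableMessage {
    void acknowledge() throws Exception;

    long timestampMillis();
  }

  /**
   * Acknowledges every message, logging and skipping failures. Returns the newest timestamp
   * among the messages that were acknowledged, or startMillis if none succeeded.
   */
  static long acknowledgeAll(List<AckableMessage> messages, long startMillis) {
    long newest = startMillis;
    for (AckableMessage m : messages) {
      try {
        m.acknowledge();
        newest = Math.max(newest, m.timestampMillis());
      } catch (Exception e) {
        // The ack was not sent; assumption: the broker may redeliver this message later.
        System.err.println("Exception while finalizing message: " + e);
      }
    }
    return newest;
  }

  /** Test helper: a message with a fixed timestamp that optionally fails to acknowledge. */
  static AckableMessage message(long timestampMillis, boolean failOnAck) {
    return new AckableMessage() {
      @Override
      public void acknowledge() throws Exception {
        if (failOnAck) {
          throw new Exception("simulated ack failure");
        }
      }

      @Override
      public long timestampMillis() {
        return timestampMillis;
      }
    };
  }

  public static void main(String[] args) {
    List<AckableMessage> batch = new ArrayList<>();
    batch.add(message(10L, false));
    batch.add(message(99L, true)); // fails, so it does not advance the timestamp
    batch.add(message(20L, false));
    System.out.println("newest acknowledged timestamp: " + acknowledgeAll(batch, 0L));
  }
}
```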
 



Issue Time Tracking
---

Worklog Id: (was: 376080)
Time Spent: 7h 50m  (was: 7h 40m)


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=376079=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376079
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 23/Jan/20 06:34
Start Date: 23/Jan/20 06:34
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-577526821
 
 
   I have updated the PR to use the serialization coder instead of the Avro 
coder by default.
 



Issue Time Tracking
---

Worklog Id: (was: 376079)
Time Spent: 7h 40m  (was: 7.5h)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>Reporter: Mourad
>Assignee: Mourad
>Priority: Major
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> I get the following exception when reading from unbounded JMS Source:
>   
> {code:java}
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
> at org.apache.avro.Schema.validateName(Schema.java:1151)
> at org.apache.avro.Schema.access$200(Schema.java:81)
> at org.apache.avro.Schema$Field.<init>(Schema.java:403)
> at org.apache.avro.Schema$Field.<init>(Schema.java:396)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
> at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
> at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> {code}
>  
> The exception is thrown by Avro when introspecting {{JmsCheckpointMark}} to generate schema.
> JmsIO config :
>  
> {code:java}
> PCollection<DFAMessage> messages = pipeline.apply("read messages from the events broker",
>     JmsIO.readMessage()
>         .withConnectionFactory(jmsConnectionFactory)
>         .withTopic(options.getTopic())
>         .withMessageMapper(new DFAMessageMapper())
>         .withCoder(AvroCoder.of(DFAMessage.class)));
> {code}
>  
>  
>  
>  
>  
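The `this$0` named in the stack trace quoted above looks like the synthetic field that javac adds to a non-static inner class to hold its outer instance. The sketch below is only an illustration with hypothetical class names (`SyntheticFieldSketch`, `InnerMark`, `NestedMark`), not Beam or Avro code. It lists the declared fields of an inner and a static nested class by reflection, and checks each name against the pattern the Avro specification gives for names (a letter or underscore, then letters, digits, or underscores).

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

final class SyntheticFieldSketch {
  private int counter = 0;

  /** Non-static inner class: it uses the outer instance, so javac stores a reference to it. */
  class InnerMark {
    int next() {
      return ++counter;
    }
  }

  /** Static nested class: no hidden reference to an outer instance. */
  static class NestedMark {
    int value;
  }

  /** Returns the names of all fields declared on the class, including synthetic ones. */
  static List<String> declaredFieldNames(Class<?> type) {
    List<String> names = new ArrayList<>();
    for (Field field : type.getDeclaredFields()) {
      names.add(field.getName());
    }
    return names;
  }

  /** Name rule from the Avro specification; using it as a stand-in for Avro's own check is an assumption. */
  static boolean looksLikeLegalSchemaName(String name) {
    return name.matches("[A-Za-z_][A-Za-z0-9_]*");
  }

  public static void main(String[] args) {
    for (String name : declaredFieldNames(InnerMark.class)) {
      System.out.println("InnerMark field " + name + " legal=" + looksLikeLegalSchemaName(name));
    }
    for (String name : declaredFieldNames(NestedMark.class)) {
      System.out.println("NestedMark field " + name + " legal=" + looksLikeLegalSchemaName(name));
    }
  }
}
```

A static nested or top-level class has no such hidden field, which is one reason reflection-based schema generation tends to behave better with them. Whether that alone is enough for a given coder is not established here.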



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=376077=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376077
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 23/Jan/20 06:20
Start Date: 23/Jan/20 06:20
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-577523339
 
 
   Just to be clear: I strongly think my original implementation is much 
better, and the only change needed is to remove the messages from the 
checkpoint mark or make them transient.
   Let me explain. In JMS, messages stay in the destination (topic or queue) 
until they are acked.
   So, when we finalize the checkpoint mark, we ack the pending messages 
(stored in the checkpoint mark). If, for any reason, the checkpoint mark moves 
to another executor and we don't have the pending messages, it's not a big 
deal, as the messages are still in the destination and will be consumed again.
   So, if you don't mind, I will move forward with my previous commit (where I 
use CLIENT_ACK and transient on messages in the checkpoint mark), adding the 
following changes:
   - checkpoint mark serialization coder (instead of Avro coder)
   - transient on messages list
   Thoughts?
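
   To make the effect of `transient` concrete, here is a minimal sketch using plain Java serialization. The `Mark` class and its fields are hypothetical stand-ins (strings instead of JMS messages), not the Beam `JmsCheckpointMark`. It shows that the timestamp survives a serialize/deserialize round trip while the transient list does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class TransientCheckpointSketch {

  /** Hypothetical stand-in for a checkpoint mark; not the Beam class. */
  static class Mark implements Serializable {
    private static final long serialVersionUID = 1L;
    long oldestTimestampMillis = 42L;
    // Stand-in for the pending JMS messages; skipped by Java serialization.
    transient List<String> pending = new ArrayList<>();
  }

  /** Serializes and deserializes the mark, as a move to another executor might. */
  static Mark roundTrip(Mark mark) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(mark);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Mark) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Mark mark = new Mark();
    mark.pending.add("message-1");
    Mark copy = roundTrip(mark);
    System.out.println("timestamp after round trip: " + copy.oldestTimestampMillis);
    System.out.println("pending after round trip: " + copy.pending);
  }
}
```

   Note that the field initializer does not run on deserialization, so the transient list comes back as `null` rather than empty. Code that iterates over it after a restore would need a null check or a `readObject` hook that recreates the list.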
   
 



Issue Time Tracking
---

Worklog Id: (was: 376077)
Time Spent: 7.5h  (was: 7h 20m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=376068=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-376068
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 23/Jan/20 05:23
Start Date: 23/Jan/20 05:23
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-577510523
 
 
   In the last commit, I'm using a different approach.
   Now, the messages are automatically acked on the JMS broker, and the pending 
messages are stored in the checkpoint mark. As I don't have to ack the messages 
"explicitly" anymore, I can now convert the JMS Message to a Beam JmsRecord, and 
so CheckpointMark is now serializable.
   This commit needs some polish, but I wanted to share the rough idea with you.
   Thoughts?
 



Issue Time Tracking
---

Worklog Id: (was: 376068)
Time Spent: 7h 20m  (was: 7h 10m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375768=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375768
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 17:47
Start Date: 22/Jan/20 17:47
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369708568
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
 T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
 
 Review comment:
   We can limit the size of the messages list and call finalize checkpoint. 
IMHO state is useless here; I would keep it in the checkpoint mark, as it's more 
straightforward to understand.
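
   One way to read the suggestion above (cap the list, then finalize) is sketched below. All names here (`BoundedPendingSketch`, `maxPending`, `acknowledger`) are hypothetical, and the acknowledgement is modelled as a plain callback rather than a JMS call. This is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BoundedPendingSketch<T> {
  private final int maxPending;
  private final Consumer<T> acknowledger;
  private final List<T> pending = new ArrayList<>();

  BoundedPendingSketch(int maxPending, Consumer<T> acknowledger) {
    if (maxPending <= 0) {
      throw new IllegalArgumentException("maxPending must be positive");
    }
    this.maxPending = maxPending;
    this.acknowledger = acknowledger;
  }

  /** Buffers a message; once the cap is reached, acknowledges everything buffered so far. */
  synchronized void add(T message) {
    pending.add(message);
    if (pending.size() >= maxPending) {
      finalizePending();
    }
  }

  /** Acknowledges all buffered messages and empties the buffer. */
  synchronized void finalizePending() {
    for (T message : pending) {
      acknowledger.accept(message);
    }
    pending.clear();
  }

  synchronized int size() {
    return pending.size();
  }

  public static void main(String[] args) {
    List<String> acked = new ArrayList<>();
    BoundedPendingSketch<String> buffer = new BoundedPendingSketch<>(2, acked::add);
    buffer.add("a");
    buffer.add("b"); // cap reached, both messages are acknowledged
    buffer.add("c");
    System.out.println("acked=" + acked + " stillPending=" + buffer.size());
  }
}
```

   The cap bounds memory, but acknowledging before the runner has finalized the checkpoint trades away some redelivery protection, so whether this is acceptable depends on the delivery guarantees the source is meant to give.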
 



Issue Time Tracking
---

Worklog Id: (was: 375768)
Time Spent: 7h 10m  (was: 7h)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375753=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375753
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 17:20
Start Date: 22/Jan/20 17:20
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on pull request #10644: 
[BEAM-7427] Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369688135
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
 ##
 @@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jms;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-import javax.jms.Message;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Checkpoint for an unbounded JmsIO.Read. Consists of JMS destination name, and the latest message
- * ID consumed so far.
- */
-@DefaultCoder(AvroCoder.class)
-public class JmsCheckpointMark implements UnboundedSource.CheckpointMark {
 
 Review comment:
   I agree with @iemejia - it would be more convenient to have 
`JmsCheckpointMark` as a separate class (as it was before).
 



Issue Time Tracking
---

Worklog Id: (was: 375753)
Time Spent: 7h  (was: 6h 50m)


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375752=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375752
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 17:19
Start Date: 22/Jan/20 17:19
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on pull request #10644: 
[BEAM-7427] Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369690974
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
 T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
 
 Review comment:
   I'm also wondering whether we could potentially hit an OOM here if the 
size of messages is large?
 



Issue Time Tracking
---

Worklog Id: (was: 375752)
Time Spent: 6h 50m  (was: 6h 40m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375748=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375748
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 17:14
Start Date: 22/Jan/20 17:14
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on pull request #10644: 
[BEAM-7427] Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369690504
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
 T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public JmsCheckpointMark() {}
+
+    public void add(Message message) throws Exception {
+      lock.writeLock().lock();
+      try {
+        Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+        if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
+          oldestMessageTimestamp = currentMessageTimestamp;
+        }
+        messages.add(message);
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    public Instant getOldestMessageTimestamp() {
+      lock.readLock().lock();
+      try {
+        return this.oldestMessageTimestamp;
+      } finally {
+        lock.readLock().unlock();
+      }
+    }
+
+    /**
+     * Acknowledge all outstanding message. Since we believe that messages will be delivered in
+     * timestamp order, and acknowledged messages will not be retried, the newest message in this
+     * batch is a good bound for future messages.
+     */
+    @Override
+    public void finalizeCheckpoint() {
+      lock.writeLock().lock();
+      try {
+        for (Message message : messages) {
+          try {
+            message.acknowledge();
+            Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+            if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
+              oldestMessageTimestamp = currentMessageTimestamp;
+            }
+          } catch (Exception e) {
+            LOG.error("Exception while finalizing message: {}", e);
 
 Review comment:
   Is it fine that we don't fail here in case of an exception? Could it cause 
data loss?
 



Issue Time Tracking
---

Worklog Id: (was: 375748)
Time Spent: 6h 20m  (was: 6h 10m)


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375750=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375750
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 17:14
Start Date: 22/Jan/20 17:14
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on pull request #10644: 
[BEAM-7427] Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369690974
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
 T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
 
 Review comment:
   I'm also wondering whether we could hit an OOM here if the size of messages is large?
 



Issue Time Tracking
---

Worklog Id: (was: 375750)
Time Spent: 6h 40m  (was: 6.5h)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375749&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375749
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 17:14
Start Date: 22/Jan/20 17:14
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on pull request #10644: 
[BEAM-7427] Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369688135
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
 ##
 @@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jms;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-import javax.jms.Message;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Checkpoint for an unbounded JmsIO.Read. Consists of JMS destination name, and the latest message
- * ID consumed so far.
- */
-@DefaultCoder(AvroCoder.class)
-public class JmsCheckpointMark implements UnboundedSource.CheckpointMark {
 
 Review comment:
   I agree with @iemejia - it would be more convenient to have `JmsCheckpointMark` 
as a separate class (as it was before).
 



Issue Time Tracking
---

Worklog Id: (was: 375749)
Time Spent: 6.5h  (was: 6h 20m)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>Reporter: Mourad
>Assignee: Mourad
>Priority: Major
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375747&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375747
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 17:14
Start Date: 22/Jan/20 17:14
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on pull request #10644: 
[BEAM-7427] Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369685028
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
 T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
 
 Review comment:
   @jbonofre Could this logic be implemented using the `JmsCheckpointMark.State` 
class, as it was before? In that case we would at least need to make `State` 
serialisable, but that should not be a problem once `Message` is converted to 
`JmsRecord`.
 



Issue Time Tracking
---

Worklog Id: (was: 375747)
Time Spent: 6h 10m  (was: 6h)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>Reporter: Mourad
>Assignee: Mourad
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375745&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375745
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 17:07
Start Date: 22/Jan/20 17:07
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on pull request #10644: 
[BEAM-7427] Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369680900
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
 ##
 @@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jms;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-import javax.jms.Message;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Checkpoint for an unbounded JmsIO.Read. Consists of JMS destination name, and the latest message
- * ID consumed so far.
- */
-@DefaultCoder(AvroCoder.class)
-public class JmsCheckpointMark implements UnboundedSource.CheckpointMark {
 
 Review comment:
   I agree with @iemejia - it would be more convenient to have `JmsCheckpointMark` 
as a separate class (as it was before).
 



Issue Time Tracking
---

Worklog Id: (was: 375745)
Time Spent: 6h  (was: 5h 50m)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>Reporter: Mourad
>Assignee: Mourad
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375735&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375735
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 16:54
Start Date: 22/Jan/20 16:54
Worklog Time Spent: 10m 
  Work Description: aromanenko-dev commented on pull request #10644: 
[BEAM-7427] Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369680900
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
 ##
 @@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jms;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-import javax.jms.Message;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Checkpoint for an unbounded JmsIO.Read. Consists of JMS destination name, and the latest message
- * ID consumed so far.
- */
-@DefaultCoder(AvroCoder.class)
-public class JmsCheckpointMark implements UnboundedSource.CheckpointMark {
 
 Review comment:
   I agree with @iemejia - it would be more convenient to have `JmsCheckpointMark` 
as a separate class (as it was before).
 



Issue Time Tracking
---

Worklog Id: (was: 375735)
Time Spent: 5h 50m  (was: 5h 40m)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>Reporter: Mourad
>Assignee: Mourad
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375716&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375716
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 16:29
Start Date: 22/Jan/20 16:29
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369665830
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
 T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
 
 Review comment:
   Finalizing the checkpoint should ack the messages. A better implementation is:
   1. use auto-acknowledge (`AUTO_ACKNOWLEDGE`) on the session
   2. convert each `Message` to a `JmsRecord` and store it in the checkpoint mark
   3. thanks to that, the records will be serializable and we can transport them.
   
   I'm making the change.
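
   The three steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the PR: `RecordSketch` is a hypothetical stand-in for `JmsRecord`, `CheckpointMarkSketch` is a hypothetical checkpoint mark, and plain Java serialization is used in place of a Beam coder. The session-level auto-acknowledge from step 1 is not modelled, since it needs a JMS provider.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for JmsRecord: a plain serializable copy of message data.
final class RecordSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  final String messageId;
  final long timestampMillis;

  RecordSketch(String messageId, long timestampMillis) {
    this.messageId = messageId;
    this.timestampMillis = timestampMillis;
  }
}

// Hypothetical checkpoint mark that stores records rather than live JMS messages.
final class CheckpointMarkSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  private final List<RecordSketch> records = new ArrayList<>();
  private long oldestTimestampMillis = Long.MAX_VALUE;

  void add(RecordSketch record) {
    records.add(record);
    oldestTimestampMillis = Math.min(oldestTimestampMillis, record.timestampMillis);
  }

  int size() {
    return records.size();
  }

  long oldestTimestampMillis() {
    return oldestTimestampMillis;
  }

  // Round-trips the mark through plain Java serialization.
  static CheckpointMarkSketch roundTrip(CheckpointMarkSketch mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CheckpointMarkSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }
}

class CheckpointMarkDemo {
  public static void main(String[] args) {
    CheckpointMarkSketch mark = new CheckpointMarkSketch();
    mark.add(new RecordSketch("id-1", 2000L));
    mark.add(new RecordSketch("id-2", 1000L));
    CheckpointMarkSketch copy = CheckpointMarkSketch.roundTrip(mark);
    System.out.println(copy.size() + " records, oldest=" + copy.oldestTimestampMillis());
  }
}
```

   Because every field of the mark is itself serializable, the whole mark survives the round trip, which is the property step 3 relies on.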
 



Issue Time Tracking
---

Worklog Id: (was: 375716)
Time Spent: 5h 40m  (was: 5.5h)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>Reporter: Mourad
>Assignee: Mourad
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375715&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375715
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 16:27
Start Date: 22/Jan/20 16:27
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369664609
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
 ##
 @@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jms;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-import javax.jms.Message;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Checkpoint for an unbounded JmsIO.Read. Consists of JMS destination name, and the latest message
- * ID consumed so far.
- */
-@DefaultCoder(AvroCoder.class)
-public class JmsCheckpointMark implements UnboundedSource.CheckpointMark {
 
 Review comment:
   As the checkpoint mark is small, I used an inner class (like in MqttIO). But 
I can move it to its own class if you think that's better for maintenance.
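
   For context on the `this$0` name in the reported exception, here is a small sketch of how nested classes differ. It assumes javac's usual behaviour of adding a synthetic outer-instance field (conventionally named `this$0`) to a non-static inner class that uses its enclosing instance; a static nested class has no such field. `OuterSketch`, `Inner`, and `Nested` are hypothetical names, and `isValidAvroName` only mirrors the Avro name rule `[A-Za-z_][A-Za-z0-9_]*` rather than calling Avro itself.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class OuterSketch {
  int counter = 0;

  // Non-static inner class: keeps an implicit reference to its OuterSketch.
  class Inner {
    int next() {
      return ++counter; // touches the enclosing instance
    }
  }

  // Static nested class: no implicit reference to an enclosing instance.
  static class Nested {
    int value = 0;
  }

  // Collects the names of compiler-generated (synthetic) fields of a class.
  static List<String> syntheticFieldNames(Class<?> type) {
    List<String> names = new ArrayList<>();
    for (Field field : type.getDeclaredFields()) {
      if (field.isSynthetic()) {
        names.add(field.getName());
      }
    }
    return names;
  }

  // Mirrors the Avro naming rule: a letter or underscore, then letters, digits, underscores.
  static boolean isValidAvroName(String name) {
    return name.matches("[A-Za-z_][A-Za-z0-9_]*");
  }
}

class InnerClassDemo {
  public static void main(String[] args) {
    System.out.println("inner:  " + OuterSketch.syntheticFieldNames(OuterSketch.Inner.class));
    System.out.println("nested: " + OuterSketch.syntheticFieldNames(OuterSketch.Nested.class));
    System.out.println("this$0 valid Avro name? " + OuterSketch.isValidAvroName("this$0"));
  }
}
```

   A reflection-based schema generator that walks every declared field would see the synthetic field of `Inner` but nothing extra on `Nested`, so the static-versus-inner choice matters for any reflective serializer.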
 



Issue Time Tracking
---

Worklog Id: (was: 375715)
Time Spent: 5.5h  (was: 5h 20m)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>Reporter: Mourad
>Assignee: Mourad
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375714&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375714
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 16:26
Start Date: 22/Jan/20 16:26
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369664034
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
 T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
 
 Review comment:
   It's not possible to simply ack the messages here. So I'm using the same 
approach as in MqttIO: keep only the oldest timestamp and don't ack the 
messages. The messages then stay in the broker and will be consumed by another 
executor.
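
   To illustrate why only the timestamp travels with the checkpoint in this approach, here is a minimal sketch, assuming plain Java serialization. `TransientMarkSketch` and its fields are hypothetical names, and the `pending` list of strings merely stands in for the un-acked JMS messages.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical checkpoint mark: the timestamp is serialized, the pending list is not.
final class TransientMarkSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  long oldestTimestampMillis;

  // Stand-in for un-acked messages; transient fields are skipped by serialization.
  transient List<String> pending = new ArrayList<>();

  // Round-trips the mark through plain Java serialization.
  static TransientMarkSketch roundTrip(TransientMarkSketch mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (TransientMarkSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }
}

class TransientMarkDemo {
  public static void main(String[] args) {
    TransientMarkSketch mark = new TransientMarkSketch();
    mark.oldestTimestampMillis = 1000L;
    mark.pending.add("message-1");
    TransientMarkSketch copy = TransientMarkSketch.roundTrip(mark);
    // The field initializer is not re-run on deserialization, so pending is null here.
    System.out.println("timestamp=" + copy.oldestTimestampMillis + ", pending=" + copy.pending);
  }
}
```

   A restored mark therefore has nothing left to ack, which matches the idea of leaving the messages in the broker for another executor.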
 



Issue Time Tracking
---

Worklog Id: (was: 375714)
Time Spent: 5h 20m  (was: 5h 10m)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
> Issue Type: Bug
> Components: io-java-jms
> Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
> Reporter: Mourad
> Assignee: Mourad
> Priority: Major
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> I get the following exception when reading from unbounded JMS Source:
>
> {code:java}
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
> at org.apache.avro.Schema.validateName(Schema.java:1151)
> at org.apache.avro.Schema.access$200(Schema.java:81)
> at org.apache.avro.Schema$Field.<init>(Schema.java:403)
> at org.apache.avro.Schema$Field.<init>(Schema.java:396)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
> at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
> at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> {code}
>
> The exception is thrown by Avro when introspecting {{JmsCheckpointMark}} to
> generate schema.
> JmsIO config :
>
> {code:java}
> PCollection<DFAMessage> messages = pipeline.apply("read messages from the events broker",
>     JmsIO.<DFAMessage>readMessage()
>         .withConnectionFactory(jmsConnectionFactory)
>         .withTopic(options.getTopic())
>         .withMessageMapper(new DFAMessageMapper())
>         .withCoder(AvroCoder.of(DFAMessage.class)));
> {code}
>
>  
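
Note on the `this$0` name in the trace above: javac typically gives that name
to the hidden field through which a non-static inner class references its
enclosing instance, and `$` is not allowed in Avro names, which must match
[A-Za-z_][A-Za-z0-9_]*. The issue does not say which class in the reflected
object graph contributes the field, so the following is only a minimal
stand-alone sketch of the mechanism. All class names in it are hypothetical,
and the name check is a simplified regex, not Avro's own validation code.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of Beam or Avro.
class SyntheticFieldDemo {
  private final String name = "outer";

  // Non-static inner class: holds a hidden reference to the enclosing instance.
  class InnerMark {
    long timestamp;

    // Uses the enclosing instance, so the compiler must keep the hidden field.
    String owner() {
      return name;
    }
  }

  // Static nested class: no reference to an enclosing instance.
  static class NestedMark {
    long timestamp;
  }

  // Lists declared field names, including compiler-generated (synthetic) ones.
  static List<String> fieldNames(Class<?> type) {
    List<String> names = new ArrayList<>();
    for (Field field : type.getDeclaredFields()) {
      names.add(field.getName());
    }
    return names;
  }

  // Simplified version of the Avro naming rule (assumption: regex only).
  static boolean isValidAvroName(String candidate) {
    return candidate.matches("[A-Za-z_][A-Za-z0-9_]*");
  }

  public static void main(String[] args) {
    System.out.println(fieldNames(InnerMark.class));
    System.out.println(fieldNames(NestedMark.class));
    System.out.println(isValidAvroName("this$0"));
  }
}
```

Reflection-based schema generation that walks every declared field would reach
the hidden field of `InnerMark` but find nothing unusual in `NestedMark`, which
is one reason a static nested or top-level class is easier to encode.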



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375710&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375710
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 16:24
Start Date: 22/Jan/20 16:24
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369663084
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
     T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public JmsCheckpointMark() {}
+
+    public void add(Message message) throws Exception {
+      lock.writeLock().lock();
+      try {
+        Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+        if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
+          oldestMessageTimestamp = currentMessageTimestamp;
+        }
+        messages.add(message);
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    public Instant getOldestMessageTimestamp() {
+      lock.readLock().lock();
+      try {
+        return this.oldestMessageTimestamp;
+      } finally {
+        lock.readLock().unlock();
+      }
+    }
+
+    /**
+     * Acknowledge all outstanding message. Since we believe that messages will be delivered in
+     * timestamp order, and acknowledged messages will not be retried, the newest message in this
+     * batch is a good bound for future messages.
+     */
+    @Override
+    public void finalizeCheckpoint() {
+      lock.writeLock().lock();
+      try {
+        for (Message message : messages) {
+          try {
+            message.acknowledge();
+            Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+            if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
+              oldestMessageTimestamp = currentMessageTimestamp;
+            }
+          } catch (Exception e) {
+            LOG.error("Exception while finalizing message: {}", e);
+          }
+        }
+        messages.clear();
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    // set an empty list to messages when deserialize
+    private void readObject(java.io.ObjectInputStream stream)
+        throws IOException, ClassNotFoundException {
+      stream.defaultReadObject();
+      messages = new ArrayList<>();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof JmsCheckpointMark) {
+        JmsCheckpointMark that = (JmsCheckpointMark) other;
+        return Objects.equals(this.oldestMessageTimestamp, that.oldestMessageTimestamp);
 
 Review comment:
   The messages list is transient. Do we really need it in `equals`?
 


Issue Time Tracking
---

Worklog Id: (was: 375710)
Time Spent: 5h  (was: 4h 50m)


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375711
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 16:24
Start Date: 22/Jan/20 16:24
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369663216
 
 

 ##
 File path: sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
 ##
 @@ -406,6 +408,26 @@ public void testCheckpointMarkSafety() throws Exception {
     runner.join();
   }
 
+  /** Test the checkpoint mark default coder, which is actually AvroCoder. */
+  @Test
+  public void testCheckpointMarkDefaultCoder() throws Exception {
+    JmsIO.JmsCheckpointMark jmsCheckpointMark = new JmsIO.JmsCheckpointMark();
+    jmsCheckpointMark.add(new ActiveMQMessage());
+    Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
+    CoderProperties.coderSerializable(coder);
+    CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark);
+  }
+
+  @Test
+  public void testCheckpointMarkSerializableCoder() throws Exception {
+    JmsIO.JmsCheckpointMark jmsCheckpointMark = new JmsIO.JmsCheckpointMark();
 
 Review comment:
   +1, I'm doing the change.
 


Issue Time Tracking
---

Worklog Id: (was: 375711)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375654&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375654
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 15:34
Start Date: 22/Jan/20 15:34
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369616872
 
 

 ##
 File path: sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
 ##
 @@ -406,6 +408,26 @@ public void testCheckpointMarkSafety() throws Exception {
     runner.join();
   }
 
+  /** Test the checkpoint mark default coder, which is actually AvroCoder. */
+  @Test
+  public void testCheckpointMarkDefaultCoder() throws Exception {
+    JmsIO.JmsCheckpointMark jmsCheckpointMark = new JmsIO.JmsCheckpointMark();
+    jmsCheckpointMark.add(new ActiveMQMessage());
+    Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
+    CoderProperties.coderSerializable(coder);
+    CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark);
+  }
+
+  @Test
+  public void testCheckpointMarkSerializableCoder() throws Exception {
+    JmsIO.JmsCheckpointMark jmsCheckpointMark = new JmsIO.JmsCheckpointMark();
 
 Review comment:
   This second test is not needed if you make 
`UnboundedJmsSource#getCheckpointMarkCoder` return the `SerializableCoder`. 
That change is also needed to get rid of `AvroCoder` as part of these fixes.
 


Issue Time Tracking
---

Worklog Id: (was: 375654)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375657&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375657
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 15:34
Start Date: 22/Jan/20 15:34
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369632204
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
     T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
 
 Review comment:
   One more point: the `JmsRecord` `equals` and `hashCode` implementations do 
not take the same fields into account. This should also be addressed; 
otherwise the equals/hashCode contract might be broken.
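
As an illustration of the point above, here is a minimal sketch of a record
type whose `equals` and `hashCode` are derived from exactly the same fields.
`PendingRecord` and its three fields are hypothetical stand-ins; the real
`JmsRecord` fields are not shown in this thread.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical record type; the real JmsRecord has its own set of fields.
final class PendingRecord {
  private final String messageId;
  private final long timestamp;
  private final String payload;

  PendingRecord(String messageId, long timestamp, String payload) {
    this.messageId = messageId;
    this.timestamp = timestamp;
    this.payload = payload;
  }

  // Compares messageId, timestamp and payload.
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof PendingRecord)) {
      return false;
    }
    PendingRecord that = (PendingRecord) other;
    return timestamp == that.timestamp
        && Objects.equals(messageId, that.messageId)
        && Objects.equals(payload, that.payload);
  }

  // Hashes the same three fields that equals compares.
  @Override
  public int hashCode() {
    return Objects.hash(messageId, timestamp, payload);
  }

  // Counts distinct records as a hash-based collection would see them.
  static int distinctCount(PendingRecord... records) {
    Set<PendingRecord> distinct = new HashSet<>();
    for (PendingRecord record : records) {
      distinct.add(record);
    }
    return distinct.size();
  }
}
```

If `hashCode` used a field that `equals` ignores, two equal records could land
in different hash buckets and be treated as distinct by `HashSet` or `HashMap`.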
 


Issue Time Tracking
---

Worklog Id: (was: 375657)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375651&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375651
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 15:33
Start Date: 22/Jan/20 15:33
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369619236
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
     T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public JmsCheckpointMark() {}
+
+    public void add(Message message) throws Exception {
+      lock.writeLock().lock();
+      try {
+        Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+        if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
+          oldestMessageTimestamp = currentMessageTimestamp;
+        }
+        messages.add(message);
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    public Instant getOldestMessageTimestamp() {
+      lock.readLock().lock();
+      try {
+        return this.oldestMessageTimestamp;
+      } finally {
+        lock.readLock().unlock();
+      }
+    }
+
+    /**
+     * Acknowledge all outstanding message. Since we believe that messages will be delivered in
+     * timestamp order, and acknowledged messages will not be retried, the newest message in this
+     * batch is a good bound for future messages.
+     */
+    @Override
+    public void finalizeCheckpoint() {
+      lock.writeLock().lock();
+      try {
+        for (Message message : messages) {
+          try {
+            message.acknowledge();
+            Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
+            if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
+              oldestMessageTimestamp = currentMessageTimestamp;
+            }
+          } catch (Exception e) {
+            LOG.error("Exception while finalizing message: {}", e);
+          }
+        }
+        messages.clear();
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+
+    // set an empty list to messages when deserialize
+    private void readObject(java.io.ObjectInputStream stream)
+        throws IOException, ClassNotFoundException {
+      stream.defaultReadObject();
+      messages = new ArrayList<>();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof JmsCheckpointMark) {
+        JmsCheckpointMark that = (JmsCheckpointMark) other;
+        return Objects.equals(this.oldestMessageTimestamp, that.oldestMessageTimestamp);
 
 Review comment:
   We need to include `messages` in `equals` too.
 


Issue Time Tracking
---

Worklog Id: (was: 375651)
Time Spent: 4h 20m  (was: 4h 10m)


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375652&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375652
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 15:33
Start Date: 22/Jan/20 15:33
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369620466
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
     T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
 
 Review comment:
   A `CheckpointMark` is basically an object that saves pending unprocessed 
state; in the case of JmsIO, the pending state is the messages. If we make the 
messages `transient` we lose data, so we need to fix this and make the 
messages part of the serialized state of the object. However, since JMS 
`Message`s are not `Serializable`, what we probably need to store here are 
`JmsRecord`s.
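
The data-loss concern above can be reproduced with plain Java serialization.
The following is a minimal sketch under stated assumptions: `CheckpointSketch`
is a hypothetical class, and plain strings stand in for both the JMS messages
and the serializable records, so it shows only the `transient` behaviour, not
the JmsIO code itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical checkpoint holding the same pending items in two ways.
class CheckpointSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // Skipped by Java serialization; rebuilt empty in readObject.
  transient List<String> transientPending = new ArrayList<>();

  // Written to and restored from the serialized form.
  final List<String> serializedPending = new ArrayList<>();

  private void readObject(ObjectInputStream stream)
      throws IOException, ClassNotFoundException {
    stream.defaultReadObject();
    transientPending = new ArrayList<>();
  }

  // Serializes the checkpoint to bytes and reads it back.
  static CheckpointSketch roundTrip(CheckpointSketch mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CheckpointSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }
}
```

After a round trip the transient list comes back empty while the serialized
list keeps its entries, so only the latter preserves pending state when a
checkpoint is restored elsewhere.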
 


Issue Time Tracking
---

Worklog Id: (was: 375652)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375650&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375650
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 15:33
Start Date: 22/Jan/20 15:33
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369623895
 
 

 ##
 File path: sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
 ##
 @@ -404,9 +408,96 @@ private JmsIO() {}
     T mapMessage(Message message) throws Exception;
   }
 
+  /**
+   * Checkpoint for an unbounded JMS source. Consists of the JMS messages waiting to be acknowledged
+   * and oldest pending message timestamp.
+   */
+  @VisibleForTesting
+  static class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
+
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
 
 Review comment:
   A possible issue after changing the pending state to `JmsRecord`s is that 
the records would need to recreate `Message`s to be able to ack in 
`finalizeCheckpoint`. Doing the ack in the `CheckpointMark` is probably an 
error because you would also need the connection information, so it is better 
to move that logic out of the `CheckpointMark`.
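
One way to read the suggestion above is to keep only serializable state in the
checkpoint and let a component that owns the live connection perform the ack.
The sketch below is an assumption about how such a split could look, not the
change made in the pull request; `PendingIds`, `Acknowledger` and
`ReaderSketch` are hypothetical names, and message ids stand in for records.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Serializable state only: ids of messages still waiting for an ack.
class PendingIds implements Serializable {
  private static final long serialVersionUID = 1L;
  final List<String> ids = new ArrayList<>();
}

// Hypothetical hook implemented by whatever holds the live connection/session.
interface Acknowledger {
  void acknowledge(String messageId);
}

// Hypothetical reader-side owner of the ack logic.
class ReaderSketch {
  private final Acknowledger acknowledger;

  ReaderSketch(Acknowledger acknowledger) {
    this.acknowledger = acknowledger;
  }

  // Acks every pending id through the connection owner, then clears the state.
  int finalizeCheckpoint(PendingIds mark) {
    int acked = 0;
    for (String id : mark.ids) {
      acknowledger.acknowledge(id);
      acked++;
    }
    mark.ids.clear();
    return acked;
  }
}
```

With this split the checkpoint can be encoded by any coder that handles a list
of strings, and nothing connection-related has to survive serialization.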
 


Issue Time Tracking
---

Worklog Id: (was: 375650)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375649&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375649
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 15:33
Start Date: 22/Jan/20 15:33
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369616872
 
 

 ##
 File path: sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
 ##
 @@ -406,6 +408,26 @@ public void testCheckpointMarkSafety() throws Exception {
     runner.join();
   }
 
+  /** Test the checkpoint mark default coder, which is actually AvroCoder. */
+  @Test
+  public void testCheckpointMarkDefaultCoder() throws Exception {
+    JmsIO.JmsCheckpointMark jmsCheckpointMark = new JmsIO.JmsCheckpointMark();
+    jmsCheckpointMark.add(new ActiveMQMessage());
+    Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
+    CoderProperties.coderSerializable(coder);
+    CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark);
+  }
+
+  @Test
+  public void testCheckpointMarkSerializableCoder() throws Exception {
+    JmsIO.JmsCheckpointMark jmsCheckpointMark = new JmsIO.JmsCheckpointMark();
 
 Review comment:
   This second test is not needed if you make 
`UnboundedJmsSource#getCheckpointMarkCoder` return the `SerializableCoder`. 
That change is also needed to get rid of `AvroCoder` as part of these fixes.
 


Issue Time Tracking
---

Worklog Id: (was: 375649)
Time Spent: 4h 10m  (was: 4h)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-jms
>Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : "org.apache.qpid:qpid-jms-client:0.42.0
>Reporter: Mourad
>Assignee: Mourad
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> I get the following exception when reading from unbounded JMS Source:
>   
> {code:java}
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
> at org.apache.avro.Schema.validateName(Schema.java:1151)
> at org.apache.avro.Schema.access$200(Schema.java:81)
> at org.apache.avro.Schema$Field.<init>(Schema.java:403)
> at org.apache.avro.Schema$Field.<init>(Schema.java:396)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
> at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
> at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> {code}
>  
> The exception is thrown by Avro when introspecting {{JmsCheckpointMark}} to 
> generate schema.
> JmsIO config :
>  
> {code:java}
> PCollection messages = pipeline.apply(
>     "read messages from the events broker",
>     JmsIO.readMessage()
>         .withConnectionFactory(jmsConnectionFactory)
>         .withTopic(options.getTopic())
>         .withMessageMapper(new DFAMessageMapper())
>         .withCoder(AvroCoder.of(DFAMessage.class)));
> {code}
>  
>  
>  
>  
>  
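The `this$0` in the trace above is the name javac gives to the synthetic field that holds a non-static inner class's reference to its enclosing instance. Avro names must match `[A-Za-z_][A-Za-z0-9_]*`, so a reflected field containing `$` is rejected. The thread does not say which class in the reflected object graph carried that field, so the following is only a minimal, JDK-only sketch of the mechanism. The class names `InnerClassFieldDemo`, `InnerMark` and `NestedMark` are hypothetical and are not part of Beam or Avro.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class InnerClassFieldDemo {
  // Avro's naming rule for record and field names (per the Avro specification).
  static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

  // Deliberately non-final so it is not a compile-time constant.
  private String destination = "events";

  /** Non-static inner class: it reads an outer field, so it keeps the outer reference. */
  public class InnerMark {
    String describe() {
      return "mark for " + destination;
    }
  }

  /** Static nested class: no hidden reference to an enclosing instance. */
  public static class NestedMark {
    String id = "m-1";
  }

  /** Returns the declared field names that would fail Avro's name validation. */
  public static List<String> illegalAvroFieldNames(Class<?> type) {
    List<String> bad = new ArrayList<>();
    for (Field field : type.getDeclaredFields()) {
      if (!AVRO_NAME.matcher(field.getName()).matches()) {
        bad.add(field.getName());
      }
    }
    return bad;
  }

  public static void main(String[] args) {
    System.out.println("InnerMark:  " + illegalAvroFieldNames(InnerMark.class));
    System.out.println("NestedMark: " + illegalAvroFieldNames(NestedMark.class));
  }
}
```

Under these assumptions, a reflection-based schema generator that walks declared fields would trip over the inner class but not over the static nested one, which is one reason to avoid reflection-based coders for types that hold arbitrary third-party objects.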



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375653=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375653
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 22/Jan/20 15:33
Start Date: 22/Jan/20 15:33
Worklog Time Spent: 10m 
  Work Description: iemejia commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#discussion_r369626193
 
 

 ##
 File path: 
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java
 ##
 @@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.jms;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
-import javax.jms.Message;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Checkpoint for an unbounded JmsIO.Read. Consists of JMS destination name, and the latest message
- * ID consumed so far.
- */
-@DefaultCoder(AvroCoder.class)
-public class JmsCheckpointMark implements UnboundedSource.CheckpointMark {
 
 Review comment:
   Better to leave this as a separate class, as it was, no? KafkaIO was split up 
because it was almost unmanageable with so much code in one file. Or is there a 
fundamental reason to do this?
 



Issue Time Tracking
---

Worklog Id: (was: 375653)
Time Spent: 4.5h  (was: 4h 20m)


[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375114=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375114
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 21/Jan/20 18:03
Start Date: 21/Jan/20 18:03
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #8757: [BEAM-7427] Fix 
JmsCheckpointMark Avro Encoding
URL: https://github.com/apache/beam/pull/8757#issuecomment-576805331
 
 
   Superseded by #10644
 



Issue Time Tracking
---

Worklog Id: (was: 375114)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375115=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375115
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 21/Jan/20 18:03
Start Date: 21/Jan/20 18:03
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on pull request #8757: [BEAM-7427] 
Fix JmsCheckpointMark Avro Encoding
URL: https://github.com/apache/beam/pull/8757
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 375115)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375112=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375112
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 21/Jan/20 18:02
Start Date: 21/Jan/20 18:02
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on pull request #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644
 
 
   This refactoring makes JmsCheckpointMark usable via a Coder.
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [X] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [X] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [X] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=375113=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375113
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 21/Jan/20 18:02
Start Date: 21/Jan/20 18:02
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #10644: [BEAM-7427] 
Refactore JmsCheckpointMark to be usage via Coder
URL: https://github.com/apache/beam/pull/10644#issuecomment-576805090
 
 
   R: @iemejia @aromanenko-dev 
 



Issue Time Tracking
---

Worklog Id: (was: 375113)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=374939=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-374939
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 21/Jan/20 12:17
Start Date: 21/Jan/20 12:17
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #8757: [BEAM-7427] Fix 
JmsCheckpointMark Avro Encoding
URL: https://github.com/apache/beam/pull/8757#issuecomment-576656045
 
 
   I think I will use the same approach as I did in MQTT with transient. I'm 
testing it.
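
The comment does not say which field would be marked `transient`, so the following is only a minimal sketch of the general idea using plain Java serialization: a `transient` field is skipped when the object is written, so non-serializable contents no longer break encoding, but the field has to be re-created after decoding. `TransientCheckpointDemo` and `Mark` are hypothetical stand-ins, not the real `JmsCheckpointMark`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientCheckpointDemo {

  /** Hypothetical checkpoint-like class; only the timestamp is serialized. */
  public static class Mark implements Serializable {
    private static final long serialVersionUID = 1L;

    public long oldestTimestampMillis;

    // Skipped by Java serialization, so its elements need not be Serializable.
    public transient List<Object> pendingMessages = new ArrayList<>();

    // Field initializers do not run on deserialization, so restore the list here.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      pendingMessages = new ArrayList<>();
    }
  }

  /** Serializes and deserializes a mark in memory. */
  public static Mark roundTrip(Mark mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Mark) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }

  public static void main(String[] args) {
    Mark mark = new Mark();
    mark.oldestTimestampMillis = 42L;
    mark.pendingMessages.add(new Object()); // java.lang.Object is not Serializable
    Mark copy = roundTrip(mark);
    System.out.println(copy.oldestTimestampMillis + " / " + copy.pendingMessages.size());
  }
}
```

The trade-off in this sketch is that anything held only in the transient field is lost across a round trip, so a decoded mark cannot acknowledge the messages the original was tracking.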
 



Issue Time Tracking
---

Worklog Id: (was: 374939)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=374836=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-374836
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 21/Jan/20 06:49
Start Date: 21/Jan/20 06:49
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #8757: [BEAM-7427] Fix 
JmsCheckpointMark Avro Encoding
URL: https://github.com/apache/beam/pull/8757#issuecomment-576542230
 
 
   I'm evaluating the following options:
   0. Add a test with AvroCoder on the checkpoint mark.
   1. Use JmsRecord in the checkpoint instead of Message to ack. The issue is 
how to "load" the Message from the JmsRecord.
   2. Use the Session to ack instead of the message. The Session stores the 
unacknowledged messages, so the checkpoint mark could delegate to the session.
   
   I'll keep you posted soon.
 



Issue Time Tracking
---

Worklog Id: (was: 374836)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=368277=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-368277
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 08/Jan/20 17:14
Start Date: 08/Jan/20 17:14
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #8757: [BEAM-7427] Fix 
JmsCheckpointMark Avro Encoding
URL: https://github.com/apache/beam/pull/8757#issuecomment-572167618
 
 
   Hi all, sorry I forgot to move forward on this one.
   
   I'm resuming my work on it. Sorry for the delay.
 



Issue Time Tracking
---

Worklog Id: (was: 368277)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-7427) JmsCheckpointMark Avro Serialization issue with UnboundedSource

2020-01-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7427?focusedWorklogId=368247=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-368247
 ]

ASF GitHub Bot logged work on BEAM-7427:


Author: ASF GitHub Bot
Created on: 08/Jan/20 16:23
Start Date: 08/Jan/20 16:23
Worklog Time Spent: 10m 
  Work Description: kolban-google commented on issue #8757: [BEAM-7427] Fix 
JmsCheckpointMark Avro Encoding
URL: https://github.com/apache/beam/pull/8757#issuecomment-572146349
 
 
   I built the patch using this Pull Request and initially thought that it had 
solved the puzzle.  However, I very quickly found that I was failing with 
[BEAM-4409](https://issues.apache.org/jira/browse/BEAM-4409).  Now that I have 
read this Pull Request in detail, I see that you are aware of this too.  
The last update I see is from @jbonofre on 2019-09-04, with a 
comment that it is being worked on.  Are there any updates on this?  Any 
approximate timeframe for a patch to test?
 



Issue Time Tracking
---

Worklog Id: (was: 368247)
Time Spent: 2h 50m  (was: 2h 40m)

> JmsCheckpointMark Avro Serialization issue with UnboundedSource
> ---
>
> Key: BEAM-7427
> URL: https://issues.apache.org/jira/browse/BEAM-7427
> Project: Beam
> Issue Type: Bug
> Components: io-java-jms
> Affects Versions: 2.12.0
> Environment: Message Broker : solace
> JMS Client (Over AMQP) : org.apache.qpid:qpid-jms-client:0.42.0
> Reporter: Mourad
> Assignee: Mourad
> Priority: Major
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> I get the following exception when reading from unbounded JMS Source:
>   
> {code:java}
> Caused by: org.apache.avro.SchemaParseException: Illegal character in: this$0
> at org.apache.avro.Schema.validateName(Schema.java:1151)
> at org.apache.avro.Schema.access$200(Schema.java:81)
> at org.apache.avro.Schema$Field.<init>(Schema.java:403)
> at org.apache.avro.Schema$Field.<init>(Schema.java:396)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:622)
> at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740)
> at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
> at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
> at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
> at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
> {code}
>  
> The exception is thrown by Avro when introspecting {{JmsCheckpointMark}} to 
> generate a schema.
> JmsIO config:
>  
> {code:java}
> PCollection messages = pipeline.apply("read messages from the events broker",
>     JmsIO.readMessage()
>         .withConnectionFactory(jmsConnectionFactory)
>         .withTopic(options.getTopic())
>         .withMessageMapper(new DFAMessageMapper())
>         .withCoder(AvroCoder.of(DFAMessage.class)));
> {code}
>  
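For readers unfamiliar with the `this$0` name in the quoted exception: it is the name javac conventionally gives to the hidden outer-instance field of a non-static inner class, and Avro names must match `[A-Za-z_][A-Za-z0-9_]*`, which rules out `$`. The following is a minimal, standalone sketch of that mechanism using plain Java reflection only. It is not the Beam or Avro code; `InnerClassFieldDemo`, `InnerMark`, `NestedMark`, and `allFieldNamesValid` are hypothetical names chosen for illustration, and the inner class deliberately touches the outer instance so the compiler keeps the hidden field.

```java
import java.lang.reflect.Field;
import java.util.regex.Pattern;

public class InnerClassFieldDemo {
    // Name rule from the Avro specification: a letter or underscore,
    // followed by letters, digits, or underscores.
    static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    private int counter = 0;

    // Non-static inner class (hypothetical): the compiler adds a synthetic
    // field that references the enclosing instance.
    class InnerMark {
        int next() {
            return ++counter; // uses the enclosing instance
        }
    }

    // Static nested class (hypothetical): no reference to an enclosing instance.
    static class NestedMark {
        int value;
    }

    // Returns true only if every declared field name (synthetic ones included)
    // would pass the Avro name rule.
    static boolean allFieldNamesValid(Class<?> type) {
        for (Field f : type.getDeclaredFields()) {
            if (!AVRO_NAME.matcher(f.getName()).matches()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        for (Class<?> type : new Class<?>[] {InnerMark.class, NestedMark.class}) {
            System.out.println(type.getSimpleName() + " fields:");
            for (Field f : type.getDeclaredFields()) {
                System.out.println("  " + f.getName() + " synthetic=" + f.isSynthetic());
            }
            System.out.println("  all names valid for Avro: " + allFieldNamesValid(type));
        }
    }
}
```

Under these assumptions, a reflection-based schema generator that walks all declared fields would trip over the inner class but not the static nested one, which is consistent with the stack trace above.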


