[ 
https://issues.apache.org/jira/browse/SAMZA-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shanthoosh Venkataraman updated SAMZA-2167:
-------------------------------------------
    Summary: Should not close the MetadataStore at the end of 
ProcessJobFactory.  (was: Should not close the MetadataStore at the end of 
ProcessJobFactory .)

> Should not close the MetadataStore at the end of ProcessJobFactory.
> -------------------------------------------------------------------
>
>                 Key: SAMZA-2167
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2167
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
>
> Currently in ProcessJobFactory the metadata-store connection is closed after 
> generating the JobModel. 
> To read from coordinator stream only once in samza-yarn ApplicationMaster, we 
> ended up making the LocalityManager, TaskAssignmentManager, 
> ChanglogStreamManager etc.  However, after the above change closing the 
> metadata store in ProcessJobFactory after generating the JobModel, results in 
> the following exception when the servlet API queries the JobModel:
> {code:java}
> org.codehaus.jackson.map.JsonMappingException: 
> samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy 
> has stopped. (through reference chain: 
> org.apache.samza.job.model.JobModel["all-container-locality"])
>       at 
> org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
>       at 
> org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
>       at 
> org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
>       at 
> org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
>       at 
> org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
>       at 
> org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
>       at 
> org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
>       at 
> org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
>       at 
> org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
>       at 
> org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
>       at 
> org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
>       at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
>       at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
>       at 
> org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
>       at 
> org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
>       at 
> org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
>       at 
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
>       at 
> org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
>       at 
> org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
>       at 
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
>       at 
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>       at 
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
>       at org.eclipse.jetty.server.Server.handle(Server.java:497)
>       at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
>       at 
> org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
>       at 
> org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
>       at 
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
>       at 
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.samza.SamzaException: 
> samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy 
> has stopped.
>       at 
> org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
>       at 
> org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
>       at 
> org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
>       at 
> org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
>       at 
> org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
>       at 
> org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
>       at 
> org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
>       at 
> org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
>       at 
> org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
>       at 
> org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:483)
>       at 
> org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
>       at 
> org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
>       at 
> org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
>       ... 25 more
> 2019-04-16 12:16:46.090 [qtp220371218-112] HttpChannel [WARN] /
> org.codehaus.jackson.map.JsonMappingException: 
> samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy 
> has stopped. (through reference chain: 
> org.apache.samza.job.model.JobModel["all-container-locality"])
>       at 
> org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:218)
>       at 
> org.codehaus.jackson.map.JsonMappingException.wrapWithPath(JsonMappingException.java:183)
>       at 
> org.codehaus.jackson.map.ser.std.SerializerBase.wrapAndThrow(SerializerBase.java:140)
>       at 
> org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:158)
>       at 
> org.codehaus.jackson.map.ser.BeanSerializer.serialize(BeanSerializer.java:112)
>       at 
> org.codehaus.jackson.map.ser.StdSerializerProvider._serializeValue(StdSerializerProvider.java:610)
>       at 
> org.codehaus.jackson.map.ser.StdSerializerProvider.serializeValue(StdSerializerProvider.java:256)
>       at 
> org.codehaus.jackson.map.ObjectMapper._configAndWriteValue(ObjectMapper.java:2575)
>       at 
> org.codehaus.jackson.map.ObjectMapper.writeValue(ObjectMapper.java:2081)
>       at 
> org.apache.samza.coordinator.server.ServletBase$class.doGet(ServletBase.scala:39)
>       at 
> org.apache.samza.coordinator.server.JobServlet.doGet(JobServlet.scala:32)
>       at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
>       at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
>       at 
> org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:800)
>       at 
> org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
>       at 
> org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
>       at 
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
>       at 
> org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
>       at 
> org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
>       at 
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
>       at 
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>       at 
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
>       at org.eclipse.jetty.server.Server.handle(Server.java:497)
>       at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
>       at 
> org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:245)
>       at 
> org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
>       at 
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
>       at 
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.samza.SamzaException: 
> samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy 
> has stopped.
>       at 
> org.apache.samza.system.kafka.KafkaSystemConsumer.poll(KafkaSystemConsumer.java:311)
>       at 
> org.apache.samza.system.SystemStreamPartitionIterator.refresh(SystemStreamPartitionIterator.java:87)
>       at 
> org.apache.samza.system.SystemStreamPartitionIterator.hasNext(SystemStreamPartitionIterator.java:58)
>       at 
> org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.readMessagesFromCoordinatorStream(CoordinatorStreamStore.java:154)
>       at 
> org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.all(CoordinatorStreamStore.java:148)
>       at 
> org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.readMessagesFromCoordinatorStore(NamespaceAwareCoordinatorStreamStore.java:98)
>       at 
> org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore.all(NamespaceAwareCoordinatorStreamStore.java:74)
>       at 
> org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:63)
>       at 
> org.apache.samza.job.model.JobModel.populateContainerLocalityMappings(JobModel.java:123)
>       at 
> org.apache.samza.job.model.JobModel.getAllContainerLocality(JobModel.java:135)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:483)
>       at 
> org.codehaus.jackson.map.ser.BeanPropertyWriter.get(BeanPropertyWriter.java:483)
>       at 
> org.codehaus.jackson.map.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:418)
>       at 
> org.codehaus.jackson.map.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:150)
>       ... 25 more
> 2019-04-16 12:16:46.091 [qtp220371218-112] HttpChannel [WARN] Could not send 
> response error 500: org.codehaus.jackson.map.JsonMappingException: 
> samzametadatasystem:kafka_consumer-hello_beam_yarn-i001: KafkaConsumerProxy 
> has stopped. (through reference chain: 
> org.apache.samza.job.model.JobModel["all-container-locality"])
> {code}
> The above exception causes the local deployment to fail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to